并行编程(Parallel Framework)

前言

并行编程:通过编码方式利用多核或多处理器称为并行编程,多线程概念的一个子集。

并行处理:把正在执行的大量的任务分割成小块,分配给多个同时运行的线程。多线程的一种。

并行编程分为如下几个结构

1.并行的LINQPLINQ

2.Parallel

3.任务并行结构

4.并发集合

5.SpinLockSpinWait

这些是.NET 4.0引入的功能,一般被称为PFX(Parallel Framework,并行框架)

Parallel类和任务并行结构称为TPL(Task Parallel Library,任务并行库)

并行框架(PFX)

1.并行框架基础

当前CPU技术达到瓶颈,而制造商将关注重点转移到提高内核技术上,而标准单线程代码并不会因此而自动提高运行速度。
利用多核提升程序性能通常需要对计算密集型代码进行一些处理:
1.将代码划分成块。
2.通过多线程并行执行这些代码块。
3.结果变为可用后,以线程安全和高性能的方式整合这些结果。
传统多线程结构虽然实现功能,但难度颇高且不方便,特别是划分和整理的步骤(本质问题是:多线程同时使用相同数据时,出于线程安全考虑进行锁定的常用策略会引发大量竞争)。
并行框架(Parallel Framework)专门用于在这些应用场景中提供帮助。

2.并行框架组成

PFX:高层由两个数据并行API组成:PLINQ或Parallel类。底层包含任务并行类和一组另外的结构为并行编程提供帮助。

基础并行语言集成查询(PLINQ)

语言集成查询(Language Integrated Query,LINQ)提供了一个简捷的语法来查询数据集合。而这种由一个线程顺序处理数据集合的方式我们称为顺序查询(sequential query)

并行语言集成查询(Parallel LINQ)LINQ并行版。它将顺序查询转换为并行查询,在内部使用任务,将集合中数据项的处理工作分散到多个CPU上,以并发处理多个数据项。

PLINQ将自动并行化本地的LINQ查询System.Linq.ParallelEnumerable类(它定义在System.Core.dll中,需要引用System.Linq)公开了所有标准LINQ操作符的并行版本。这些所有方法是依据System.Linq.ParallelQuery<T>扩展而来。

1.LINQ to PLINQ

要让LINQ查询调用并行版本,必须将自己的顺序查询(基于IEnumerable或IEnumerable<T>)转换成并行查询(基于ParallelQuery或ParallelQuery<T>),使用ParallelEnumerableAsParallel方法实现,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             ParallelQuery parallelQuery =
 7                 from n in numbers.AsParallel()//转换为并行
 8                 where n > 3
 9                 select n;
10             foreach (var item in parallelQuery)
11             {
12                 Console.WriteLine(item);
13             }
14             Console.ReadKey();
15         }
16     }

结果如下:使用Enumerable.Range生成的集合是顺序的,但是经过并行查询后顺序被打乱。

2.PLINQ to LINQ

将执行并行查询的操作切换回执行顺序查询(并不常用),通过ParalleIEnumerableAsSequential实现。此时操作只由一个线程执行。

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             IEnumerable<int> enumerable = numbers.AsParallel().AsSequential().Where(c => c > 3);
 7             foreach (var item in enumerable)
 8             {
 9                 Console.WriteLine(item);
10             }
11             Console.ReadKey();
12         }
13     }

3.整合结果集(ForAll)

通常,一个LINQ查询的结果数据是让某个线程执行一个foreach来处理,此时只有一个线程遍历查询的所有结果,如果希望以并行方式处理查询结果,通过ParalleIEnumerableForAll方法处理查询,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             (from n in numbers.AsParallel() where n > 3 select n).ForAll((d) =>
 7              {
 8                  d = d + 1000;
 9                  Console.WriteLine(d);//Console在此回损害性能,因为内部回对线程进行同步,此处因演示所以暂且一用
10              });
11             Console.ReadKey();
12         }
13     }

执行结果如下:

解析PLINQ

1.PLINQ执行模型

如图所示:

2.异常处理

PLINQ的报错将以AggregateException形式被重抛,其中InnerExceeptions属性包含一个或多个真正异常,示例可看 异步编程(async&await)内的异常处理部分。

3.PLINQ结果的排序

并行化查询当整理结果时不能保持初始化数据的原始次序。如果要保持序列的原始序列,可以通过在AsParallel之后调用AsOrdered来强制实现:

1             IEnumerable<int> numbers = Enumerable.Range(1, 10000);
2             var enumerable = numbers.AsParallel().Where(c => c > 3);

调用AsOrdered时,因为PLINQ要保持跟踪每个元素的原始位置,会导致性能损失。

调用AsUnordered,可以在后续的查询中低效AsOrdered产生的副作用,允许查询从调用AsUnordered时起更高效的执行。

4.PLINQ存在的局限与限制

1.若要使PLINQ发挥作用,必须具有一定数量的计算密集型工作可分配给工作者线程。大多数的LINQ to Objects查询执行速度很快,不仅没有必要并行化,而且划分、整理和协调额外线程的开销实际上会降低执行速度。而且查询若调用了非线程安全的方法,PLINQ的结果有可能不正确。

2.PLINQ能够并行化的内容还有些限制,以下查询运算符防止查询被并行化,除非源元素位于他们的元素索引位置:Take、TakeWhile、Skip和SkipWhileSelect、SelectMany和ElementAt的索引版本。

3.以下查询运算符是并行化的,但所使用的复杂划分策略有时可能比顺序处理的速度还要低:Join、GroupBy、GroupJonin、Distinct、Union、Intersect和Except。

5.PLINQ的结果

和普通LINQ查询一样,PLINQ查询也是延迟求值的。意味着执行只在开始使用时触发。但是列举结果集时和普通顺序查询有区别:

顺序查询:完全由使用者从输入序列中“拉取”每个元素。

并行查询:通常使用独立线程来获取序列中的元素,时间上比使用者需要它们时要提前,再通过查询链并行处理元素后将结果保存在一块缓存中,以便使用者按需取用。

注意:过早暂停结果列举,查询处理器也会暂停或结束,目的是不浪费CPU的时间或内存。在调用AsParallel之后调用WithMergeOptions可以调节PLINQ的缓冲行为。

6.如何使用PLINQ

为何优化将LINQ都并行化是不可取的,因为LINQ能解决大多数问题,执行速度也很快,因此无法从并行化中收益。

一种更好的方式是找出CPU密集的瓶颈,然后考虑通过LINQ的形式表达(这类重构,LINQ往往会使代码量变少,而且增强可读性)。

PLINQ十分适用于易并行问题。他还可以很好地处理结构化的阻塞任务。

PLINQ不适于镜像制作,因为将数百万元素整理为一个输出序列将带来瓶颈,相反将元素写入一个数组或托管内存块中,然后使用Parallel类或任务并行管理多线程是更好的选择。

Parallel类

Parallel类是对线程的一个很好的抽象。该类位于System.Threading.Tasks命名空间中,提供了数据和任务并行性

PFX通过Parallel类中的三个静态方法,提供了一种基本形式的结构化并行机制:

1.Parallel.Invoke

Parallel.Invoke用于并行执行一组委托,示例如下:

1         static void Main(string[] args)
2         {
3             Parallel.Invoke(
4                 () => Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId}"),
5                 () => Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId}")
6                 );
7             Console.ReadKey();
8         }

执行结果

Parallel.Invoke方法并行执行一组Action委托,然后等待它们完成。

1 public static void Invoke(params Action[] actions);

示例看起来像是创建和等待两个Task对象的一种捷径。但两者存在重要的区别:
如果传入一个包含数据量非常大的委托数组时,Parallel.Invoke方法仍然能高效工作,这是因为在底层,Parallel.Invoke方法是将大量元素划分成较小的块,分配给底层的Task执行,而不是每个委托创建一个独立Task。

2.Parallel.For

Parallel.For执行C# for循环的并行化等价循环,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             //顺序循环
 6             {
 7                 for (int i = 0; i < 10; i++)
 8                 {
 9                     Test(i);
10                 }
11             }
12             Console.WriteLine("并行化for开始");
13             //顺序执行转换为并行化
14             {
15                 Parallel.For(0, 10, i => Test(i));
16             }
17             //顺序执行转换为并行化(更简单的方式)
18             {
19                 Parallel.For(0, 10, Test);
20             }
21             Console.ReadKey();
22         }
23         static void Test(int i)
24         {
25             Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{i}");
26         }
27     }

结果如下:

3.Parallel.ForEach

Parallel.ForEach执行C# foreach循环的并行化等价循环,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             //顺序循环
 7             {
 8                 foreach (string num in data)
 9                 {
10                     Test(num);
11                 }
12             }
13             Console.WriteLine("并行化foreach开始");
14             //顺序执行转换为并行化
15             {
16                 Parallel.ForEach(data, num => Test(num));
17             }
18             Console.ReadKey();
19         }
20         static void Test(string str)
21         {
22             Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{str}");
23         }
24     }

执行结果:

注意:以上三个方法都会引发阻塞直到所有工作完成为止。和PLINQ一样,在出现未处理的异常之后,余下的工作者在它们当前的迭代之后停止,而一场将被抛回给调用者,并封装在一个AggregateException中。

4.索引&跳出(ParallelLoopState)

有时迭代索引很有用处,但是切忌不可像顺序循环的用法使用共享变量(循环内i++)的方式使用,因为共享变量值在并行上下文中是线程不安全的

同样的,因为并行ForForEach中的循环体是一个委托,所以无法使用break语句提前退出循环,必须调用ParallelLoopState对象上的BreakStop方法。

ForEach为例,ForEach重载的其中之一如下,它包含Acton的其中有三个参数(TSourec=子元素,ParallelLoopState=并行循环状态,long=索引):

1 public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)

所以,想要得到索引和提前跳出的正确方式如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             Parallel.ForEach(data, (num, state, i) =>
 7             {
 8                 Console.WriteLine($"当前索引为:{i},状态为:{state}");
 9                 Test(num);
10                 if (num == "six")
11                     state.Break();
12             });
13             Console.ReadKey();
14         }
15         static void Test(string str)
16         {
17             Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{str}");
18         }
19     }

结果如下:

For的版本如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             Parallel.For(0, data.Length, (i, state) =>
 7             {
 8                 Console.WriteLine($"当前索引为:{i},状态为:{state}");
 9                 Test(data[i]);
10                 if (data[i] == "six")
11                     state.Break();
12             });
13             Console.ReadKey();
14         }
15         static void Test(string str)
16         {
17             Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{str}");
18         }
19     }

任务并行

对于任务并行的内容,请戳 任务(Task)异步编程(async&await)

并发集合概述

.NET 4.0在System.Collections.Concurrent命名空间中提供了一组新的集合。所有这些集合都完全是线程安全的:

这些集合不仅是为使用带锁的普通集合提供了快捷方式,而且可以在一般的多线程中使用并发集合,但需要注意:
1.并发集合针对并行编程进行了调整。只有在高度并发的应用场景中,传统集合的性能才能胜过它们

2.线程安全的集合不能确保使用它的代码也是安全的

3.如果枚举一个并发集合的同时,另一个线程要修改它,不会抛出任何异常,相反,得到旧内容与新内容的混合。

4.不存在任何List<T>的并发版本。

5.它们的内存利用率没有非并发的Stack和Queue类高效,但对于并发访问的效果更好。

1.结构概述

这些并发集合与传统集合的区别是:它们公开了特殊方法来执行原子测试和行动操作,而这些方法都是通过IProducerConsumerCollection<T>接口提供的。

IProducerConsumerCollection<T>接口代表一个线程安全的生产者/消费者集合,这三个类继承并实现了IProducerConsumerCollection<T>接口:

ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>

它们实现的TryAddTryTake方法用于测试一个添加/删除操作能否执行,如果可以,则执行添加/删除操作。测试与行动不需要对传统集合上锁。

ConcurrentBag<T>用于保存对象的无需集合,适用于调用Take或TryTake时不关心获取那个元素的额情况。

相对于并发队列或堆栈,在多线程同时调用一个ConcurrentBag的Add时,不存在竞争,但队列或堆栈并行调用Add会引起一些竞争,所以ConcurrentBag上调用Take方法非常高效。

BlockingCollection<T>类似阻塞集合,适用于等待新元素的出现,可以把它看作一个容器,使用一个阻塞集合封装所有实现IProducerConsumerCollection<T>的集合,并且允许从封装的集合中去除元素,若没有元素,操作会阻塞

2.基础方法

常用的一些方法,整理自 zy__ :

ConcurrentQueue:完全无锁,但面临资源竞争失败时可能会陷入自旋并重试操作。

Enqueue:在队尾插入元素

TryDequeue:尝试删除队头元素,并通过out参数返回

TryPeek:尝试将对头元素通过out参数返回,但不删除该元素。

ConcurrentStack:完全无锁,但面临资源竞争失败时可能会陷入自旋并重试操作。

Push:向栈顶插入元素

TryPop:从栈顶弹出元素,并且通过out 参数返回

TryPeek:返回栈顶元素,但不弹出。

ConcurrentBag:一个无序的集合,程序可以向其中插入元素,或删除元素。在同一个线程中向集合插入,删除元素的效率很高。

Add:向集合中插入元素

TryTake:从集合中取出元素并删除

TryPeek:从集合中取出元素,但不删除该元素。

BlockingCollection:一个支持界限和阻塞的容器

Add :向容器中插入元素

TryTake:从容器中取出元素并删除

TryPeek:从容器中取出元素,但不删除。

CompleteAdding:告诉容器,添加元素完成。此时如果还想继续添加会发生异常。

IsCompleted:告诉消费线程,生产者线程还在继续运行中,任务还未完成。

ConcurrentDictionary对于读操作是完全无锁的,当很多线程要修改数据时,它会使用细粒度的锁。

AddOrUpdate:如果键不存在,方法会在容器中添加新的键和值,如果存在,则更新现有的键和值。

GetOrAdd:如果键不存在,方法会向容器中添加新的键和值,如果存在则返回现有的值,并不添加新值。
TryAdd:尝试在容器中添加新的键和值。

TryGetValue:尝试根据指定的键获得值。

TryRemove:尝试删除指定的键。

TryUpdate:有条件的更新当前键所对应的值。

GetEnumerator:返回一个能够遍历整个容器的枚举器。

结语

根据ConcurrentBag编写线程安全的生产者消费者请戳:这里 。

说实在的写这篇文章挺烦的,主要涉及的知识点太多讲的太细篇幅会很长况且我自己有些也还没用过,所以是概述性文章,对PFX有个基本的认识,当需要具体深入使用某些知识时再查询相关文档。

关于 并发编程(Concurrent programming)更新到这里基本已经完结,谢谢大家的支持

因个人的兴趣,所以准备沉淀下来专攻 数据结构和算法,然后研究 人工智能(Microsoft的人工智能平台Windows ML不会涉及,选择研究Google的第二代人工智能学习系统TensorFlow )。

接下来会对LinuxPython进行基础的学习并更新文章。

但是最核心的还是数据结构&算法使用那种编程语言并不重要

感兴趣的朋友可以关注。

参考文献

CLR via C#(第4版) Jeffrey Richter

C#高级编程(第10版) C# 6 & .NET Core 1.0   Christian Nagel

果壳中的C# C#5.0权威指南  Joseph Albahari

...

原文地址:https://www.cnblogs.com/jonins/p/9558276.html

时间: 2024-11-07 05:26:25

并行编程(Parallel Framework)的相关文章

C#并行编程-Parallel

原文:C#并行编程-Parallel 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. TPL中引入了一个新命名空间System.Threading.Tasks,在该命名空间下Task是主类,表示一个类的异步的并发的操作,创建并行代码的时候不一定要直接使用Task类,在某些情况下可以直接使用Parallel静态类(System.Threading.Tasks.Parallel)下所提供的方法,而不用底层的Task实例. Parallel.Invoke  试图将很多方

Parallel并行编程

Parallel并行编程 Parallel并行编程可以让我们使用极致的使用CPU.并行编程与多线程编程不同,多线程编程无论怎样开启线程,也是在同一个CPU上切换时间片.而并行编程则是多CPU核心同时工作.耗时的CPU计算操作选择并行是明智的.通常情况,每个CPU核心代表一个硬件线程,但超线程技术,可以使一个cpu核心具有两个硬件线程.软件线程顾名思义就是我们在程序中所开启的. 下面看一个最基础的并行编程的例子,也足以体现多核心并行运行的好处,当然微软.NET为我们封装后,我们也不必过多关注底层操

C#并行编程 z

目录 C#并行编程-相关概念 C#并行编程-Parallel C#并行编程-Task C#并行编程-并发集合 C#并行编程-线程同步原语 C#并行编程-PLINQ:声明式数据并行 背景 基于任务的程序设计.命令式数据并行和任务并行都要求能够支持并发更新的数组.列表和集合. 在.NET Framework 4 以前,为了让共享的数组.列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作. 如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制

C#并行编程中的Parallel.Invoke

一.基础知识 并行编程:并行编程是指软件开发的代码,它能在同一时间执行多个计算任务,提高执行效率和性能一种编程方式,属于多线程编程范畴.所以我们在设计过程中一般会将很多任务划分成若干个互相独立子任务,这些任务不考虑互相的依赖和顺序.这样我们就可以使用很好的使用并行编程.但是我们都知道多核处理器的并行设计使用共享内存,如果没有考虑并发问题,就会有很多异常和达不到我们预期的效果.不过还好NET Framework4.0引入了Task Parallel Library(TPL)实现了基于任务设计而不用

C#并行编程--命令式数据并行(Parallel.Invoke)

命令式数据并行   Visual C# 2010和.NETFramework4.0提供了很多令人激动的新特性,这些特性是为应对多核处理器和多处理器的复杂性设计的.然而,因为他们包括了完整的新的特性,开发人员和架构师必须学习一种新的编程模型. 这一章是一些新的类.结构体和枚举类型,你可以使用这里来处理数据并行的场景.这章将为你展示怎样创建并行代码和描述与每个场景相关的新概念,而不是关注并发编程中的最复杂的问题.这样你将可以更加充分的理解性能改进. 开始并行任务  使用先前版本的.NET Frame

Parallel并行编程初步

Parallel并行编程可以让我们使用极致的使用CPU.并行编程与多线程编程不同,多线程编程无论怎样开启线程,也是在同一个CPU上切换时间片.而并行编程则是多CPU核心同时工作.耗时的CPU计算操作选择并行是明智的.通常情况,每个CPU核心代表一个硬件线程,但超线程技术,可以使一个cpu核心具有两个硬件线程.软件线程顾名思义就是我们在程序中所开启的. 下面看一个最基础的并行编程的例子,也足以体现多核心并行运行的好处,当然微软.NET为我们封装后,我们也不必过多关注底层操作,那我们就看一下运行结果

Net并行编程高级教程--Parallel

Net并行编程高级教程--Parallel 一直觉得自己对并发了解不够深入,特别是看了<代码整洁之道>觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准.而且在<失控>这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物.人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情.所以好奇心驱使下学习并发.便有了此文. 一.理解硬件线程和软件线程 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够

【读书笔记】.Net并行编程高级教程--Parallel

一直觉得自己对并发了解不够深入,特别是看了<代码整洁之道>觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准.而且在<失控>这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物.人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情.所以好奇心驱使下学习并发.便有了此文. 一.理解硬件线程和软件线程 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够同时并行运行.硬件线程也称为逻辑内核,一个物

C# 并行编程 之 命令式任务并行 (.Net Framework 4.0)

此文为个人学习<C#并行编程高级教程>的笔记,总结并调试了一些文章中的代码示例. 在以后开发过程中可以加以运用. 最基本的使用,并行任务的创建 在 .Net Framework 4 中出现了Task 的概念.在以往的多线程程序中虽然使用的是thread,但大多数的时候我们还是会把业务处理划分为Task.这样更接近于人类的思考方式,毕竟Thread不能说明它和业务的关联.这里C#直接提供了task,也算是对开发者简便了一些. Task 的使用非常简单,定义工作函数,创建Task,Task开始运行