并行编程之数据并行

本文来自:http://www.cnblogs.com/luminji/archive/2010/12/02/1894548.html

任务并行库 (TPL) 是 .NET Framework 4 版的 System.Threading 和 System.Threading.Tasks 命名空间中的一组公共类型和 API。System.Threadings.Tasks 命名空间提供可简化并发和异步代码编写工作的类型。主要类型为 Task(表示可以等待和取消的异步操作)和 Task<(Of <(TResult>)>)(可以返回值的任务)。Factory 类提供用于创建和启动任务的静态方法,TaskScheduler 类提供默认线程调度基础结构。

TPL 的目的在于简化向应用程序中添加并行性和并发性的过程,从而提高开发人员的工作效率。 TPL 会动态地按比例调节并发程度,以便最有效地使用所有可用的处理器。此外,TPL 还处理工作分区、ThreadPool 上的线程调度、取消支持、状态管理以及其他低级别的细节操作。通过使用 TPL,您可以在将精力集中于程序要完成的工作,同时最大程度地提高代码的性能。

从 .NET Framework 4 开始,TPL 是编写多线程代码和并行代码的首选方法。例如,如果某个循环在每次迭代时只执行少量工作,或它在很多次迭代时都不运行,那么并行化的开销可能导致代码运行更慢。

1:简单例子

数据并行是指对源集合或数组中的元素同时(即并行)执行相同操作的情况。数据并行是指对源集合或数组中的元素同时(即并行)执行相同操作的情况。System.Threading.Tasks..::.Parallel 类中 For 和 ForEach 方法的若干重载支持使用强制性语法的数据并行。在数据并行操作中,将对源集合进行分区,以便多个线程能够同时对不同的片段进行操作。TPL 支持通过 System.Threading.Tasks..::.Parallel 类实现的数据并行。此类提供 for 和 foreach 循环(Visual Basic 中为 For 和 For Each)基于方法的并行实现。为 ParallelFor()()() 或 ParallelForEach()()() 循环编写循环逻辑与编写顺序循环非常类似。您不必创建线程或队列工作项。在基本的循环中,您不必采用锁。TPL 将为您处理所有低级别工作。下面的代码示例演示一个简单的 foreach 循环及其并行等效项。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

public void Method1()

{

    List<string> sourceCollection = new List<string>() { "111", "222", "333" };

    // Sequential version           

    foreach (var item in sourceCollection)

    {

        Process(item);

    }

    // Parallel equivalent

    Parallel.ForEach(sourceCollection, item => Process(item));

}

private void Process(string item)

{

    item += " done";

}

当并行循环运行时,TPL 将对数据源进行分区,以便循环能够同时对多个部分进行操作。在后台,任务计划程序将根据系统资源和工作负荷来对任务进行分区。如有可能,计划程序会在工作负荷变得不平衡的情况下在多个线程和处理器之间重新分配工作。

ParallelFor()()() 和 ParallelForEach()()() 方法都有若干重载,利用这些重载可以停止或中断循环执行、监视其他线程上循环的状态、维护线程本地状态、完成线程本地对象、控制并发程度,等等。启用此功能的帮助器类型包括 ParallelLoopState、ParallelOptions 以及 ParallelLoopResult、CancellationToken 和 CancellationTokenSource。

2:编写具有线程本地变量的 Parallel.For 循环

此示例演示如何使用线程本地变量来存储和检索由 For 循环创建的每个单独任务中的状态。通过使用线程本地数据,您可以避免将大量的访问同步为共享状态的开销。在任务的所有迭代完成之前,您将计算和存储值,而不是写入每个迭代上的共享资源。然后,您可以将最终结果一次性写入共享资源,或将其传递到另一个方法。


1

2

3

4

5

6

7

8

9

10

11

//int[] nums = Enumerable.Range(0, 1000000).ToArray();

int[] nums = { 5, 6 };

long total = 0;

Parallel.For<long>(0, nums.Length, () => 1, (i, loopState, subtotal) =>

{

    subtotal += nums[i];

    return subtotal;

},

    (x) => Interlocked.Add(ref total, x)

);

MessageBox.Show(total.ToString());

我们来看Paralle.For方法中各个参数表示的意思:

(其中<long>是指返回值类型)


参数


意义


0


开始索引


nums.Length


结束索引(不含)


() => 1


用户返回每个线程的本地数据初始状态的函数委托。在这里是将total初始化为1


(i, loop, subtotal) =>

{

subtotal += nums[i];

return subtotal;

}


将为每次迭代调用一次的委托,其中

i:循环变量,自动加1

loopState:为ParallelLoopState

Subtotal:返回值,为long


(x) => Interlocked.Add(ref total, x)


对于每个线程的本地状态执行一个最终状态的委托。在这里是计算了total

3:编写具有线程本地变量的 Parallel.ForEach 循环

下面的代码演示如何编写使用线程本地变量的 ForEach 方法。当 ForEach 循环执行时,它会将其源集合划分为多个分区。每个分区都将获得自己的"线程本地"变量的副本。(术语"线程本地"在此处不太准确,因为在某些情况下两个分区可能在同一线程上运行。)

若要在 ForEach 循环中使用线程本地变量,您必须使用采用两个 type 参数的方法版本。第一个参数指定源元素的类型,第二个参数指定线程本地变量的类型。


1

2

3

4

5

6

7

8

9

10

11

//int[] nums = Enumerable.Range(0, 1000000).ToArray();

int[] nums = { 5, 6 };

long total = 0;

Parallel.ForEach<int, long>(nums, () => 1, (i, loopState, subtotal) =>

{

    subtotal += nums[i];

    return subtotal;

},

    (x) => Interlocked.Add(ref total, x)

);

MessageBox.Show(total.ToString());

我们来看Paralle.ForEach方法中各个参数表示的意思:

(其中<int, long>中int是指数据源类型,long是指返回值类型)


参数


意义


nums


数据源数组


() => 1


用户返回每个线程的本地数据初始状态的函数委托。在这里是将total初始化为1


(i, loopState, subtotal) =>

{

subtotal += nums[i];

return subtotal;

}


将为每次迭代调用一次的委托,其中

i:循环变量,自动加1

loopState:为ParallelLoopState

Subtotal:返回值,为long


(x) => Interlocked.Add(ref total, x)


对于每个线程的本地状态执行一个最终状态的委托。在这里是计算了total

4:取消 Parallel.For 或 ForEach Loop


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

int[] nums = Enumerable.Range(0, 10000000).ToArray();

CancellationTokenSource cts = new CancellationTokenSource();

ParallelOptions po = new ParallelOptions();

po.CancellationToken = cts.Token;

po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;

Task.Factory.StartNew(() =>

{

    if (Console.ReadKey().KeyChar == ‘c‘)

        cts.Cancel();

    Console.WriteLine("press any key to exit");

    Console.ReadKey();

});

try

{

    Parallel.ForEach(nums, po, (num) =>

    {

        double d = Math.Sqrt(num);

        Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);

        po.CancellationToken.ThrowIfCancellationRequested();

    });

}

catch (OperationCanceledException e)

{

    Console.WriteLine(e.Message);

}

5:停止或中断 Parallel.For 循环

"中断"表示完成当前线程上当前迭代之前的所有线程上的所有迭代,然后退出循环。""停止"表示在方便的情况下尽快停止所有迭代。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

namespace StopOrBreak

{

    using System;

    using System.Collections.Concurrent;

    using System.Linq;

    using System.Threading;

    using System.Threading.Tasks;

    class Test

    {

        static void Main()

        {

            StopLoop();

            BreakAtThreshold();

            Console.WriteLine("Press any key to exit.");

            Console.ReadKey();

        }

        private static void StopLoop()

        {

            Console.WriteLine("Stop loop...");

            double[] source = MakeDemoSource(1000, 1);

            ConcurrentStack<double> results = new ConcurrentStack<double>();

            // i is the iteration variable. loopState is a

            // compiler-generated ParallelLoopState

            Parallel.For(0, source.Length, (i, loopState) =>

            {

                // Take the first 100 values that are retrieved

                // from anywhere in the source.

                if (i < 100)

                {

                    // Accessing shared object on each iteration

                    // is not efficient. See remarks.

                    double d = Compute(source[i]);

                    results.Push(d);

                }

                else

                {

                    loopState.Stop();

                    return;

                }

            } // Close lambda expression.

            ); // Close Parallel.For

            Console.WriteLine("Results contains {0} elements", results.Count());

        }

        static void BreakAtThreshold()

        {

            double[] source = MakeDemoSource(10000, 1.0002);

            ConcurrentStack<double> results = new ConcurrentStack<double>();

            // Store all values below a specified threshold.

            Parallel.For(0, source.Length, (i, loopState) =>

            {

                double d = Compute(source[i]);

                results.Push(d);

                if (d > .2)

                {

                    // Might be called more than once!

                    loopState.Break();

                    Console.WriteLine("Break called at iteration {0}. d = {1} ", i, d);

                    Thread.Sleep(1000);

                }

            });

            Console.WriteLine("results contains {0} elements", results.Count());

        }

        static double Compute(double d)

        {

            //Make the processor work just a little bit.

            return Math.Sqrt(d);

        }

        // Create a contrived array of monotonically increasing

        // values for demonstration purposes.

        static double[] MakeDemoSource(int size, double valToFind)

        {

            double[] result = new double[size];

            double initialval = .01;

            for (int i = 0; i < size; i++)

            {

                initialval *= valToFind;

                result[i] = initialval;

            }

            return result;

        }

    }

}

ParallelFor()()() 或 [Overload:System.Threading.Tasks.Parallel.Parallel.ForEach`1] 循环中,不能使用与顺序循环中相同的 break 或 Exit 语句,这是因为这些语言构造对于循环是有效的,而并行"循环"实际上是方法,不是循环。相反,可以使用 Stop 或 Break 方法。Parallel..::.For 的一些重载接受 Action<int, ParallelLoopState>(在 Visual Basic 中为 Action(Of Integer, ParallelLoopState))作为输入参数。ParallelLoopState 对象由运行时在后台创建,您可以在lambda 表达式中为它指定您喜欢的任何名称。

在下面的示例中,该方法只需要源序列中的 100 个值,检索出哪些元素并不重要。在此案例中,使用 Stop 方法,因为它将告知循环的所有迭代(包括那些在其他线程上的当前迭代之前开始的迭代)在方便的情况下尽快停止。

在第二个方法中,将检索所有元素,直到源序列中指定的索引。在此案例中,调用 Break,这是因为当到达一个线程上的索引时,源中前面的元素有可能尚未处理。中断会导致其他线程放弃对后续片段的工作(如果它们正忙于任何这样的工作),并在退出循环之前处理完所有以前的元素。

在调用 Stop 或 Break 后,循环中的其他线程可能会继续运行一段时间(这不受应用程序开发人员的控制),理解这一点很重要。可以使用 ParallelLoopState..::.IsStopped属性检查是否已在另一个线程上停止该循环。在下面的示例中,如果 IsStopped 为 true,则不会再有数据写入到集合中。

6:并行中的异常处理


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

private void temp2_Click(object sender, RoutedEventArgs e)

{

    byte[] data = new byte[5000];

    Random r = new Random();

    r.NextBytes(data);

    try

    {

        ProcessDataInParallel(data);

    }

    catch (AggregateException ae)

    {

        foreach (var ex in ae.InnerExceptions)

        {

            if (ex is ArgumentException)

                Console.WriteLine(ex.Message);

            else

                throw ex;

        }

    }

}

private static void ProcessDataInParallel(byte[] data)

{

    var exceptions = new ConcurrentQueue<Exception>();

    Parallel.ForEach(data, d =>

    {

        try

        {

            if (d < 0x3)

                throw new ArgumentException(String.Format("value is {0:x}. Elements must be greater than 0x3.", d));

            else

                Console.Write(d + " ");

        }

        catch (Exception e) { exceptions.Enqueue(e); }

    });

    if (exceptions.Count > 0) throw new AggregateException(exceptions);

}

时间: 2024-07-31 10:22:56

并行编程之数据并行的相关文章

C#并行编程--命令式数据并行(Parallel.Invoke)---与匿名函数一起理解(转载整理)

命令式数据并行   Visual C# 2010和.NETFramework4.0提供了很多令人激动的新特性,这些特性是为应对多核处理器和多处理器的复杂性设计的.然而,因为他们包括了完整的新的特性,开发人员和架构师必须学习一种新的编程模型. 这一章是一些新的类.结构体和枚举类型,你可以使用这里来处理数据并行的场景.这章将为你展示怎样创建并行代码和描述与每个场景相关的新概念,而不是关注并发编程中的最复杂的问题.这样你将可以更加充分的理解性能改进. 开始并行任务  使用先前版本的.NET Frame

C#并行编程--命令式数据并行(Parallel.Invoke)

命令式数据并行   Visual C# 2010和.NETFramework4.0提供了很多令人激动的新特性,这些特性是为应对多核处理器和多处理器的复杂性设计的.然而,因为他们包括了完整的新的特性,开发人员和架构师必须学习一种新的编程模型. 这一章是一些新的类.结构体和枚举类型,你可以使用这里来处理数据并行的场景.这章将为你展示怎样创建并行代码和描述与每个场景相关的新概念,而不是关注并发编程中的最复杂的问题.这样你将可以更加充分的理解性能改进. 开始并行任务  使用先前版本的.NET Frame

五 浅谈CPU 并行编程和 GPU 并行编程的区别

前言 CPU 的并行编程技术,也是高性能计算中的热点,也是今后要努力学习的方向.那么它和 GPU 并行编程有何区别呢? 本文将做出详细的对比,分析各自的特点,为将来深入学习 CPU 并行编程技术打下铺垫. 区别一:缓存管理方式的不同 GPU:缓存对程序员不透明,程序员可根据实际情况操纵大部分缓存 (也有一部分缓存是由硬件自行管理). CPU:缓存对程序员透明.应用程序员无法通过编程手段操纵缓存. 区别二:指令模型的不同 GPU:采用 SIMT - 单指令多线程模型,一条指令配备一组硬件,对应32

如何进行并行编程:从并行矩阵运算开始

并行计算,就是多个进程并行协作,完成特定的任务.现在我们假定一个并行系统,包含了p个处理机,每个处理机一个进程,我们可以分别用字符“0”,“1”,...,“p-1”来引用它们,或者为了清晰,我们用 Pi 来引用它们,i 表示一个进程的进程号,进程之间可以相互传递消息,所谓消息,指的是一个数据结构. 在并行编程中,我们用程序代码定义好一个过程,每个进程都将运行这段程序代码定义的过程,也就是说,代码必须是通用的.接下来我们用并行矩阵计算的实例来说明. 矩阵计算 矩阵计算问题有很多种类型,例如: 求解

C#并行编程

一.基于任务的程序设计 共享内存多核OS和分布式内存OS 共享内存多核OS-一个微处理器由多个内核组成,且每个内核共享一段私有内存: 分布式内存OS-- 由多个微处理器组成,每个微处理器可以有自己的私有内存,微处理器可以位于不同的计算机上,每个计算机可以有不同的通信信道 消息传递接口(MPI):运行在分布式内存计算机系统上的并行应用程序所使用的最流行的通信协议: 并行程序设计和多核程序设计 并行程序是指同一时刻运行多条指令,编写的代码能够充分利用底层硬件提供的并行执行能力:多核程序设计能够充分利

C#并行编程 z

目录 C#并行编程-相关概念 C#并行编程-Parallel C#并行编程-Task C#并行编程-并发集合 C#并行编程-线程同步原语 C#并行编程-PLINQ:声明式数据并行 背景 基于任务的程序设计.命令式数据并行和任务并行都要求能够支持并发更新的数组.列表和集合. 在.NET Framework 4 以前,为了让共享的数组.列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作. 如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制

Net并行编程高级教程--Parallel

Net并行编程高级教程--Parallel 一直觉得自己对并发了解不够深入,特别是看了<代码整洁之道>觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准.而且在<失控>这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物.人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情.所以好奇心驱使下学习并发.便有了此文. 一.理解硬件线程和软件线程 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够

理解并行编程

并行编程从业务实现的角度可分为数据并行与任务并行,也就是要解决的问题是以数据为核心还是以要处理的事情为核心.基于任务的并行编程模型TPL(任务并行库)是从业务角度实现的并行模型,它以System.Threading.Tasks命名空间下的Parallel类为实现核心类,优点是不需要我们考虑不同的硬件差异,只需要重点关注所实现的任务. 1.任务并行库TPL TPL主要包括数据并行和任务并行,无论是数据并行还是任务并行,都可以使用并行查询PLINQ提高数据查询的效率.数据并行是对数据集合中的元素同时

【读书笔记】.Net并行编程高级教程--Parallel

一直觉得自己对并发了解不够深入,特别是看了<代码整洁之道>觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准.而且在<失控>这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物.人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情.所以好奇心驱使下学习并发.便有了此文. 一.理解硬件线程和软件线程 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够同时并行运行.硬件线程也称为逻辑内核,一个物