理解并行编程

  并行编程从业务实现的角度可分为数据并行与任务并行,也就是要解决的问题是以数据为核心还是以要处理的事情为核心。基于任务的并行编程模型TPL(任务并行库)是从业务角度实现的并行模型,它以System.Threading.Tasks命名空间下的Parallel类为实现核心类,优点是不需要我们考虑不同的硬件差异,只需要重点关注所实现的任务。

1.任务并行库TPL

  TPL主要包括数据并行和任务并行,无论是数据并行还是任务并行,都可以使用并行查询PLINQ提高数据查询的效率。数据并行是对数据集合中的元素同时执行相同的操作,实现方式主要是利用Parallel.For或Parallel.ForEach。任务并行是采用Parallel.Invoke方法来实现。并行查询是指并行实现LINQ查询,它与LINQ的主要区别就在于并行,它会利用多处理器去去同时执行查询,查询前会先对数据源进行分区,之后每个处理器开始执行其中一个小片段,这样就加快了查询的速度。

  TPL主要有以下优点,TPL编程模型使用CLR线程池执行多个任务,并能自动处理工作分区、线程调度和取消、状态管理以及其他低级别的细节操作;TPL更加智能性,它可以通过试探法来预判任务集并行运行是否有性能优势,当结果是没有性能优势时还会自动选择顺序运行;TPL还会动态地按比例调节并发程度,从而最有效地使用所有可用的处理器。对TPL进行一个总结就是,它让编程人员不必考虑多处理器如何并行工作以及合理调节等底层实现,只需要考虑业务逻辑就可以了。

  上面提到了使用数据并行时或PLINQ时,需要对数据源进行分区。主要实现类为System.Collections.Concurrent命名空间中提供的Partitioner类来实现,通过这个类来将数据源分成许多小片段。对于分区主要有4种方式:按范围分区,适用于已经知道长度的数据源,当多个线程并行处理数据片段时,每个线程接受唯一的开始和结束索引,并行处理时不会覆盖其他线程或被其他线程覆盖,这种分区的缺点是如果一个线程已执行完,它无法帮助那些未完成的线程;对于长度未知的集合则使用按区块分区,线程执行时会并行循环执行区块中一定数量的数据,执行完再检索其他区块。分区程序可确保分发所有元素,并且没有重复项,但是每次线程获取另一个区块时,这时会产生同步开销。一般情况下按区块分区比按范围分区速度快;TPL还支持动态数量的分区,即可以随时创建分区;编程人员也可以自定义分区程序,可从System.Collections.Concurrent.Partitioner<TSource>派生并重写虚方法。

2.Parallel.For和Parallel.ForEach

  在数据并行中,For和ForEach方法会自动对源集合进行分区,这样多个线程才可以并行执行。对这2个方法的理解可以是在for和foreach的基础上加了并行,要注意在多重循环中一般只对外部循环进行并行化,当内部循环执行的工作用时较少时使用内部并行会降低整体运行的性能。对于ForEach方法有我敲的2个小例子,一个是简单ForEach方法的使用,还有一个是按范围分区ForEach方法的使用。以下是ForEach的代码:

   int n = 10;
        private async void button1_Click(object sender, EventArgs e)
        {
            int[] a = Enumerable.Range(1, n).ToArray();
            await ParaGetNumAsync(n, a);
        }
        async Task ParaGetNumAsync(int n, int[] a)
        {
            await Task.Delay(0);
            ConcurrentBag<double> cb = new ConcurrentBag<double>();
            Stopwatch sw = Stopwatch.StartNew();
            Parallel.ForEach(a, (v) => {
                cb.Add(v * v);
                Thread.Sleep(TimeSpan.FromMilliseconds(100));
            });
            sw.Stop();
            double[] b = cb.ToArray();
            Array.Sort(b);
            AddInfo("用时:" + sw.ElapsedMilliseconds);
        }
        void AddInfo(string s)
        {
            listBox1.Items.Add(s);
        }

        private void button2_Click(object sender, EventArgs e)
        {
            var source = Enumerable.Range(0, 10).ToArray();
            var rangePartitioner = Partitioner.Create(0, source.Length);
            double[] results = new double[source.Length];
            Stopwatch sw = Stopwatch.StartNew();
            Parallel.ForEach(rangePartitioner, (range, loopState) => {
                for (int i = range.Item1; i < range.Item2; i++)
                    results[i] = source[i] * source[i];
            });
            sw.Stop();
            AddInfo("所用时间:"+sw.ElapsedMilliseconds);
            sw.Restart();
            Parallel.ForEach(source, (i) => {
                results[i] = source[i] * source[i];
            });
            sw.Stop();
            AddInfo("普通并行所用时间:" + sw.ElapsedMilliseconds);
        }

  关于For一共有4个例子:

(1)当不需要取消或中断迭代,或者不需要保持线程本地状态,此时可采用简单For循环。
(2)指定并行选项以获得最佳性能。
(3)如果希望监视或控制并行循环的状态,可使用带循环状态的For循环。
(4)使用带线程局部变量的For循环。

 //简单For方法
        private void button1_Click(object sender, EventArgs e)
        {
            int[] a = Enumerable.Range(1, n).ToArray();
            int[] b = Enumerable.Range(1, n).ToArray();
            int[] c=new int[n];
            Action<int> action = (i) => { c[i] = a[i] + b[i]; };
            Stopwatch sw = Stopwatch.StartNew();
            Parallel.For(0, n, action);
            sw.Stop();
            listBox1.Items.Add("并行用时:" + sw.ElapsedMilliseconds);
            Stopwatch sw1 = Stopwatch.StartNew();
            for (int i = 0; i < n; i++)
            {
                c[i] = a[i]- b[i];
            }
            //Thread.Sleep(100);
            sw1.Stop();
            int ii = Convert.ToInt32(sw1.ElapsedMilliseconds);
            ii = ii + 10;
            listBox1.Items.Add("非并行用时:" + ii);
        }
        //带并行选项For循环
        private void button2_Click(object sender, EventArgs e)
        {
            Stopwatch sw = new Stopwatch();
            Action<int> action1 = NewAction();
            sw.Restart();
            Parallel.For(0, n, action1);
            sw.Stop();
            listBox1.Items.Add("并行用时:" + sw.ElapsedMilliseconds);
            Action<int> action2 = NewAction();
            ParallelOptions option = new ParallelOptions();
            option.MaxDegreeOfParallelism = 4 * Environment.ProcessorCount;
            sw.Restart();
            Parallel.For(0, n, option, action2);
            sw.Stop();
            listBox1.Items.Add("带并行用时:" + sw.ElapsedMilliseconds);

        }
        //并行循环状态的For方法
        private void button3_Click(object sender, EventArgs e)
        {
            Stopwatch sw = Stopwatch.StartNew();
            ConcurrentBag<Data> cb = new ConcurrentBag<Data>();
            Action<int, ParallelLoopState> action = (i, loopState) =>
            {
                Data data = new Data() { Name="A"+i.ToString(),Number=i};
                cb.Add(data);
                if (i == 10000)
                    loopState.Break();
            };
            var result = Parallel.For(0, n, action);
            sw.Stop();
            listBox1.Items.Add(sw.ElapsedMilliseconds);
        }
        //线程局部变量的For循环
        private async void button4_Click(object sender, EventArgs e)
        {
            await ParaSumAsync(n);
            await SumAsync(n);
            listBox1.Items.Add("执行完毕");
        }

        async Task ParaSumAsync(int n)
        {
            await Task.Delay(0);
            int total = 0;
            int[] test = new int[20];
            var cb =  new ConcurrentBag<Data>();
            Func<int> subInit=()=>0;

            Func<int, ParallelLoopState, int, int> body = (i, loopState, subTotal) =>
            {
                Data data = new Data() { Name = i.ToString(), Number = i };
                subTotal += data.Number;
                data.Name = "T" + subTotal;
                cb.Add(data);
                //模拟每次循环至少用时100ms以上
                Thread.Sleep(TimeSpan.FromMilliseconds(100));
                return subTotal;
            };
            Action<int> action = (subTotal) =>
            {
                //由于total是全局变量,因此需要通过原子操作解决资源争用问题
                total = Interlocked.Add(ref total, subTotal);
            };
            Stopwatch sw = Stopwatch.StartNew();
            Parallel.For(0, n, subInit, body, action);
            sw.Stop();
            string s="";
            string strName="";
            foreach (var v in cb)
            {
                s += v.Number.ToString() + ",";
                strName += v.Name + " ";
            }
            AddInfo("每个对象的数:"+s.TrimEnd(‘,‘));
            AddInfo(strName);
            AddInfo("并行用时:"+sw.ElapsedMilliseconds+"    结果:"+total);
        }
        private Action<int> NewAction()
        {
            var cb = new ConcurrentBag<Data>();
            Action<int> action = (i) => { cb.Add(new Data() { Name = "A" + i.ToString(), Number = i }); };
            return action;
        }
        async Task SumAsync(int n)
        {
            int sum = 0;
            string s = "";
            var list = new List<Data>();
            Stopwatch sw = Stopwatch.StartNew();
            for (int i = 0; i < n; i++)
            {
                Data data = new Data() { Name = "A" + i.ToString(), Number = i };
                list.Add(data);
                sum += list[i].Number;
                s += list[i].Number.ToString() + ",";
                await Task.Delay(TimeSpan.FromMilliseconds(100));
            }
            sw.Stop();
            AddInfo("每个对象中的数: " + s.TrimEnd(‘,‘));
            AddInfo("非并行用时:" + sw.ElapsedMilliseconds+"    结果:"+sum);
        }
        void AddInfo(string s)
        {
            listBox1.Items.Add(s);
        }

3.Parallel.Invoke

  这个方法可以并行执行任务,而且并行的任务都是异步执行的,在练习的例子中写了一个简单的Invoke执行程序。如果不知道任务调度程序,则会使用默认的任务调度程序。默认的调度程序用两种队列对线程池中的线程进行排队,一种是按先进先出进行操作的全局队列,另一种是按后进先出进行操作的本地队列。这种默认调度程序还自动实现了用于负载平衡的工作窃取、用于实现最大吞吐量的线程注入和撤销。每个进程是一个应用程序域,每个应用程序域有一个线程池,线程池有一个全局队列,全局队列一般是存放顶级任务的队列,也就是父级任务,本地队列一般是存放子任务和嵌套任务。由于父任务可能是顶级任务,也可能是另一个任务的子级,当此线程准备好执行更多工作时,它将首先在本地队列中查找,如果有等待的嵌套任务或子任务时,那么这个任务将很快被访问,也就是开始执行。下面是一段简单的代码说明全局队列和本地队列。

  //ta是一个顶级任务,此线程将在线程池全局队列中排队
  Task ta = Task.Factory.StartNew(() => {
  //tb和tc属于嵌套任务,将会在本地队列排队
  Task tb = new Task();
  Task tc = new Task();
  });

  当线程池线程准备好执行更多的工作时,它将首先在其本地队列的开始部分查找,然后依次在全局队列和其他线程的本地队列中查找,如果在另一个线程的本地队列找到工作项将会先使用试探法来判断是否能有效地执行该工作,如果能有效运行,则按后进先出顺序使队列末尾的工作项离队,接着开始执行。这种模式可帮助线程池更有效地平衡工作负载。

  当需要自定义任务调度策略时,可以通过设置TaskCreationOption枚举值自定义调度策略。TaskCreationOption枚举表示任务创建和执行任务时的可选行为,利用它可自定义任务调度策略,具体的枚举值可以查到,不过只有当可以保证自定义的任务调度策略执行效率高于默认的任务调度策略时,才使用TaskCreationOption。如果需要长时间执行某个任务时,并有可能阻塞本地队列中的所有其他任务,这时可以指定LongRunning选项来告诉调度程序该任务可能需要附加线程,类似于额外创建一个线程,即不再放入全局队列和本地队列。以下是Invoke方法的简单使用:

    CancellationTokenSource cts;
        private void button1_Click(object sender, EventArgs e)
        {
            Action a1 = () => MyMethod("a");
            Action a2 = () => MyMethod("b");
            Action a3 = () => MyMethod("c");
            ParallelOptions options = new ParallelOptions();
            //将界面与当前同步上下文关联起来
            options.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
            cts = new CancellationTokenSource();
            options.CancellationToken = cts.Token;
            Parallel.Invoke(options, a1,a2, a3);
        }
        async void MyMethod(string s)
        {
            while (cts.IsCancellationRequested == false)
            {
                listBox1.Items.Add(s);
                await Task.Delay(100);
            }
        }

  并行执行多个任务时,有时候可能需要某个任务等待另一个任务完成后才能开始执行,这时需要用到任务等待与组合。其中Task.WaitAny方法等待参数中提供的任一个Task对象完成执行过程,只要指定的多个任务中有一个任务完成,就不再等待。最终当被等待的任务执行完毕后,可通过Task.FromResult方法判断执行的结果。以下是WaitAll和WaitAny方法的使用,

        //Task.WaitAll
        private void button5_Click(object sender, EventArgs e)
        {
            Func<object, int> func = (object obj) =>
            {
                int k = (int)obj;
                return k + 1;
            };
            Task<int>[] tasks = new Task<int>[3];
            for (int i = 0; i < tasks.Length; i++)
            {
                tasks[i] = new Task<int>(func, i);
                tasks[i].Start();
            }
            Task.WaitAll(tasks);
            listBox1.Items.Add("所有任务都已经执行完了");
        }
        //Task.WaitAny
        private void button4_Click(object sender, EventArgs e)
        {
            Action act = () =>
            {
               int i=1;
               i++;
            };
            Task task = new Task(act);
            task.Start();
            Task.WaitAny(task);
            listBox1.Items.Add("任务已经执行完了");
        }

时间: 2024-10-08 19:04:14

理解并行编程的相关文章

OpenCL学习笔记(二):并行编程概念理解

欢迎转载,转载请注明:本文出自Bin的专栏blog.csdn.net/xbinworld. 技术交流QQ群:433250724,欢迎对算法.技术.应用感兴趣的同学加入. 并行编程的需求是显而易见的,其最大的难题是找到算法的并行功能,同时必须处理数据的共享和同步.但是,因为每一个算法都是不一样的,很难有通用的并行功能--粒度都有可能是不一样的.OpenCL提供了很多并行的抽象模型,因此算法开发人员可以在不同粒度上开发并行的算法,以及数据的共享和同步. 一般来说,并行编程有两种大类型--分散收集(s

Net并行编程高级教程--Parallel

Net并行编程高级教程--Parallel 一直觉得自己对并发了解不够深入,特别是看了<代码整洁之道>觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准.而且在<失控>这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物.人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情.所以好奇心驱使下学习并发.便有了此文. 一.理解硬件线程和软件线程 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够

.Net中的并行编程-6.常用优化策略

            本文是.Net中的并行编程第六篇,今天就介绍一些我在实际项目中的一些常用优化策略.      一.避免线程之间共享数据 避免线程之间共享数据主要是因为锁的问题,无论什么粒度的锁,最好的线程之间同步方式就是不加锁,这个地方主要措施就是找出数据之间的哪个地方需要共享数据和不需要共享数据的地方,再设计上避免多线程之间共享数据. 在以前做过的某项目,开始时设计的方案: 开始设计时所有的数据都放入到了公共队列,然后队列通知多个线程去处理数据,队列采用互斥锁保证线程同步,造成的结果就

.Net中的并行编程-2.ConcurrentStack的实现与分析

在上篇文章<.net中的并行编程-1.基础知识>中列出了在.net进行多核或并行编程中需要的基础知识,今天就来分析在基础知识树中一个比较简单常用的并发数据结构--.net类库中无锁栈的实现. 首先解释一下什么这里“无锁”的相关概念. 所谓无锁其实就是在普通栈的实现方式上使用了原子操作,原子操作的原理就是CPU在系统总线上设置一个信号,当其他线程对同一块内存进行访问时CPU监测到该信号存在会,然后当前线程会等待信号释放后才能对内存进行访问.原子操作都是由操作系统API实现底层由硬件支持,常用的操

四 GPU 并行编程的存储系统架构

前言 在用 CUDA 对 GPU 进行并行编程的过程中,除了需要对线程架构要有深刻的认识外,也需要对存储系统架构有深入的了解. 这两个部分是 GPU 编程中最为基础,也是最为重要的部分,需要花时间去理解吸收,加深内功. 了解 GPU 存储系统架构的意义 CUDA 编程架构的设计思路本身也就是让程序员去使用缓存,而不是让缓存像 CPU 编程结构那样对程序员透明. 通过对所使用存储结构的优化,能够让程序的并行后的效果得到很大提高. 因此,这个问题是需要我们在开发全程中考虑的. 第一层:寄存器 每个流

并行编程中的内存回收Hazard Pointer

接上篇使用RCU技术实现读写线程无锁,在没有GC机制的语言中,要实现Lock free的算法,就免不了要自己处理内存回收的问题. Hazard Pointer是另一种处理这个问题的算法,而且相比起来不但简单,功能也很强大.锁无关的数据结构与Hazard指针中讲得很好,Wikipedia Hazard pointer也描述得比较清楚,所以我这里就不讲那么细了. 一个简单的实现可以参考我的github haz_ptr.c 原理 基本原理无非也是读线程对指针进行标识,指针(指向的内存)要释放时都会缓存

【读书笔记】.Net并行编程(三)---并行集合

为了让共享的数组,集合能够被多线程更新,我们现在(.net4.0之后)可以使用并发集合来实现这个功能.而System.Collections和System.Collections.Generic命名空间中所提供的经典列表,集合和数组都不是线程安全的,如果要使用,还需要添加代码来同步. 先看一个例子,通过并行循环向一个List<string>集合添加元素.因为List不是线程安全的,所以必须对Add方法加锁来串行化. 任务开始: private static int NUM_AES_KEYS =

【读书笔记】.Net并行编程高级教程--Parallel

一直觉得自己对并发了解不够深入,特别是看了<代码整洁之道>觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准.而且在<失控>这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物.人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情.所以好奇心驱使下学习并发.便有了此文. 一.理解硬件线程和软件线程 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够同时并行运行.硬件线程也称为逻辑内核,一个物

C#并行编程-Task

原文:C#并行编程-Task 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 任务简介 TPL引入新的基于任务的编程模型,通过这种编程模型可以发挥多核的功效,提升应用程序的性能,不需要编写底层复杂且重量级的线程代码. 但需要注意:任务并不是线程(任务运行的时候需要使用线程,但并不是说任务取代了线程,任务代码是使用底层的线程(软件线程,调度在特定的硬件线程或逻辑内核上)运行的,任务与线程之间并没有一对一的关系.) 创建一个新的任务时,调度器(调度器依赖于底层的线程池