C# 并行编程 之 PLINQ 规约操作和聚合函数

概要

PLINQ可以简化对一个序列或一个组中所有成员应用同一个函数的过程,这个过程称之为规约操作。类似Sum()函数就是一个规约操作。PLINQ提供一个可重载Aggregate的接口,这里用户可以定义自己的规约函数。

规约操作是对每一个成员进行的操作,当操作完成后有可能需要将操作结果进行汇总得到一个最终的结果,这个就是聚合的概念。

规约操作

示例中要求计算 1 到 50000000中能被5整除的数除以PI以后得到的平均数。它可以用LINQ完成,也可以用PLINQ完成。

代码示例:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;

namespace Sample6_2_plinq_calculate
{
    class Program
    {
        static int NUM_INTS = 50000000;

        static IEnumerable<int> GenerateInputeData()
        {
            return Enumerable.Range(1, NUM_INTS);
        }

        static ParallelQuery<int> GenerateInputeData4Parallel()
        {
            return ParallelEnumerable.Range(1, NUM_INTS);
        }

        static void Main(string[] args)
        {
            var seqTarget = GenerateInputeData();

            Console.WriteLine("============================================================");
            Console.WriteLine("TEST NORMAL LINQ");
            Console.WriteLine("============================================================");
            var swatchpn = Stopwatch.StartNew();

            var seqQuery = (from intNum in seqTarget
                            where ((intNum % 5) == 0)
                            select (intNum / Math.PI)).Average();
            swatchpn.Stop();

            Console.WriteLine("LINQ Result: " + seqQuery + "    LINQ Use Time: {0}", swatchpn.Elapsed);

            var palTarget = GenerateInputeData4Parallel();
            Console.WriteLine("\n\n");
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST PARALLEL LINQ");
            Console.WriteLine("============================================================");
            var swatchp = Stopwatch.StartNew();

            var palQuery = (from intNum in palTarget.AsParallel()
                            where ((intNum % 5) == 0)
                            select (intNum / Math.PI)).Average();
            swatchp.Stop();

            Console.WriteLine("PLINQ Result: " + palQuery + "    LINQ Use Time: {0}", swatchp.Elapsed);

            Console.ReadLine();

        }
    }
}

测试结果:

聚合操作

代码示例会计算一个数组的标准偏差,偏度,和峰度来说明聚合的使用。

顺便补补数学吧:

  • 标准偏差:一种量度数据分布的分散程度之标准,用以衡量数据值偏离算术平均值的程度。标准偏差越小,这些值偏离平均值就越少,反之亦然。标准偏差的大小可通过标准偏差与平均值的倍率关系来衡量。

    图片公式来自百度百科。


  • 偏度:偏度系数是描述分布偏离对称性程度的一个特征数。当分布左右对称时,偏度系数为0。当偏度系数大于0时,即重尾在右侧时,该分布为右偏。当偏度系数小于0时,即重尾在左侧时,该分布左偏。


  • 峰度:表示分布相对于正太分布而言是更加高耸还是更加平坦。正值表示相对高耸的分布,负值表示相对平坦的峰度。简单的说,峰度是描述分布形态的陡缓程度。也可以这样理解,在相同的标准差下,峰度系数越大,分布就有更多的极端值,那么其余值必然要更加集中在众数周围,其分布必然就更加陡峭。



关于Aggregate 函数的参数说明参考

https://msdn.microsoft.com/en-us/zh-en/library/dd383667(v=vs.110).aspx

关于参数的简单说明:

  • seed:是累加器初始化的值。
  • update accumulator function:对数组中每一个值进行运算,PLINQ中由于它是对数据源进行了分区然后并行运算的,这一步产生的结果其实是保存的每一个分区的计算结果。
  • combine accumulator function:将每一分区的计算结果进行累加,得到一个总的数组的累加结果。
  • result selector:对累加结果进行运算,得到最终的结果,也就是返回值。

示例的重点并不是各种数字运算,而是说明Aggregate() 可以对数据源每一个元素运算后将结果进行汇总再次运算,它可以在一个步骤中完成,省去了分别编写的麻烦。而且它对数据运算时是数据分区,任务并行的。

下面是计算的代码示例:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;

namespace Sample6_2_plink_aggregate
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] inputInts = {0,3,4,8,15,22,34,57,68,32,30};

            var mean = inputInts.AsParallel().Average();

            var standarddeviation = inputInts.AsParallel().Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => Math.Sqrt((finalSum / (inputInts.Count()-1)))
                );

            var skewness = inputInts.AsParallel().Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => (finalSum * inputInts.Count()) / ((inputInts.Count()-1)*(inputInts.Count()-2))
                );

            var kurtosis = inputInts.AsParallel().Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => ((finalSum * inputInts.Count() * (inputInts.Count() + 1)) /
                    ((inputInts.Count() - 1) * (inputInts.Count() - 2) * (inputInts.Count() - 3))) -
                    (3 * Math.Pow((inputInts.Count() - 2), 2)) /
                    ((inputInts.Count() - 2) * (inputInts.Count() - 3))
                );

            Console.WriteLine("============================================================");
            Console.WriteLine("TEST Parallel LINQ Calculate Result");
            Console.WriteLine("============================================================");
            Console.WriteLine("Mean : {0}", mean);
            Console.WriteLine("Standard Deviaton : {0}", standarddeviation);
            Console.WriteLine("Skewness : {0}", skewness);
            Console.WriteLine("Kurtosis : {0}", kurtosis);

            Console.ReadLine();
        }
    }
}

并发的PLINQ任务和任务的取消

PLINQ同样也可以和其他形式的并发任务一起使用。例如在计算 标准偏差,偏度和峰度的过程中。

实际的执行顺序是 平均值 => 标准偏差 => 偏度 => 峰度

但根据运算的公式,完全可以把偏度和峰度进行并行化处理的。标准差是他们公共的输入。

平均值 => 标准偏差 => 偏度

=> 峰度

它们完全可以使用ContinueWith操作,如果有超时控制或取消需要的话,可以使用WithCancellation() 接口。

代码示例:

代码中用函数将 PLINQ 的操作又进行了封装,然后用Task的方式进行并行化的调用。deferredCancelTask 是一个捣乱任务,如果把注释打开,在2秒时它会发出一个Cancel信号,取消任务的执行,并且在异常处理时打印任务的状态。

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;

namespace Sample6_4_parallel_task_with_plinq
{
    class Program
    {
        private static ParallelQuery<int> inputInts =
            ParallelEnumerable.Range(1, 100000000);

        private static double CalculateMean(System.Threading.CancellationToken ct)
        {
            return inputInts.AsParallel().WithCancellation(ct).Average();
        }

        private static double CalculateStandardDeviation(System.Threading.CancellationToken ct, double mean)
        {
            return inputInts.AsParallel().WithCancellation(ct).Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => Math.Sqrt((finalSum / (inputInts.Count() - 1)))
                );
        }

        private static double CalculateSkewness(System.Threading.CancellationToken ct, double mean, double standarddeviation)
        {
            return inputInts.AsParallel().WithCancellation(ct).Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => (finalSum * inputInts.Count()) / ((inputInts.Count() - 1) * (inputInts.Count() - 2))
                );
        }

        private static double CalculateKurtosis(System.Threading.CancellationToken ct, double mean, double standarddeviation)
        {
            return inputInts.AsParallel().WithCancellation(ct).Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => ((finalSum * inputInts.Count() * (inputInts.Count() + 1)) /
                    ((inputInts.Count() - 1) * (inputInts.Count() - 2) * (inputInts.Count() - 3))) -
                    (3 * Math.Pow((inputInts.Count() - 2), 2)) /
                    ((inputInts.Count() - 2) * (inputInts.Count() - 3))
                );
        }

        static void Main(string[] args)
        {
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST Parallel TASK work with PLINQ");
            Console.WriteLine("============================================================");

            var cts = new System.Threading.CancellationTokenSource();
            var ct = cts.Token;

            var TaskMean = new Task<double>(()=> CalculateMean(ct), ct);
            var TaskSTDev = TaskMean.ContinueWith<double>((t) => { return CalculateStandardDeviation(ct, t.Result); },
                                                          TaskContinuationOptions.OnlyOnRanToCompletion);
            var TaskSkewness = TaskSTDev.ContinueWith<double>((t) => { return CalculateSkewness(ct, TaskMean.Result, t.Result); },
                                                          TaskContinuationOptions.OnlyOnRanToCompletion);
            var TaskKurtosis = TaskSTDev.ContinueWith<double>((t) => { return CalculateKurtosis(ct, TaskMean.Result, t.Result); },
                                                          TaskContinuationOptions.OnlyOnRanToCompletion);

            //var deferredCancelTask = Task.Factory.StartNew(() => { System.Threading.Thread.Sleep(2000); cts.Cancel();});

            try
            {
                TaskMean.Start();

                Task.WaitAll(TaskSkewness, TaskKurtosis);
                Console.WriteLine("Mean : {0}", TaskMean.Result);
                Console.WriteLine("Standard Deviaton : {0}", TaskSTDev.Result);
                Console.WriteLine("Skewness : {0}", TaskSkewness.Result);
                Console.WriteLine("Kurtosis : {0}", TaskKurtosis.Result);

            }
            catch(AggregateException aex)
            {
                foreach (var ex in aex.InnerExceptions)
                {
                    //Console.WriteLine(ex.ToString());

                    if (ex is TaskCanceledException)
                    {
                        Console.WriteLine("Mean Task: {0}", TaskMean.Status);
                        Console.WriteLine("Standard Deviation Task: {0}", TaskSTDev.Status);
                        Console.WriteLine("Skewness Task: {0}", TaskSkewness.Status);
                        Console.WriteLine("Kurtosis Task: {0}", TaskKurtosis.Status);
                    }
                }
            }

            Console.ReadLine();
        }
    }
}
时间: 2024-10-13 13:36:32

C# 并行编程 之 PLINQ 规约操作和聚合函数的相关文章

C# 并行编程 之 PLINQ 基本使用

PLINQ Summary LINQ (LINQ) Language Integrated Query 可以方便的查询并处理不同数据源的数据.PLINQ Parallel LINQ不光拥有LINQ的功能,还添加了并行操作的接口,以方便使用并提高效率. 更详细的信息: https://msdn.microsoft.com/zh-cn/library/dd460688(v=vs.110).aspx 一个简单的例子 用一个简单的例子足以说明PLINQ的使用. 简单的说明: 第一部分:先在一个文档中查询

C#中的多线程 - 并行编程 z

原文:http://www.albahari.com/threading/part5.aspx 专题:C#中的多线程 1并行编程Permalink 在这一部分,我们讨论 Framework 4.0 加入的多线程 API,它们可以充分利用多核处理器. 并行 LINQ(Parallel LINQ)或称为 PLINQ Parallel类 任务并行(task parallelism)构造 SpinLock 和 SpinWait 这些 API 可以统称为 PFX(Parallel Framework,并行

C#并行编程-PLINQ:声明式数据并行

原文:C#并行编程-PLINQ:声明式数据并行 背景 通过LINQ可以方便的查询并处理不同的数据源,使用Parallel LINQ (PLINQ)来充分获得并行化所带来的优势. PLINQ不仅实现了完整的LINQ操作符,而且还添加了一些用于执行并行的操作符,与对应的LINQ相比,通过PLINQ可以获得明显的加速,但是具体的加速效果还要取决于具体的场景,不过在并行化的情况下一段会加速. 如果一个查询涉及到大量的计算和内存密集型操作,而且顺序并不重要,那么加速会非常明显,然而,如果顺序很重要,那么加

5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq

在上一篇博客5天玩转C#并行和多线程编程 —— 第一天 认识Parallel中,我们学习了Parallel的用法.并行编程,本质上是多线程的编程,那么当多个线程同时处理一个任务的时候,必然会出现资源访问问题,及所谓的线程安全.就像现实中,我们开发项目,就是一个并行的例子,把不同的模块分给不同的人,同时进行,才能在短的时间内做出大的项目.如果大家都只管自己写自己的代码,写完后发现合并不到一起,那么这种并行就没有了意义. 并行算法的出现,随之而产生的也就有了并行集合,及线程安全集合:微软向的也算周到

C#并行编程 z

目录 C#并行编程-相关概念 C#并行编程-Parallel C#并行编程-Task C#并行编程-并发集合 C#并行编程-线程同步原语 C#并行编程-PLINQ:声明式数据并行 背景 基于任务的程序设计.命令式数据并行和任务并行都要求能够支持并发更新的数组.列表和集合. 在.NET Framework 4 以前,为了让共享的数组.列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作. 如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制

理解并行编程

并行编程从业务实现的角度可分为数据并行与任务并行,也就是要解决的问题是以数据为核心还是以要处理的事情为核心.基于任务的并行编程模型TPL(任务并行库)是从业务角度实现的并行模型,它以System.Threading.Tasks命名空间下的Parallel类为实现核心类,优点是不需要我们考虑不同的硬件差异,只需要重点关注所实现的任务. 1.任务并行库TPL TPL主要包括数据并行和任务并行,无论是数据并行还是任务并行,都可以使用并行查询PLINQ提高数据查询的效率.数据并行是对数据集合中的元素同时

.NET并行编程1 -

设计模式--.net并行编程,清华大学出版的中译本. 相关资源地址主页面: http://parallelpatterns.codeplex.com/ 代码下载: http://parallelpatterns.codeplex.com/releases/view/50473 书籍在线地址: https://msdn.microsoft.com/en-us/library/ff963553.aspx 使用并行编程的一些示例: https://code.msdn.microsoft.com/Par

并行计算复习————第四篇 并行计算软件支撑:并行编程

并行计算复习 第四篇 并行计算软件支撑:并行编程 Ch13 并行程序设计基础 13.1并行语言构造方法 库例程:MPI.Pthreads 扩展串行语言:Fortran90 加编译注释构造:OpenMP 13.2并行性问题 可利用SPMD来伪造MPMD 需要运行MPMD:parbegin S1 S2 S3 parend 可以改造成SPMD: for i = 1 to 3 par-do if i == 1 then S1 else if i == 2 then S2 else if i == 3 t

并行编程(Parallel Framework)

前言 并行编程:通过编码方式利用多核或多处理器称为并行编程,多线程概念的一个子集. 并行处理:把正在执行的大量的任务分割成小块,分配给多个同时运行的线程.多线程的一种. 并行编程分为如下几个结构: 1.并行的LINQ或PLINQ 2.Parallel类 3.任务并行结构 4.并发集合 5.SpinLock和SpinWait 这些是.NET 4.0引入的功能,一般被称为PFX(Parallel Framework,并行框架). Parallel类和任务并行结构称为TPL(Task Parallel