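Overview
PLINQ simplifies applying the same function to every member of a sequence or group; this process is called a reduction operation. Sum() is one example of a reduction. PLINQ also provides overloads of Aggregate through which you can define your own reduction function.
A reduction operates on each member. Once that is done, the per-member results may need to be combined into a single final result, which is the idea of aggregation.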
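As a minimal sketch of the two kinds of reduction mentioned above, the snippet below contrasts the built-in Sum() with a user-defined reduction passed to Aggregate. The class and method names (ReductionSketch, BuiltInSum, CustomMax) are made up for illustration and do not appear elsewhere in this post.

```csharp
using System;
using System.Linq;

static class ReductionSketch
{
    // Built-in reduction: adds every element of the range 1..count.
    public static int BuiltInSum(int count) =>
        ParallelEnumerable.Range(1, count).Sum();

    // User-defined reduction through Aggregate: keeps the larger of two values.
    // Math.Max is associative and commutative, so the order in which PLINQ
    // combines elements does not change the result.
    public static int CustomMax(int count) =>
        ParallelEnumerable.Range(1, count).Aggregate((a, b) => Math.Max(a, b));

    static void Main()
    {
        Console.WriteLine(BuiltInSum(100)); // 5050
        Console.WriteLine(CustomMax(100));  // 100
    }
}
```

The single-function overload is only safe in parallel when the function gives the same answer regardless of how the elements are grouped, which is why a maximum was chosen here.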
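Reduction operations
The example computes the average of the numbers from 1 to 50000000 that are divisible by 5, each divided by PI. It can be written with LINQ or with PLINQ.
Code example: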
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;
namespace Sample6_2_plinq_calculate
{
class Program
{
static int NUM_INTS = 50000000;
static IEnumerable<int> GenerateInputeData()
{
return Enumerable.Range(1, NUM_INTS);
}
static ParallelQuery<int> GenerateInputeData4Parallel()
{
return ParallelEnumerable.Range(1, NUM_INTS);
}
static void Main(string[] args)
{
var seqTarget = GenerateInputeData();
Console.WriteLine("============================================================");
Console.WriteLine("TEST NORMAL LINQ");
Console.WriteLine("============================================================");
var swatchpn = Stopwatch.StartNew();
var seqQuery = (from intNum in seqTarget
where ((intNum % 5) == 0)
select (intNum / Math.PI)).Average();
swatchpn.Stop();
Console.WriteLine("LINQ Result: " + seqQuery + " LINQ Use Time: {0}", swatchpn.Elapsed);
var palTarget = GenerateInputeData4Parallel();
Console.WriteLine("\n\n");
Console.WriteLine("============================================================");
Console.WriteLine("TEST PARALLEL LINQ");
Console.WriteLine("============================================================");
var swatchp = Stopwatch.StartNew();
var palQuery = (from intNum in palTarget.AsParallel()
where ((intNum % 5) == 0)
select (intNum / Math.PI)).Average();
swatchp.Stop();
Console.WriteLine("PLINQ Result: " + palQuery + " PLINQ Use Time: {0}", swatchp.Elapsed);
Console.ReadLine();
}
}
}
Test results:
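Aggregation operations
The code example computes the standard deviation, skewness, and kurtosis of an array to illustrate the use of aggregation.
A quick math refresher:
- Standard deviation: a measure of how dispersed a data distribution is, that is, how far the values deviate from the arithmetic mean. The smaller the standard deviation, the less the values deviate from the mean, and vice versa. Its magnitude can be judged relative to the mean, as the ratio of the standard deviation to the mean.
The formula image comes from Baidu Baike.
- Skewness: the skewness coefficient describes how far a distribution departs from symmetry. For a symmetric distribution it is 0. When it is greater than 0, the heavy tail is on the right and the distribution is right-skewed; when it is less than 0, the heavy tail is on the left and the distribution is left-skewed.
- Kurtosis: indicates whether a distribution is more peaked or flatter than the normal distribution. A positive value indicates a relatively peaked distribution, a negative value a relatively flat one. Put simply, kurtosis describes how steep or gentle the shape of the distribution is. Another way to see it: at the same standard deviation, a larger kurtosis coefficient means more extreme values, so the remaining values must cluster more tightly around the mode and the distribution is necessarily steeper.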
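For reference, the three quantities can be written out as follows. This is a sketch that assumes the usual sample estimators (the ones with n - 1 style denominators, as used by common spreadsheet functions); the symbols x_i, \bar{x}, n, s, G and K are notation chosen here, not taken from the original formula image.

```latex
\begin{aligned}
s &= \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_i-\bar{x}\right)^{2}} \\
G &= \frac{n}{(n-1)(n-2)}\sum_{i=1}^{n}\left(\frac{x_i-\bar{x}}{s}\right)^{3} \\
K &= \frac{n(n+1)}{(n-1)(n-2)(n-3)}\sum_{i=1}^{n}\left(\frac{x_i-\bar{x}}{s}\right)^{4}
     - \frac{3(n-1)^{2}}{(n-2)(n-3)}
\end{aligned}
```

In each case the sum is the part that is accumulated element by element, and the factors in front of it are applied once at the end.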
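For a description of the parameters of the Aggregate function, see
https://msdn.microsoft.com/en-us/zh-en/library/dd383667(v=vs.110).aspx
Brief notes on the parameters:
- seed: the initial value of the accumulator.
- update accumulator function: applied to each value in the array. Because PLINQ partitions the data source and processes the partitions in parallel, this step produces one intermediate result per partition.
- combine accumulator function: adds up the per-partition results to give the accumulated result for the whole array.
- result selector: transforms the accumulated result into the final result, which is the return value.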
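To make the four parameters concrete, here is a small sketch that computes a mean with a (sum, count) accumulator. It is an illustration only: the class name AggregateParametersSketch and the method Mean are hypothetical, and the value tuple syntax assumes C# 7.0 or later.

```csharp
using System;
using System.Linq;

static class AggregateParametersSketch
{
    public static double Mean(int[] data)
    {
        return data.AsParallel().Aggregate(
            // seed: empty running sum and element count
            (Sum: 0L, Count: 0),
            // update accumulator function: fold one element into a partition's accumulator
            (acc, x) => (acc.Sum + x, acc.Count + 1),
            // combine accumulator function: merge the accumulators of two partitions
            (left, right) => (left.Sum + right.Sum, left.Count + right.Count),
            // result selector: turn the final accumulator into the mean
            acc => (double)acc.Sum / acc.Count);
    }

    static void Main()
    {
        int[] inputInts = { 0, 3, 4, 8, 15, 22, 34, 57, 68, 32, 30 };
        // The elements sum to 273 and there are 11 of them.
        Console.WriteLine(Mean(inputInts));
    }
}
```

Because the accumulator here is a value type, every partition starts from its own copy of the seed, so no state is shared between partitions.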
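The point of the example is not the arithmetic but that Aggregate() can operate on every element of the data source, combine the results, and then apply a further calculation, all in one step, which saves writing those stages separately. It also partitions the data and runs the work in parallel.
The calculation code follows: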
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;
namespace Sample6_2_plink_aggregate
{
class Program
{
static void Main(string[] args)
{
int[] inputInts = {0,3,4,8,15,22,34,57,68,32,30};
var mean = inputInts.AsParallel().Average();
var standarddeviation = inputInts.AsParallel().Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => Math.Sqrt((finalSum / (inputInts.Count()-1)))
);
var skewness = inputInts.AsParallel().Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => (finalSum * inputInts.Count()) / ((inputInts.Count()-1)*(inputInts.Count()-2))
);
var kurtosis = inputInts.AsParallel().Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => ((finalSum * inputInts.Count() * (inputInts.Count() + 1)) /
((inputInts.Count() - 1) * (inputInts.Count() - 2) * (inputInts.Count() - 3))) -
(3 * Math.Pow((inputInts.Count() - 1), 2)) /
((inputInts.Count() - 2) * (inputInts.Count() - 3))
);
Console.WriteLine("============================================================");
Console.WriteLine("TEST Parallel LINQ Calculate Result");
Console.WriteLine("============================================================");
Console.WriteLine("Mean : {0}", mean);
Console.WriteLine("Standard Deviation : {0}", standarddeviation);
Console.WriteLine("Skewness : {0}", skewness);
Console.WriteLine("Kurtosis : {0}", kurtosis);
Console.ReadLine();
}
}
}
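Concurrent PLINQ tasks and task cancellation
PLINQ can also be used together with other forms of concurrent tasks, for example when computing the standard deviation, skewness, and kurtosis.
The actual execution order is mean => standard deviation => skewness => kurtosis
According to the formulas, however, skewness and kurtosis can be computed in parallel, because the standard deviation is their shared input.
mean => standard deviation => skewness
                           => kurtosis
They can be chained with ContinueWith, and if a timeout or cancellation is needed, the WithCancellation() method can be used.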
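The fan-out shape described above can be sketched in isolation as follows. This is a simplified stand-in, not the statistics code itself: the shared step is just a square root, the two continuations raise it to the third and fourth power, and the names FanOutSketch and Run are invented for the example.

```csharp
using System;
using System.Threading.Tasks;

static class FanOutSketch
{
    public static double[] Run(double input)
    {
        // Shared antecedent: stands in for the standard deviation step.
        Task<double> shared = Task.Run(() => Math.Sqrt(input));

        // Two continuations attached to the same antecedent. Neither depends
        // on the other, so the scheduler is free to run them concurrently.
        Task<double> cubed = shared.ContinueWith(
            t => Math.Pow(t.Result, 3),
            TaskContinuationOptions.OnlyOnRanToCompletion);
        Task<double> fourth = shared.ContinueWith(
            t => Math.Pow(t.Result, 4),
            TaskContinuationOptions.OnlyOnRanToCompletion);

        Task.WaitAll(cubed, fourth);
        return new[] { cubed.Result, fourth.Result };
    }

    static void Main()
    {
        double[] results = Run(4.0);
        Console.WriteLine("{0} {1}", results[0], results[1]); // 8 16
    }
}
```

With OnlyOnRanToCompletion, a continuation is canceled rather than run if the shared step fails or is canceled, so Task.WaitAll then throws an AggregateException.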
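Code example:
The code wraps the PLINQ operations in functions and then invokes them in parallel as Tasks. deferredCancelTask is a troublemaker task: if you uncomment it, it sends a cancel signal after 2 seconds, which cancels the running tasks, and the exception handler prints each task's status.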
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;
namespace Sample6_4_parallel_task_with_plinq
{
class Program
{
private static ParallelQuery<int> inputInts =
ParallelEnumerable.Range(1, 100000000);
private static double CalculateMean(System.Threading.CancellationToken ct)
{
return inputInts.AsParallel().WithCancellation(ct).Average();
}
private static double CalculateStandardDeviation(System.Threading.CancellationToken ct, double mean)
{
return inputInts.AsParallel().WithCancellation(ct).Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => Math.Sqrt((finalSum / (inputInts.Count() - 1)))
);
}
private static double CalculateSkewness(System.Threading.CancellationToken ct, double mean, double standarddeviation)
{
return inputInts.AsParallel().WithCancellation(ct).Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
// 1d and 2d force double arithmetic; with 100000000 elements the int product would overflow
(finalSum) => (finalSum * inputInts.Count()) / ((inputInts.Count() - 1d) * (inputInts.Count() - 2d))
);
}
private static double CalculateKurtosis(System.Threading.CancellationToken ct, double mean, double standarddeviation)
{
return inputInts.AsParallel().WithCancellation(ct).Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
// double literals keep the products out of int range, which would overflow for 100000000 elements
(finalSum) => ((finalSum * inputInts.Count() * (inputInts.Count() + 1d)) /
((inputInts.Count() - 1d) * (inputInts.Count() - 2d) * (inputInts.Count() - 3d))) -
(3 * Math.Pow((inputInts.Count() - 1d), 2)) /
((inputInts.Count() - 2d) * (inputInts.Count() - 3d))
);
}
static void Main(string[] args)
{
Console.WriteLine("============================================================");
Console.WriteLine("TEST Parallel TASK work with PLINQ");
Console.WriteLine("============================================================");
var cts = new System.Threading.CancellationTokenSource();
var ct = cts.Token;
var TaskMean = new Task<double>(()=> CalculateMean(ct), ct);
var TaskSTDev = TaskMean.ContinueWith<double>((t) => { return CalculateStandardDeviation(ct, t.Result); },
TaskContinuationOptions.OnlyOnRanToCompletion);
var TaskSkewness = TaskSTDev.ContinueWith<double>((t) => { return CalculateSkewness(ct, TaskMean.Result, t.Result); },
TaskContinuationOptions.OnlyOnRanToCompletion);
var TaskKurtosis = TaskSTDev.ContinueWith<double>((t) => { return CalculateKurtosis(ct, TaskMean.Result, t.Result); },
TaskContinuationOptions.OnlyOnRanToCompletion);
//var deferredCancelTask = Task.Factory.StartNew(() => { System.Threading.Thread.Sleep(2000); cts.Cancel();});
try
{
TaskMean.Start();
Task.WaitAll(TaskSkewness, TaskKurtosis);
Console.WriteLine("Mean : {0}", TaskMean.Result);
Console.WriteLine("Standard Deviation : {0}", TaskSTDev.Result);
Console.WriteLine("Skewness : {0}", TaskSkewness.Result);
Console.WriteLine("Kurtosis : {0}", TaskKurtosis.Result);
}
catch(AggregateException aex)
{
foreach (var ex in aex.InnerExceptions)
{
//Console.WriteLine(ex.ToString());
if (ex is TaskCanceledException)
{
Console.WriteLine("Mean Task: {0}", TaskMean.Status);
Console.WriteLine("Standard Deviation Task: {0}", TaskSTDev.Status);
Console.WriteLine("Skewness Task: {0}", TaskSkewness.Status);
Console.WriteLine("Kurtosis Task: {0}", TaskKurtosis.Status);
}
}
}
Console.ReadLine();
}
}
}
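The effect of WithCancellation() can also be observed without any tasks. The sketch below, with the hypothetical names CancellationSketch and QueryIsCanceled, runs the same small query twice, once with a live token and once with a token that has already been canceled. It assumes that PLINQ reports a canceled token by throwing OperationCanceledException, which is how the documentation describes it.

```csharp
using System;
using System.Linq;
using System.Threading;

static class CancellationSketch
{
    // Returns true if the query was stopped by the token, false if it finished.
    public static bool QueryIsCanceled(CancellationToken token)
    {
        try
        {
            ParallelEnumerable.Range(1, 1000000)
                .WithCancellation(token)
                .Average();
            return false;
        }
        catch (OperationCanceledException)
        {
            // Thrown by PLINQ when the token passed to WithCancellation is signaled.
            return true;
        }
    }

    static void Main()
    {
        var cts = new CancellationTokenSource();
        Console.WriteLine(QueryIsCanceled(cts.Token)); // token not signaled yet

        cts.Cancel();
        Console.WriteLine(QueryIsCanceled(cts.Token)); // token already signaled
    }
}
```

When the query runs inside a Task instead, as in the listing above, the same exception is what moves the task out of the running state, and the waiting code sees it wrapped in an AggregateException.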