在.net 4.0中,微软给我们提供了一个新的命名空间:System.Threading.Tasks。
一: Parallel的使用
在Parallel下面有三个常用的方法invoke,for和forEach。
1: Parallel.Invoke
这是最简单,最简洁的将串行的代码并行化
class Program { static void Main(string[] args) { var watch = Stopwatch.StartNew(); watch.Start(); Run1(); Run2(); Console.WriteLine("我是串行开发,总共耗时:{0}\n", watch.ElapsedMilliseconds); watch.Restart(); Parallel.Invoke(Run1, Run2); watch.Stop(); Console.WriteLine("我是并行开发,总共耗时:{0}", watch.ElapsedMilliseconds); Console.Read(); } static void Run1() { Console.WriteLine("我是任务一,我跑了3s"); Thread.Sleep(3000); } static void Run2() { Console.WriteLine("我是任务二,我跑了5s"); Thread.Sleep(5000); } }
在这个例子中可以获取点信息:
尽可能的避免子任务之间的依赖性,因为子任务是并行执行,所以就没有谁一定在前,谁一定在后的规定了。
2:Parallel.for
我们知道串行代码中也有一个for,但是那个for并没有用到多核,而Paraller.for它会在底层根据硬件线程的运行状况来充分的使用所有的可
利用的硬件线程,注意这里的Parallel.for的步行是1。这里我们来演示一下,向一个线程安全的集合插入数据,当然这个集合采用原子性来实现线程同步,比那些重量级的锁机制更加的节省消耗。
class Program { static void Main(string[] args) { for (int j = 1; j < 4; j++) { Console.WriteLine("\n第{0}次比较", j); ConcurrentBag<int> bag = new ConcurrentBag<int>(); var watch = Stopwatch.StartNew(); watch.Start(); for (int i = 0; i < 20000000; i++) { bag.Add(i); } Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); bag = new ConcurrentBag<int>(); watch = Stopwatch.StartNew(); watch.Start(); Parallel.For(0, 20000000, i => { bag.Add(i); }); Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); } } }
3:Parallel.forEach
forEach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用Partitioner.Create实现。
class Program { static void Main(string[] args) { for (int j = 1; j < 4; j++) { Console.WriteLine("\n第{0}次比较", j); ConcurrentBag<int> bag = new ConcurrentBag<int>(); var watch = Stopwatch.StartNew(); watch.Start(); for (int i = 0; i < 3000000; i++) { bag.Add(i); } Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); bag = new ConcurrentBag<int>(); watch = Stopwatch.StartNew(); watch.Start(); Parallel.ForEach(Partitioner.Create(0, 3000000), i => { for (int m = i.Item1; m < i.Item2; m++) { bag.Add(m); } }); Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); } } }
这里还是要说一下:Partitioner.Create(0, 3000000)。
第一:我们要分区的范围是0-3000000。
第二:我们肯定想知道系统给我们分了几个区? 很遗憾,这是系统内部协调的,无权告诉我们,当然系统也不反对我们自己指定分区个数,
这里可以使用Partitioner.Create的第六个重载,比如这样:Partitioner.Create(0, 3000000, Environment.ProcessorCount),
下面分享下并行计算中我们可能有的疑惑?
<1> 如何中途退出并行循环?
是的,在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。
Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出
下面举个例子,当迭代到1000的时候退出循环
class Program { static void Main(string[] args) { var watch = Stopwatch.StartNew(); watch.Start(); ConcurrentBag<int> bag = new ConcurrentBag<int>(); Parallel.For(0, 20000000, (i, state) => { if (bag.Count == 1000) { state.Break(); return; } bag.Add(i); }); Console.WriteLine("当前集合有{0}个元素。", bag.Count); } }
<2> 并行计算中抛出异常怎么处理?(新手在编写并行程序的时候容易犯的错误)
首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的Exception并不能获取到异常,然而为并行诞生的AggregateExcepation就可以获取到一组异常。
<3> 并行计算中我可以留一个硬件线程出来吗?
默认的情况下,底层机制会尽可能多的使用硬件线程,然而我们使用手动指定的好处是我们可以在2,4,8个硬件线程的情况下来进行测量加速比
class Program { static void Main(string[] args) { var bag = new ConcurrentBag<int>(); ParallelOptions options = new ParallelOptions(); //指定使用的硬件线程数为1 options.MaxDegreeOfParallelism = 1; Parallel.For(0, 300000, options, i => { bag.Add(i); }); Console.WriteLine("并行计算:集合有:{0}", bag.Count); } }