8天玩转并行开发——第四天 同步机制(上)

在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合。虽然task自带了两个方法:task.ContinueWith()和Task.Factory

.ContinueWhenAll()来实现任务串行化,但是这些简单的方法远远不能满足我们实际的开发需要,从.net 4.0开始,类库给我们提供了很多

的类来帮助我们简化并行计算中复杂的数据同步问题。

大体上分为二种:

①   并发集合类:           这个在先前的文章中也用到了,他们的出现不再让我们过多的关注同步细节。

②  轻量级同步机制:      相对于老版本中那些所谓的重量级同步机制而言,新的机制更加节省cpu的额外开销。

关于并发集合类没什么好讲的,如果大家熟悉非线程安全的集合,那么这些并发的集合对你来说小菜一碟,这一篇和下一篇我们仔细来玩玩这

些轻量级的同步机制。

一:Barrier(屏障同步)

1:基本概念

msdn对它的解释是:使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作。乍一看有点不懂,没关系,我们采取提干法。

”多个任务“,”多个阶段”,“协同”,仔细想想知道了,下一阶段的执行必须等待上一个阶段中多task全部执行完,那么我们实际中有这样

的需求吗?当然有的,比如我们数据库中有100w条数据需要导入excel,为了在数据库中加速load,我们需要开多个任务去跑,比如这

里的4个task,要想load产品表,必须等4个task都跑完用户表才行,那么你有什么办法可以让task为了你两肋插刀呢?它就是Barrier。

好,我们知道barrier叫做屏障,就像下图中的“红色线”,如果我们的屏障设为4个task就认为已经满了的话,那么执行中先到的task必须等待

后到的task,通知方式也就是barrier.SignalAndWait(),屏障中线程设置操作为new Barrier(4,(i)=>{})。

啰嗦了半天,还是上下代码说话:

 1 using System.Collections.Concurrent; 2 using System.Threading.Tasks; 3 using System; 4 using System.Diagnostics; 5 using System.Collections.Generic; 6 using System.Linq; 7 using System.Threading; 8  9 class Program10 {11 //四个task执行12 static Task[] tasks = new Task[4];13 14 static Barrier barrier = null;15 16 static void Main(string[] args)17 {18 barrier = new Barrier(tasks.Length, (i) =>19 {20 Console.WriteLine("**********************************************************");21 Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);22 Console.WriteLine("**********************************************************");23 });24 25 for (int j = 0; j < tasks.Length; j++)26 {27 tasks[j] = Task.Factory.StartNew((obj) =>28 {29 var single = Convert.ToInt32(obj);30 31 LoadUser(single);32 barrier.SignalAndWait();33 34 LoadProduct(single);35 barrier.SignalAndWait();36 37 LoadOrder(single);38 barrier.SignalAndWait();39 }, j);40 }41 42 Task.WaitAll(tasks);43 44 Console.WriteLine("指定数据库中所有数据已经加载完毕!");45 46 Console.Read();47 }48 49 static void LoadUser(int num)50 {51 Console.WriteLine("当前任务:{0}正在加载User部分数据!", num);52 }53 54 static void LoadProduct(int num)55 {56 Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);57 }58 59 static void LoadOrder(int num)60 {61 Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);62 }63 }

2:死锁问题

先前的例子我们也知道,屏障必须等待4个task通过SignalAndWait()来告知自己已经到达,当4个task全部达到后,我们可以通过

barrier.ParticipantsRemaining来获取task到达状态,那么如果有一个task久久不能到达那会是怎样的情景呢?好,我举个例子。

 1 using System.Collections.Concurrent; 2 using System.Threading.Tasks; 3 using System; 4 using System.Diagnostics; 5 using System.Collections.Generic; 6 using System.Linq; 7 using System.Threading; 8  9 class Program10 {11 //四个task执行12 static Task[] tasks = new Task[4];13 14 static Barrier barrier = null;15 16 static void Main(string[] args)17 {18 barrier = new Barrier(tasks.Length, (i) =>19 {20 Console.WriteLine("**********************************************************");21 Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);22 Console.WriteLine("**********************************************************");23 });24 25 for (int j = 0; j < tasks.Length; j++)26 {27 tasks[j] = Task.Factory.StartNew((obj) =>28 {29 var single = Convert.ToInt32(obj);30 31 LoadUser(single);32 barrier.SignalAndWait();33 34 LoadProduct(single);35 barrier.SignalAndWait();36 37 LoadOrder(single);38 barrier.SignalAndWait();39 40 }, j);41 }42 43 Task.WaitAll(tasks);44 45 barrier.Dispose();46 47 Console.WriteLine("指定数据库中所有数据已经加载完毕!");48 49 Console.Read();50 }51 52 static void LoadUser(int num)53 {54 Console.WriteLine("\n当前任务:{0}正在加载User部分数据!", num);55 56 if (num == 0)57 {58 //num=0:表示0号任务59 //barrier.ParticipantsRemaining == 0:表示所有task到达屏障才会退出60 // SpinWait.SpinUntil: 自旋锁,相当于死循环61 SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0);62 }63 }64 65 static void LoadProduct(int num)66 {67 Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);68 }69 70 static void LoadOrder(int num)71 {72 Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);73 }74 }

我们发现程序在加载User表的时候卡住了,出现了类似死循环,这句SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0)中

的ParticipantsRemaining==0 永远也不能成立,导致task0永远都不能退出,然而barrier还在一直等待task0调用SignalAndWait来结束屏障。

结果就是造成了相互等待的尴尬局面,我们下个断点看看情况。

3:超时机制

当我们coding的时候遇到了这种问题还是很纠结的,所以我们必须引入一种“超时机制”,如果在指定的时候内所有的参与者(task)都

没有到达屏障的话,我们就需要取消这些参与者的后续执行,幸好SignalAndWait给我们提供了超时的重载,为了能够取消后续执行,我们

还要采用CancellationToken机制。

  1 using System.Collections.Concurrent;  2 using System.Threading.Tasks;  3 using System;  4 using System.Diagnostics;  5 using System.Collections.Generic;  6 using System.Linq;  7 using System.Threading;  8   9 class Program 10 { 11 //四个task执行 12 static Task[] tasks = new Task[4]; 13  14 static Barrier barrier = null; 15  16 static void Main(string[] args) 17 { 18 CancellationTokenSource cts = new CancellationTokenSource(); 19  20 CancellationToken ct = cts.Token; 21  22 barrier = new Barrier(tasks.Length, (i) => 23 { 24 Console.WriteLine("**********************************************************"); 25 Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber); 26 Console.WriteLine("**********************************************************"); 27 }); 28  29 for (int j = 0; j < tasks.Length; j++) 30 { 31 tasks[j] = Task.Factory.StartNew((obj) => 32 { 33 var single = Convert.ToInt32(obj); 34  35 LoadUser(single); 36  37 if (!barrier.SignalAndWait(2000)) 38 { 39 //抛出异常,取消后面加载的执行 40 throw new OperationCanceledException(string.Format("我是当前任务{0},我抛出异常了!", single), ct); 41 } 42  43 LoadProduct(single); 44 barrier.SignalAndWait(); 45  46 LoadOrder(single); 47 barrier.SignalAndWait(); 48  49 }, j, ct); 50 } 51  52 //等待所有tasks 4s 53 Task.WaitAll(tasks, 4000); 54  55 try 56 { 57 for (int i = 0; i < tasks.Length; i++) 58 { 59 if (tasks[i].Status == TaskStatus.Faulted) 60 { 61 //获取task中的异常 62 foreach (var single in tasks[i].Exception.InnerExceptions) 63 { 64 Console.WriteLine(single.Message); 65 } 66 } 67 } 68  69 barrier.Dispose(); 70 } 71 catch (AggregateException e) 72 { 73 Console.WriteLine("我是总异常:{0}", e.Message); 74 } 75  76 Console.Read(); 77 } 78  79 static void LoadUser(int num) 80 { 81 Console.WriteLine("\n当前任务:{0}正在加载User部分数据!", num); 82  83 if (num == 0) 84 { 85 //自旋转5s 86 if (!SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0, 5000)) 87 return; 88 } 89  90 Console.WriteLine("当前任务:{0}正在加载User数据完毕!", num); 91 } 92  93 static void LoadProduct(int num) 94 { 95 Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num); 96 } 97  98 static void LoadOrder(int num) 99 {100 Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);101 }102 }

二:spinLock(自旋锁)

我们初识多线程或者多任务时,第一个想到的同步方法就是使用lock或者Monitor,然而在4.0 之后给我们提供了另一把武器spinLock,

如果你的任务持有锁的时间非常短,具体短到什么时候msdn也没有给我们具体的答案,但是有一点值得确定的时,如果持有锁的时候比较

短,那么它比那些重量级别的Monitor具有更小的性能开销,它的用法跟Monitor很相似,下面举个例子,Add2方法采用自旋锁。

 1 using System.Collections.Concurrent; 2 using System.Threading.Tasks; 3 using System; 4 using System.Diagnostics; 5 using System.Collections.Generic; 6 using System.Linq; 7 using System.Threading; 8  9 class Program10 {11 static SpinLock slock = new SpinLock(false);12 13 static int sum1 = 0;14 15 static int sum2 = 0;16 17 static void Main(string[] args)18 {19 Task[] tasks = new Task[100];20 21 for (int i = 1; i <= 100; i++)22 {23 tasks[i - 1] = Task.Factory.StartNew((num) =>24 {25 Add1((int)num);26 27 Add2((int)num);28 29 }, i);30 }31 32 Task.WaitAll(tasks);33 34 Console.WriteLine("Add1数字总和:{0}", sum1);35 36 Console.WriteLine("Add2数字总和:{0}", sum2);37 38 Console.Read();39 }40 41 //无锁42 static void Add1(int num)43 {44 Thread.Sleep(100);45 46 sum1 += num;47 }48 49 //自旋锁50 static void Add2(int num)51 {52 bool lockTaken = false;53 54 Thread.Sleep(100);55 56 try57 {58 slock.Enter(ref lockTaken);59 sum2 += num;60 }61 finally62 {63 if (lockTaken)64 slock.Exit(false);65 }66 }67 }

分类: 并行开发

原文地址:https://www.cnblogs.com/Jeely/p/10999336.html

时间: 2024-09-30 13:04:35

8天玩转并行开发——第四天 同步机制(上)的相关文章

8天玩转并行开发——第五天 同步机制(下)

承接上一篇,我们继续说下.net4.0中的同步机制,是的,当出现了并行计算的时候,轻量级别的同步机制应运而生,在信号量这一块 出现了一系列的轻量级,今天继续介绍下面的3个信号量 CountdownEvent,SemaphoreSlim,ManualResetEventSlim. 一:CountdownEvent 这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个 人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待

8天玩转并行开发——第七天 简要分析任务与线程池

原文:8天玩转并行开发--第七天 简要分析任务与线程池 其实说到上一篇,我们要说的task的知识也说的差不多了,这一篇我们开始站在理论上了解下“线程池”和“任务”之间的关系,不管是 说线程还是任务,我们都不可避免的要讨论下线程池,然而在.net 4.0以后,线程池引擎考虑了未来的扩展性,已经充分利用多核微处理器 架构,只要在可能的情况下,我们应该尽量使用task,而不是线程池. 首先看一下task的结构 从图中我们可以看出Task.Factory.StartNew()貌似等同于用ThreadPo

8天玩转并行开发——第八天 用VS性能向导解剖你的程序

原文:8天玩转并行开发--第八天 用VS性能向导解剖你的程序 最后一篇,我们来说说vs的“性能向导",通常我们调试程序的性能一般会使用Stopwatch,如果希望更加系统的了解程序,我们就需要 用到”性能向导“,通过性能报告便于我们快速的发现并找到潜在的性能问题. 首先我们上一段需要改进的代码: 1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using

8天玩转并行开发——第一天 Parallel的使用

转自:http://www.cnblogs.com/huangxincheng/archive/2012/04/02/2429543.html 随着多核时代的到来,并行开发越来越展示出它的强大威力,像我们这样的码农再也不用过多的关注底层线程的实现和手工控制, 要了解并行开发,需要先了解下两个概念:“硬件线程”和“软件线程”. 1. 硬件线程 相信大家手头的电脑都是双核以上的,像我这样古董的电脑都是双核的,这样的双核叫做物理内核.

优达学城-并行编程-Unit2 通信模块、同步机制、原子操作

(一). Parallel communication Patterns 在上一章CUDA系列学习(二)CUDA memory & variables中我们介绍了memory和variable的不同类型,本章中根据不同的memory映射方式,我们将task分为以下几种类型:Map, Gather, Scatter, Stencil, transpose. 1.1 Map, Gather, Scatter Map: one input - one output Gather: several in

8天玩转并行开发——第二天 Task的使用

在我们了解Task之前,如果我们要使用多核的功能可能就会自己来开线程,然而这种线程模型在.net 4.0之后被一种称为基于 “任务的编程模型”所冲击,因为task会比thread具有更小的性能开销,不过大家肯定会有疑惑,任务和线程到底有什么区别? 1:任务是架构在线程之上的,也就是说任务最终还是要抛给线程去执行. 2:任务跟线程不是一对一的关系,比如开10个任务并不是说会开10个线程,这一点任务有点类似线程池,但是任务相比线程池有很小 的开销和精确的控制. 一:Task 1. 最简单的使用 开启

8天玩转并行开发——第六天 异步编程模型

在.net里面异步编程模型由来已久,相信大家也知道Begin/End异步模式和事件异步模式,在task出现以后,这些东西都可以被task包装 起来,可能有人会问,这样做有什么好处,下面一一道来. 一: Begin/End模式 1: 委托 在执行委托方法的时候,我们常常会看到一个Invoke,同时也有一对你或许不常使用的BeginInvoke,EndInvoke方法对,当然Invoke方法 是阻塞主线程,而BeginInvoke则是另开一个线程. 1 class Program 2 { 3 sta

并行开发学习随笔1——plinq并行

这两天在看园友的文章 <8天玩转并行开发——第三天 plinq的使用> 对里面的第一个实例亲手实践了一下,发现了一点有意思的事情. 测试环境:.net 4.5 64位(如果是32位的,测试千万数据时会爆出out of memory的错误) 在我的机器上,千万数据的测试结果: 百万数据的测试结果: 十万数据的测试结果: 可以看出,到底使用串行还是并行应该根据数据量来决定,两者的大致就在几十万数据的时候性能基本接近.当然这个结果不是固定的,应该是与机器的配置以及测试时的系统环境有比较大的关系,实际

并行开发

8天玩转并行开发系列 http://www.cnblogs.com/huangxincheng/category/368987.html .NET Framework 中的并行编程 http://msdn.microsoft.com/en-us/library/dd460693(v=vs.110).aspx 命名空间System.Threading.Tasks http://msdn.microsoft.com/en-us/library/system.threading.tasks(v=vs.