并行编程中的取消任务、共享状态,等等

在面对相互独立的数据或者相互独立的任务时,也许正是Parallel登场的时候。

比如说有一个盒子的集合,分别让盒子旋转一定的角度。

void RotateBox(IEnumerable<Box> boxes, float degree)
{
    Parallel.ForEach(boxes, box => box.Rotate(degree));
}

如果并行任务中的一个任务出现异常,需要结束出问题的任务呢?

Parallel.ForEach为我们提供了一个重载方法,可以控制任务是否继续。

void RotateBoxes(IEnumerable<Box> boxes)
{
    Parallel.ForEach(boxes, (box, state) => {
        if(!box.IsInvertible)
        {
            state.Stop();
        }
        else
        {
            box.Invert();
        }
    });
}

如果想取消整个并行任务呢?

Parallel.ForEach也为我们提供一个重载方法,可以接收一个ParallelOption的形参,通过设置ParallelOption的CancellationToken属性来取消整个并行过程。

void RotateBoxes(IEnumerable<Box> boxes,float degrees, CancellationToken token)
{
    Paralle.ForEach(boxes,
    new ParallelOptions{CancellationToken=token},
    box => box.Rotate(degrees));
}

在使用的时候,一般先定义个全局CancellationTokenSource类型的变量。

static CancellationTokenSource token = new CancellationTokenSource();

然后,在某个并行任务中设置取消。

token.Cancel();

最后,再把这个token赋值给以上方法的CancellationToken属性。

各个并行任务如何共享状态,共享一个变量呢?

int InvertBoxes(IEnumerable<Box> boxes)
{
    object mutex = new object();//用来锁
    int nonInvertibleCount = 0; //各任务共享的变量

    Paralle.ForEach(boxes, box =>{
        if(box.IsInvertible){
            box.Invert();
        }
        else
        {
            lock(mutex)
            {
                ++nonInvertibleCount;
            }
        }
    });

    return nonInvertibleCount;
}

可见,对于各并行线程共享的变量,需要加一个线程锁,以防止多个线程同时操作共享变量。

另外,Parallel.ForEach提供了一个重载,其中localFinally形参接收一个委托类型,通过该委托让并行任务共享一个变量。比如:

static int ParallelSum(IEnumerable<int> values)
{
    object mutex = new object();
    int result = 0;

    Parallel.ForEach(
        source: values,
        LocalInit: () => 0,
        body: (item, state, localVlaue) => localValue + item,
        localFinally: localValue => {
            lock(mutex)
            {
                result += localValue;
            }
        }
    );

    return result;
}

当然,也别忘了PLINQ也支持并行:

static int ParalleSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}

PLINQ的Aggregate方法也可实现:

static int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Aggregate(
        seed: 0,
        func: (sum, item) => sum + item;
    );
}

以上,是对相互独立数据的处理。

那么,如何处理相互独立的多个任务呢?

通过Parallel.Invoke方法可以实现。

static voiD ProcessArray(double[] array)
{
    Parallel.Invoke(
        () => ProcessPartialArray(array, 0, array.Length/2),
        () => ProcessPartialArray(array, array.Length/2, array.Length)
    );
}

static void ProcessPartialArray(double[] array, int begin, int end)
{}

使用Parallel.Invoke方法还可以让一个Action或这方法执行多次。

static void Do20(Action action)
{
    //让某个Action执行20次
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(actions);
}

Parallel.Invoke方法也提供了重载,接收ParallelOption类型的形参,用来控制取消整个并行过程。

static void Do20(Action action)
{
    //让某个Action执行20次
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(new ParallelOptions{CancellationToken = token},actions);
}

参考资料:C#并发编程经典实例

时间: 2024-08-27 19:19:42

并行编程中的取消任务、共享状态,等等的相关文章

并行编程中的内存回收Hazard Pointer

接上篇使用RCU技术实现读写线程无锁,在没有GC机制的语言中,要实现Lock free的算法,就免不了要自己处理内存回收的问题. Hazard Pointer是另一种处理这个问题的算法,而且相比起来不但简单,功能也很强大.锁无关的数据结构与Hazard指针中讲得很好,Wikipedia Hazard pointer也描述得比较清楚,所以我这里就不讲那么细了. 一个简单的实现可以参考我的github haz_ptr.c 原理 基本原理无非也是读线程对指针进行标识,指针(指向的内存)要释放时都会缓存

C#并行编程中的Parallel.Invoke

一.基础知识 并行编程:并行编程是指软件开发的代码,它能在同一时间执行多个计算任务,提高执行效率和性能一种编程方式,属于多线程编程范畴.所以我们在设计过程中一般会将很多任务划分成若干个互相独立子任务,这些任务不考虑互相的依赖和顺序.这样我们就可以使用很好的使用并行编程.但是我们都知道多核处理器的并行设计使用共享内存,如果没有考虑并发问题,就会有很多异常和达不到我们预期的效果.不过还好NET Framework4.0引入了Task Parallel Library(TPL)实现了基于任务设计而不用

分布式异步消息框架构建笔记5——如何避开并行编程中的数据共享陷阱

任何多线程/并行/分布式都会面临一个问题,"数据状态共享". 有经验的开发者会说,要想正确有效的避开避开状态共享,那么就应该别用任何状态共享. 虽然不得不说,这是一个不错的建议,但是没有状态共享,你需要如何才能知道非本地数据的状态? 也许你会说使用消息,使用消息来处理,那么我们丑陋的回调金字塔应该叠的更高了. 不得不说这是一个解决办法,但是为了保持状态不被修改,那么我们还得在远程申请一个写入锁,防止数据被别的任务所修改. 那么流程就是 申请锁->请求某个消息状态->释放锁

C#并行编程-Parallel

原文:C#并行编程-Parallel 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. TPL中引入了一个新命名空间System.Threading.Tasks,在该命名空间下Task是主类,表示一个类的异步的并发的操作,创建并行代码的时候不一定要直接使用Task类,在某些情况下可以直接使用Parallel静态类(System.Threading.Tasks.Parallel)下所提供的方法,而不用底层的Task实例. Parallel.Invoke  试图将很多方

C#中的多线程 - 并行编程 z

原文:http://www.albahari.com/threading/part5.aspx 专题:C#中的多线程 1并行编程Permalink 在这一部分,我们讨论 Framework 4.0 加入的多线程 API,它们可以充分利用多核处理器. 并行 LINQ(Parallel LINQ)或称为 PLINQ Parallel类 任务并行(task parallelism)构造 SpinLock 和 SpinWait 这些 API 可以统称为 PFX(Parallel Framework,并行

NET中的并行编程(TPL)——多线程、异步、任务和并行计算

https://masuit.com/1201 谈一谈.NET中的并行编程(TPL)——多线程.异步.任务和并行计算 懒得勤快 发表于2018-04-26 19:41:00 | 最后修改于2018-06-27 23:44:40 .NET 多线程 异步 高并发 分类:.NET开发技术 | 评论总数:0条 | 热度:2243℃ 我要编辑 写在前面: 在做了几个月的高并发项目的过程中,其实发现自己真的提升了不少,所以也想把这段时间的收获分享给大家,然后写这篇文章发现,写下来是一发不可收拾,所以这篇文章

.Net中的并行编程-2.ConcurrentStack的实现与分析

在上篇文章<.net中的并行编程-1.基础知识>中列出了在.net进行多核或并行编程中需要的基础知识,今天就来分析在基础知识树中一个比较简单常用的并发数据结构--.net类库中无锁栈的实现. 首先解释一下什么这里“无锁”的相关概念. 所谓无锁其实就是在普通栈的实现方式上使用了原子操作,原子操作的原理就是CPU在系统总线上设置一个信号,当其他线程对同一块内存进行访问时CPU监测到该信号存在会,然后当前线程会等待信号释放后才能对内存进行访问.原子操作都是由操作系统API实现底层由硬件支持,常用的操

C#并行编程-Task

原文:C#并行编程-Task 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 任务简介 TPL引入新的基于任务的编程模型,通过这种编程模型可以发挥多核的功效,提升应用程序的性能,不需要编写底层复杂且重量级的线程代码. 但需要注意:任务并不是线程(任务运行的时候需要使用线程,但并不是说任务取代了线程,任务代码是使用底层的线程(软件线程,调度在特定的硬件线程或逻辑内核上)运行的,任务与线程之间并没有一对一的关系.) 创建一个新的任务时,调度器(调度器依赖于底层的线程池

C#并行编程-线程同步原语(Barrier,CountdownEvent,ManualResetEventSlim,SemaphoreSlim,SpinLock,SpinWait,Monitor,volatile)

菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 背景 有时候必须访问变量.实例.方法.属性或者结构体,而这些并没有准备好用于并发访问,或者有时候需要执行部分代码,而这些代码必须单独运行,这是不得不通过将任务分解的方式让它们独立运行. 当任务和线程要访问共享的数据和资源的时候,您必须添加显示的同步,或者使用原子操作或锁. 之前的.NET Framework提供了昂贵的锁机制以及遗留的多线程模型,新的数据结构允许细粒度的并发和并行化,并且降低一定必要的开销,这些数据结