二、并行编程 - Task任务

初识Task

两种构建Task的方式,只是StartNew方法直接构建出了一个Task之后又调用了其Start方法。

    Task.Factory.

StartNew

(() =>
    {
        Console.WriteLine("Hello word!");
    });

    Task task = new Task(() =>
    {
        Console.WriteLine("Hello,Word!");
    });
    task.Start();

在Task内部执行的内容我们称作为Task的Body,Task提供了多个初始化重载的方法。

public Task(Action action);
public Task(Action<object> action, object state);
public Task(Action action, CancellationToken cancellationToken);
public Task(Action action, TaskCreationOptions creationOptions);

例如使用了重载方法的State参数:

    Task task2 = new Task((

obj

) => { Console.WriteLine("Message: {0}", obj); }, "Say \"Hello\" from task2");
    task2.Start();

任务返回值

使用返回值的Result属性可获取是在一个Task运行完成才会获取的,所以task2是在task1运行完成后,才开始运行,也就是说上面的两个result的值不管运行多少次都是不会变的。其中我们也可以通过CurrentId来获取当前运行的Task的编号。

var loop = 0;
    var task1 = new Task<int>(() =>
    {
        for (var i = 0; i < 1000; i++)
            loop += i;
        return loop;
    });
    task1.Start();
    var loopResut = task1.Result;

    var task2 = new Task<long>(obj=>
    {
        long res = 0;
        var looptimes = (int)obj;
        for (var i = 0; i < looptimes; i++)
            res += i;
        return res;
    },loopResut);

    task2.Start();
    var resultTask2 = task2.Result;

    Console.WriteLine("任务1的结果‘:{0}\n任务2的结果:{1}",   loopResut,resultTask2);

任务延续

所谓的延续的Task就是在第一个Task完成后自动启动下一个Task。我们通过ContinueWith方法来创建延续的Task。我们假设有一个接受xml解析的服务,首先从某个地方接受文件,然后解析入库,最后发送是否解析正确的回执。在每次调用ContinueWith方法时,每次会把上次Task的引用传入进来,以便检测上次Task的状态,比如我们可以使用上次Task的Result属性来获取返回值。

    var 

ReceiveTask

 = new Task(() => ReceiveXml());
    var ResolveTask = 

ReceiveTask

.

ContinueWith

<bool>((r) => ResolveXml());
    var SendFeedBackTask = ResolveTask.

ContinueWith

<string>((s) => SendFeedBack(s.Result));
    ReceiveTask.Start();
    Console.WriteLine(SendFeedBackTask.Result);

上面的代码我们也可以这么写:

   var SendFeedBackTask = Task.Factory.StartNew(() => ReceiveXml())
                            .ContinueWith<bool>(s => ResolveXml())
                            .ContinueWith<string>(r => SendFeedBack(r.Result));
    Console.WriteLine(SendFeedBackTask.Result);

分离嵌套任务

有些情况下我们需要创建嵌套的Task,嵌套里面又分为分离的和不分离的。其创建的方式很简单,就是在Task的body里面创建一个新的Task。如果新的Task未指定AttachedToParent选项,那么就是分离嵌套的。我们看下面这段代码。下面的代码中outTask.Wait()表示等待outTask执行完成。

var outTask = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Outer task beginning...");
    var childTask = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(3000000);
        Console.WriteLine("Detached nested task completed.");
    });
});
outTask.Wait();
Console.WriteLine("Outer task completed.");
Console.ReadKey();

我们可以看到运行结果是:

子任务

我们将上面的代码加上TaskCreationOptions选项:

var outTask = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Outer task beginning...");
    var childTask = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(3000000);
        Console.WriteLine("Detached nested task completed.");
    },TaskCreationOptions.AttachedToParent);
});
outTask.Wait();
Console.WriteLine("Outer task completed.");

看到运行结果:

取消任务

我们通过cancellation的tokens来取消一个Task。在很多Task的Body里面包含循环,我们可以在轮询的时候判断IsCancellationRequested属性是否为True,如果是True的话,就可以停止循环以及释放资源,同时抛出OperationCanceledException异常出来。来看一段示例代码:

var cts = new CancellationTokenSource(); var ct =

 cts.Token;

var task = Task.Factory.StartNew(() =>
{
    for (var i = 0; i < 10000000; i++)
    {
        if (ct.IsCancellationRequested)
        {
            Console.WriteLine("任务开始取消...");
            throw new OperationCanceledException(ct);
        }

    }
},ct);

ct.Register(() =>
{
    Console.WriteLine("已经取消");
});

Thread.Sleep(5000);
cts.Cancel();

try
{
    task.Wait();
}
catch (AggregateException e)
{
    foreach (var v in e.InnerExceptions)
        Console.WriteLine("msg: " + v.Message);
}

等待时间执行

在TPL中我们可以通过三种方式进行等待,一是通过CancellTaken的WaitHanle进行等待、第二种则是通过传统的Tread.Sleep方法、第三种则通过Thread.SpainWait方法。

1、CancellToken方式:每次我们等待十秒钟之后,再进行下次输出。

var cts = new CancellationTokenSource();
    var ct = cts.Token;

    var task = new Task(() =>
    {
        for (var i = 0; i < 100000; i++)
        {
            var cancelled = 

ct.WaitHandle.WaitOne(1000

);
            Console.WriteLine(" {0}. Cancelled? {1}", i, cancelled);
            if (cancelled)
            {
                throw new OperationCanceledException(ct);
            }
        }
    }, ct);
    task.Start();

2、上面的功能如果我们要是通过Tread.Sleep方式实现:

            var task = new Task(() =>
            {
                for (var i = 0; i < 100000; i++)
                {
                    Thread.Sleep(10000);
                    

var cancelled =

 ct.IsCancellationRequested;
                    Console.WriteLine(" {0}. Cancelled? {1}",
                                i, cancelled); if (cancelled)
                    {
                        throw new OperationCanceledException(ct);
                    }
                }
            },ct);

3、Thread.SpainWait则跟上面两种方式完全不同,上面的两种方式都是会在线程调度程序不考虑改线程,直等到运行结束。而Thread.SpainWait的作用实质上会将处理器置于十分紧密的循环中,主要的作用是来实现同步锁的作用。并不常用,大部分情况下我们可以通过Lock的方式来实现。

等待任务执行

在很多时候我们也许需要等待同时开启的几个线程完成之后再来做其他事,在TPL中提供了几种方式来等待任务执行。Task.Wait等待单个任务完成;Task.WaitAll等待所有的Task完成、TaskAny等在其中的任何一个或则多个任务完成。

1、Task.Wait:

共有5个重载:Wait()、Wait(CancellToken)、Wait(Int32)、Wait(TimeSpan)、Wait(TimeSpan、CancellToken)。各个重载方法的含义:

1)Wait():等待整个任务完成或者取消或者出现异常;
2)Wait(CancellToken):等待任务直到CancellToken调用取消或者完成,或者出现异常;
3)Wait(Int32):等待任务,未完成则到指定的时间;
4)Wait(TimeSpan):同上;
5)Wait(TimeSpan、CancellToken):等待任务到指定时间,或者CancellToken调用取消或者任务完成。

static void Main(string[] args)
{
    var tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;
        Task task = createTask(token,6);    task.Start();
    Console.WriteLine("Wait() complete.");
    task.Wait();
    Console.WriteLine("Task Completed.");

    task = createTask(token,3);
    task.Start();
    Console.WriteLine("Wait(2) secs for task to complete.");
    bool completed = task.Wait(2000);
    Console.WriteLine("Wait ended - task completed: {0}", completed);

    task = createTask(token,4);
    task.Start();
    Console.WriteLine("Wait(2,token) for task to complete.");
    completed = task.Wait(2000, token);
    Console.WriteLine("Wait ended - task completed: {0} task cancelled {1}",
    completed, task.IsCanceled);
    Console.WriteLine("Main method complete. Press enter to finish.");
    Console.ReadLine();
}
static Task createTask(CancellationToken token,int loop)
{
    return new Task(() =>
    {
        for (int i = 0; i < loop; i++)
        {
            token.ThrowIfCancellationRequested();
            Console.WriteLine("Task - Int value {0}", i);
            token.WaitHandle.WaitOne(1000);
        }
    }, token);
}

循环都会等待1秒钟,这样我们可以看看Wait(2000)的效果,看看运行后的效果:

2、Task.WaitAll方法

是等待所有的任务完成,也有5个重载, 也可以传递时间以及Token参数,进行等待时间以及取消Token的控制。

   var tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;
    var task1 = createTask(token,2);
    var task2 = createTask(token, 5);
    task1.Start();
    task2.Start();
    Console.WriteLine("Waiting for tasks to complete.");
    Task.WaitAll(task1, task2);
    Console.WriteLine("Tasks Completed.");

3、Task.WaitAny

等待任何一个任务完成,完成之后返回其完成的任务的Index:

var tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;
    var task1 = createTask(token,2);
    var task2 = createTask(token, 5);
    task1.Start();
    task2.Start();
    Console.WriteLine("Waiting for tasks to complete.");
    var index = Task.WaitAny(task1, task2);
    Console.WriteLine("Tasks Completed.Index is {0}",index);

异常处理

在TPL中,异常的触发器主要是这几个:
Task.Wait(), Task.WaitAll(), Task,WaitAny(),Task.Result。而在TPL出现的异常都会以AggregateException的示例抛出,我们在进行基本的异常处理时,可以通过查看AggregateException的InnerExceptions来进行内部异常的捕获:

var tokenSource = new CancellationTokenSource();
    var token = tokenSource.Token;
    var task1 = new Task(() =>
    {
        throw new NullReferenceException() { Source="task1"};
    });
    var task2 = new Task(() =>
    {
        throw new ArgumentNullException("a", "a para can not be null") { Source="task2"};
    });
    task1.Start(); task2.Start();
    try
    {
        Task.WaitAll(task1, task2);
    }
    catch(AggregateException ex)
    {
        foreach (Exception inner in ex.InnerExceptions)
        {
            Console.WriteLine("Exception type {0} from {1}",
            inner.GetType(), inner.Source);
        }
    }

同时,我们还可以通过Task的几个属性来判断Task的状态,如:IsCompleted, IsFaulted, IsCancelled,Exception。另外,AggregateException中还提供了Handle方法来给我们方法来给我们处理每个内部 异常,每个异常发生时都会调用Handle传入的delegate ,同时我们需要通过返回True,False来告诉异常是否已经被处理,比如对于OperationCanceledException我们知道是取消了Task,是肯定可以处理的:

try
    {
        Task.WaitAll(task1, task2, task3, task4);
    }
    catch(AggregateException ex)
    {
        ex.Handle((e) =>
        {
            if (e is OperationCanceledException)
            {
                return true;
            }
            else
            {
                return false;
            }
        });

    }

原文地址:https://www.cnblogs.com/springsnow/p/9409205.html

时间: 2024-11-05 23:26:44

二、并行编程 - Task任务的相关文章

C#并行编程-Task

原文:C#并行编程-Task 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 任务简介 TPL引入新的基于任务的编程模型,通过这种编程模型可以发挥多核的功效,提升应用程序的性能,不需要编写底层复杂且重量级的线程代码. 但需要注意:任务并不是线程(任务运行的时候需要使用线程,但并不是说任务取代了线程,任务代码是使用底层的线程(软件线程,调度在特定的硬件线程或逻辑内核上)运行的,任务与线程之间并没有一对一的关系.) 创建一个新的任务时,调度器(调度器依赖于底层的线程池

C#并行编程 z

目录 C#并行编程-相关概念 C#并行编程-Parallel C#并行编程-Task C#并行编程-并发集合 C#并行编程-线程同步原语 C#并行编程-PLINQ:声明式数据并行 背景 基于任务的程序设计.命令式数据并行和任务并行都要求能够支持并发更新的数组.列表和集合. 在.NET Framework 4 以前,为了让共享的数组.列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作. 如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制

C# 异步编程Task整理(二)异常捕捉

一.在任务并行库中,如果对任务运行Wait.WaitAny.WaitAll等方法,或者求Result属性,都能捕获到AggregateException异常. 可以将AggregateException异常看做是任务并行库编程中最上层的异常. 在任务中捕获的异常,最终都应该包装到AggregateException中.一个任务并行库异常的简单处理示例如下: static void TestTwo() { Task t1= Task.Factory.StartNew(() => { throw n

并行编程和任务(二)

前言 上一篇我们主要介绍了并行编程相关的知识,这一节我们继续介绍关于任务相关的知识.为了更好的控制并行操作,我们可以使用System.Threading.Tasks中的Task类.我们首先来了解是什么是任务——任务表示将要完成的一个或某个工作单元,这个工作单元可以在单独线程中运行,也可以使用同步方式启动运行(需要等待主线程调用).为什么使用任务呢?——任务不仅可以获得一个抽象层(将要完成的工作单元).还可以对底层的线程运行进行更好更多的控制(任务的运行). 使用线程池的任务 我们讲到使用任务可以

OpenCL学习笔记(二):并行编程概念理解

欢迎转载,转载请注明:本文出自Bin的专栏blog.csdn.net/xbinworld. 技术交流QQ群:433250724,欢迎对算法.技术.应用感兴趣的同学加入. 并行编程的需求是显而易见的,其最大的难题是找到算法的并行功能,同时必须处理数据的共享和同步.但是,因为每一个算法都是不一样的,很难有通用的并行功能--粒度都有可能是不一样的.OpenCL提供了很多并行的抽象模型,因此算法开发人员可以在不同粒度上开发并行的算法,以及数据的共享和同步. 一般来说,并行编程有两种大类型--分散收集(s

并行编程入门

目录 1. 并行编程简介 2. MapReduce 2.1 MapReduce简介 2.2 MapReduce框架 2.3 Hadoop介绍 2.4 Hadoop基本类 2.5 Hadoop编程实例 1.并行编程简介 1.1.并行编程作用,用途 商业用途,科学计算,大数据分析 1.2.并行编程兴起原因 目前的串行编程的局限性 使用的流水线等隐式并行模式的局限性 硬件的发展 1.3.并行算法设计原则步骤 a.分析问题 b.分解问题 其中分解方法有: 数据分解 递归分解 探测性分解 推测性分解 混合

Net并行编程高级教程--Parallel

Net并行编程高级教程--Parallel 一直觉得自己对并发了解不够深入,特别是看了<代码整洁之道>觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准.而且在<失控>这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物.人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情.所以好奇心驱使下学习并发.便有了此文. 一.理解硬件线程和软件线程 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够

【读书笔记】.Net并行编程高级教程--Parallel

一直觉得自己对并发了解不够深入,特别是看了<代码整洁之道>觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准.而且在<失控>这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物.人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情.所以好奇心驱使下学习并发.便有了此文. 一.理解硬件线程和软件线程 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够同时并行运行.硬件线程也称为逻辑内核,一个物

C#并行编程中的Parallel.Invoke

一.基础知识 并行编程:并行编程是指软件开发的代码,它能在同一时间执行多个计算任务,提高执行效率和性能一种编程方式,属于多线程编程范畴.所以我们在设计过程中一般会将很多任务划分成若干个互相独立子任务,这些任务不考虑互相的依赖和顺序.这样我们就可以使用很好的使用并行编程.但是我们都知道多核处理器的并行设计使用共享内存,如果没有考虑并发问题,就会有很多异常和达不到我们预期的效果.不过还好NET Framework4.0引入了Task Parallel Library(TPL)实现了基于任务设计而不用