TPL Part 4 -- Task的协同

简单的Continuation

Task.ContinueWith(Task): 当指定的Task执行完毕时。

void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
});
root Task.ContinueWith((Task previousTask)=>{
Console.WriteLine("continute task completed");
});

rootTask.Start();

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

Task.ContinueWhenAll(Task[]):当指定的所有Task都执行完毕时,示例代码:

Task continuation = Task.Factory.ContinueWhenAll<int>(tasks, antecedents =>{
foreach(Task<int> t in antecedents) {
// dosomething
}
});

TaskFactory.ContinueWhenAny(Task[]):当指定的所有Task的任意1个执行完毕时,代码与ContinueWhenAll类似(以下代码中,打印出前1个Task的执行时间):

Task continuation = Task.Factory.ContinueWhenAny<int>(tasks,
(Task<int>antecedent) => {
//write out a message using the antecedent result
Console.WriteLine("The first task slept for {0} milliseconds",
antecedent.Result);
});

Continue 选项

OnlyOnRanToCompletion仅当执行完

NotOnRanToCompletion:没有执行完(被取消或出现异常)

OnlyOnFaulted:仅当出现异常

NotOnFaulted:没有出现异常

OnlyOnCancelled:仅当被取消

NotOnCancelled:没有被取消

处理异常

void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
rootTask.ContinueWith((Task previousTask)=>{
Console.WriteLine("even root throw exception , I still run");
});

rootTask.Start();

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

以上代码中,第一个task中抛出了异常,Continue的Task仍然会继续执行。可是Task被Finalized时异常就会抛出。

解决方案:

void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
var t2 = rootTask.ContinueWith((Task previousTask)=>{
//
if(previousTask.Status== TaskStatus.Faulted){
throw previousTask.Exception.InnerException;
}
Console.WriteLine("even root throw exception , I still run");
});

rootTask.Start();

try{
t2.Wait();
}
catch(AggregateException ex){
ex.Handle(inner=>{Console.WriteLine("exception handled in main thread"); return true;});
}

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

在Task中冒泡抛出异常,在主线程中等待最后那个Task的执行并对AggregateException进行处理。

创建子Task

创建子Task并附加在父Task上:

void Main()
{

Task parentTask = new Task(() => {
Console.WriteLine("parent task started");
//create the first child task
Task childTask = new Task(() => {
// writeout a message and wait
Console.WriteLine("Child task running");
Thread.Sleep(1000);
Console.WriteLine("Child task throwed exception");
throw new Exception();
} ,TaskCreationOptions.AttachedToParent);
Console.WriteLine("start child task...");
childTask.Start();

Console.WriteLine("parent task ended");
});
// startthe parent task
parentTask.Start();

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}

1. 父Task会抛出子Task中的异常

2. 父Task的状态会受到所附加的子Task状态的影响

Barrier的使用

class BankAccount {
public int Balance {
get;
set;
}
} ;

void Main()
{
//create the array of bank accounts
BankAccount[] accounts = new BankAccount[6];
for(int i = 0;i < accounts.Length; i++) {
accounts[i] = new BankAccount();
}
//create the total balance counter
int totalBalance = 0;
//create the barrier
Barrier barrier = new Barrier(3, (myBarrier) => {

// zerothe balance
totalBalance= 0;
// sumthe account totals
foreach(BankAccount account in accounts) {
totalBalance+= account.Balance;
}
// writeout the balance
Console.WriteLine("[From barrier :] Total balance: {0}",totalBalance);
});
//define the tasks array
Task[] tasks = new Task[3];
// loopto create the tasks
for(int i = 0;i < tasks.Length; i++) {
tasks[i]= new Task((stateObj) => {
//create a typed reference to the account
BankAccount account = (BankAccount)stateObj;
// startof phase
Random rnd = new Random();
for(int j = 0;j < 1000; j++) {
account.Balance+= 2;
}

Thread.Sleep(new Random().Next(3000));

Console.WriteLine("Task {0} waiting, phase {1} ",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier

barrier.SignalAndWait();

account.Balance-= 1000;
Console.WriteLine("barrier finished .");
// endof phase
Console.WriteLine("Task {0}, phase {1} ended",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier
barrier.SignalAndWait();
},
accounts[i]);
}

// startthe task
foreach(Task t in tasks) {
t.Start();
}
// waitfor all of the tasks to complete
Task.WaitAll(tasks);
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

在以上代码中,打开了3个barrier和三个Task,在Task中为每个账户添加2000,然后给barrier发出同步信号,当barrier收到3个信号时,对账号进行求和并保存;当barrier完成逻辑后,控制权交给了每个Task,此时每个Task对account减1000,再次求和,最后结果为3000。

如果希望通过Cancel来控制barrier的行为,还可以在barrier中传入tokenSource.Token:barrier.SignalAndWait(tokenSource.Token);并在Task中执行Cancel:tokenSource.Cancel()。

可以通过调用barrier.RemoveParticipant();来减少barrier的count。

CountEventDown

作用和Barrier类似,累计信号数量,当信号量达到指定数量,set event。

void Main()
{

CountdownEvent cdevent = new CountdownEvent(5);
//create a Random that we will use to generate
// sleepintervals
Random rnd = new Random();
//create 5 tasks, each of which will wait for
// arandom period and then signal the event
Task[] tasks = new Task[6];
for(int i = 0;i < tasks.Length; i++) {
//create the new task
tasks[i]= new Task(() => {
// putthe task to sleep for a random period
// up toone second
Thread.Sleep(rnd.Next(500, 1000));
//signal the event
Console.WriteLine("Task {0} signalling event",Task.CurrentId);
cdevent.Signal();
});
};
//create the final task, which will rendezous with the other 5
// usingthe count down event
tasks[5] = new Task(()=> {
// waiton the event
Console.WriteLine("Rendezvous task waiting");
cdevent.Wait();
Console.WriteLine("CountDownEvent has been set");
});

// startthe tasks
foreach(Task t in tasks) {
t.Start();
}
Task.WaitAll(tasks);

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

在以上代码中,开启了5个Task和1个count为5的CountDownEvent对象,每个Task中完成任务后分别对CountDownEvent发信号,当凑齐5个信号后,会打印出CountDownEvent has been set。

ManualResetEvent 和 AutoResetEvent

熟悉.net之前版本的应该都对它们很熟悉,用于在多线程环境中完成线程同步。区别在于,前者必须调用reset才能恢复信号;而AutoResetEvent则会自动reset。在此不再赘述。

SemaphoreSlim

void Main()
{
    SemaphoreSlim semaphore = new SemaphoreSlim(3);
//create the cancellation token source
CancellationTokenSource tokenSource
= new CancellationTokenSource();

//create and start the task that will wait on the event
for(int i = 0;i < 10; i++) {
Task.Factory.StartNew((obj)=> {

semaphore.Wait(tokenSource.Token);
// printout a message when we are released
Console.WriteLine("Task {0} released", obj);

},i,tokenSource.Token);
}

//create and start the signalling task
Task signallingTask = Task.Factory.StartNew(() => {
// loopwhile the task has not been cancelled
while(!tokenSource.Token.IsCancellationRequested) {
// go tosleep for a random period
tokenSource.Token.WaitHandle.WaitOne(500);
//signal the semaphore
semaphore.Release(3);
Console.WriteLine("Semaphore released");
}
// if wereach this point, we know the task has been cancelled
tokenSource.Token.ThrowIfCancellationRequested();
},tokenSource.Token);
// askthe user to press return before we cancel
// thetoken and bring the tasks to an end
Console.WriteLine("Press enter to cancel tasks");
Console.ReadLine();
//cancel the token source and wait for the tasks
tokenSource.Cancel();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

在以上代码中,new了1个SemaphoreSlim对象并传入3,开了10个Task线程,每当有信号从Semaphore传来时,打印Task[i]被release。同时开1个信号线程,每500毫秒release3个Task。

可见,Semaphore的作用主要是可以选择一次release多少个Task。

Producer / Consumer(生产者/消费者模式)

以下代码中,new了1个BlockingCollection,类型为Deposit。开了3个生产者Task,每个生产者中创建20个Deposit对象并给Amount赋值为100。在主线程中等待生产者Task执行完毕,调用blockingCollection.CompleteAdding()方法。之后开1个消费者Task用于操作账户对象,循环判断blockingCollection.IsCompleted属性(生产者是否完成工作),从集合拿出存款对象,增加账户余额。

示例代码:

class BankAccount {
public int Balance {
get;
set;
}
}
class Deposit {
public int Amount {
get;
set;
}
}

void Main()
{
BlockingCollection<Deposit> blockingCollection
= new BlockingCollection<Deposit>();

var producers = new List<Task>();
for(int i = 0;i < 3; i++) {
var producer = Task.Factory.StartNew((obj) => {
//create a series of deposits
for(int j = 0;j < 20; j++) {
//create the transfer
var randAmount = new Random().Next(100);
Deposit deposit = new Deposit { Amount = randAmount};
Thread.Sleep(newRandom().Next(200));
// placethe transfer in the collection
blockingCollection.Add(deposit);
Console.WriteLine(string.Format("Amount: {0} deposit Processed, index: {1}",randAmount, int.Parse(obj.ToString()) +j));

}
}, i*20);
producers.Add(producer);
};
//create a many to one continuation that will signal
// theend of production to the consumer
Task.Factory.ContinueWhenAll(producers.ToArray(),antecedents => {
//signal that production has ended
Console.WriteLine("Signalling production end");
blockingCollection.CompleteAdding();
});
//create a bank account
BankAccount account = new BankAccount();
//create the consumer, which will update
// thebalance based on the deposits
Task consumer = Task.Factory.StartNew(() => {
while(!blockingCollection.IsCompleted) {
Deposit deposit;
// tryto take the next item
if(blockingCollection.TryTake(outdeposit)) {
//update the balance with the transfer amount
account.Balance+= deposit.Amount;
}
}
// printout the final balance
Console.WriteLine("Final Balance: {0}", account.Balance);
});
// waitfor the consumer to finish
consumer.Wait();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}
时间: 2024-08-10 15:05:43

TPL Part 4 -- Task的协同的相关文章

TPL Part 1 Task 的使用

创建Task //1. 使用Action Task task1 = new Task(newAction(printMessage)); // 使用匿名代理 Task task2 = new Task(delegate { printMessage(); }); // 使用Lambda Task task3 = new Task(() =>printMessage()); // 使用匿名代理 Task task4 = new Task(() => { printMessage(); }); 还

Task的运行原理和工作窃取(work stealing)

在net4.0以前,当调用ThreadPool.QueueUserWorkItem方法往线程池中插入作业时,会把作业内容(其实就是一个委托)放到线程池中的一个全局队列中,然后线程池中的线程按照先进先出的方式取出作业,并处理. 如下图中的方式,主程序创建了Item到Queue中,然后分配到了各个工作线程中.    但 是在.net 4.0以后,线程池做了一些改进,比如增加了TPL(Task Parallel Library),TPL使用到了.net 4.0中新增加的一些特性.这些特性只能通过TPL

.net 异步编程async &amp; await关键字的思考

C# 5.0引入了两个关键字 async和await,这两个关键字在很大程度上帮助我们简化了异步编程的实现代码,而且TPL中的task与async和await有很大的关系 思考了一下异步编程中的async & await关键字,对两个关键字尤其是await关键字一直很迷糊,因此深入思考了一下.首先借助的示例是:[你必须知道的异步编程]C# 5.0 新特性--Async和Await使异步编程更简单这是博客园一个大牛写的,自己也一直关注这个大神,不得不说,博客园大神很多,而且氛围也很好.我引入了其中

.NET 4 并行(多核)编程系列之一入门介绍

本系列文章将会对.NET 4中的并行编程技术(也称之为多核编程技术)以及应用作全面的介绍. 本篇文章的议题如下:  1. 并行编程和多线程编程的区别.  2. 并行编程技术的利弊  3. 何时采用并行编程 系列文章链接: .NET 4 并行(多核)编程系列之一入门介绍 .NET 4 并行(多核)编程系列之二 从Task开始 .NET 4 并行(多核)编程系列之三 从Task的取消 .NET 4 并行(多核)编程系列之四 Task的休眠 .NET 并行(多核)编程系列之五 Task执行和异常处理

C#5.0之后推荐使用TPL(Task Parallel Libray 任务并行库) 和PLINQ(Parallel LINQ, 并行Linq). 其次是TAP(Task-based Asynchronous Pattern, 基于任务的异步模式)

学习书籍: <C#本质论> 1--C#5.0之后推荐使用TPL(Task Parallel Libray 任务并行库) 和PLINQ(Parallel LINQ, 并行Linq). 其次是TAP(Task-based Asynchronous Pattern, 基于任务的异步模式). --用AggregateException处理Task上的未处理异常. --取消任务. CancellationToken --async修饰方法, 返回Task. task.wait(100)可以阻塞现场. a

[多线程]thread,threadpool,task及TPL知识点整理

简单理解 Thread:是一个指令序列,个体对象. Threadpool:在使用Thread的过程中,程序员要为每个希望并发的序列new一个线程,很麻烦,因此希望有一个统一管理线程的方法,程序员就不需要关注线程的申请管理问题,所以就对Thread进行一系列封装,有了ThreadPool.使用Threadpool,把需要并发的序列添加进线程池,线程池根据其线程列表中的线程的空闲情况,动态为并发序列申请线程. Task:再后来,程序员发现在使用Threadpool的过程当中还是存在很多不便,比如:(

TPL(Task Parallel Library)多线程、并发功能

The Task Parallel Library (TPL) is a set of public types and APIs in the System.Threading and System.Threading.Tasks namespaces. The purpose of the TPL is to make developers more productive by simplifying the process of adding parallelism and concurr

使用TPL取回Task中的运行结果的三种方式

概念:TPL( Task Parallel Library) 任务并行库 使用Task类执行多线程操作要比直接使用自己手工创建Thread效率高很多. 默认情况下,TPL使用线程池中的线程执行Task,但是工作结束之后,调用者线程怎么取回执行线程的工作结果呢? 这里有三种方法: 1.使用经典的线程同步手段: 可以使用线程同步对象.比如ManualResetEvent 在任务方法中设置ManualResetEvent状态为Signaled 调用者示例代码: 示例代码: /// <summary>

日志系统之基于Zookeeper的分布式协同设计

最近这段时间在设计和实现日志系统,在整个日志系统系统中Zookeeper的作用非常重要--它用于协调各个分布式组件并提供必要的配置信息和元数据.这篇文章主要分享一下Zookeeper的使用场景.这里主要涉及到Zookeeper在日志系统中的使用,但其实它在我们的消息总线和搜索模块中也同样非常重要. 日志元数据 日志的类型和日志的字段这里我们统称为日志的元数据.我们构建日志系统的目的最终主要是为了:日志搜索,日志分析.这两大块我们很大程度上依赖于--ElasticSearch(关于什么是Elast