此文为个人学习《C#并行编程高级教程》的笔记,总结并调试了一些文章中的代码示例。 在以后开发过程中可以加以运用。
最基本的使用,并行任务的创建
在 .Net Framework 4 中出现了Task 的概念。在以往的多线程程序中虽然使用的是thread,但大多数的时候我们还是会把业务处理划分为Task。这样更接近于人类的思考方式,毕竟Thread不能说明它和业务的关联。这里C#直接提供了task,也算是对开发者简便了一些。
Task 的使用非常简单,定义工作函数,创建Task,Task开始运行,Wait/WaitAll 等待Task结束。
Sample 1 并行任务的创建
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace CSParallel_Program
{
class ThreadWork1
{
public ThreadWork1()
{ }
public void run()
{
System.Console.WriteLine("ThreadWork1 run { ");
for (int i = 0; i < 100; i++)
{
System.Console.WriteLine("ThreadWork1 : " + i);
}
System.Console.WriteLine("ThreadWork1 run } ");
}
}
class ThreadWork2
{
public ThreadWork2()
{ }
public void run()
{
System.Console.WriteLine("ThreadWork2 run { ");
for (int i = 0; i < 100; i++)
{
System.Console.WriteLine("ThreadWork2 : " + i*i);
}
System.Console.WriteLine("ThreadWork2 run } ");
}
}
class Program
{
static void StartT1()
{
ThreadWork1 work1 = new ThreadWork1();
work1.run();
}
static void StartT2()
{
ThreadWork2 work2 = new ThreadWork2();
work2.run();
}
static void Main(string[] args)
{
Task t1 = new Task(() => StartT1());
Task t2 = new Task(() => StartT2());
Console.WriteLine("Sample 3-1 Main {");
Console.WriteLine("Main t1 t2 started {");
t1.Start();
t2.Start();
Console.WriteLine("Main t1 t2 started }");
Console.WriteLine("Main wait t1 t2 end {");
Task.WaitAll(t1,t2);
Console.WriteLine("Main wait t1 t2 end }");
Console.WriteLine("Sample 3-1 Main }");
Console.ReadKey();
}
}
}
线程的取消
Task 提供了线程取消的操作,使用起来也是非常的简单。它本质上是靠异常来打断线程的执行,并且把Task的状态置为Cancel状态(但这一点在测试程序中,Task的状态并没有置为Cancel,而是Faild,也许换个PC能解决这个问题也不一定)。
要实现线程的取消需要一下步骤。
1) 首先要在线程中加入取消检查点(ThrowIfCancellationRequested),这里是在工作函数的起始处和while循环的运行中。
2) 在Main函数中定义CancellationTokenSource对象并把它作为参数传递给工作Task。
3) 在运行时调用CancellationTokenSource.Cancel(),触发线程的取消。
4) 在Main函数中捕获取消异常(OperationCanceledException),线程终止,取消成功。
Sample 2 线程的取消
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace CSParallel_Program
{
class ThreadWork1
{
public ThreadWork1()
{ }
public void run_with_cancel(System.Threading.CancellationToken ct)
{
//try
//{
System.Console.WriteLine("ThreadWork1 run { ");
ct.ThrowIfCancellationRequested();
for (int i = 0; i < 100; i++)
{
System.Console.WriteLine("ThreadWork1 : " + i);
System.Threading.Thread.Sleep(200);
ct.ThrowIfCancellationRequested();
}
System.Console.WriteLine("ThreadWork1 run } ");
//}
//catch (OperationCanceledException ex)
//{
// System.Console.WriteLine("ThreadWork1 Get Exception : " + ex.ToString());
//}
}
}
class ThreadWork2
{
public ThreadWork2()
{ }
public void run_with_cancel(System.Threading.CancellationToken ct)
{
//try
//{
ct.ThrowIfCancellationRequested();
System.Console.WriteLine("ThreadWork2 run { ");
for (int i = 0; i < 100; i++)
{
System.Console.WriteLine("ThreadWork2 : " + i * i);
System.Threading.Thread.Sleep(300);
ct.ThrowIfCancellationRequested();
}
System.Console.WriteLine("ThreadWork2 run } ");
//}
//catch (OperationCanceledException ex)
//{
// System.Console.WriteLine("ThreadWork2 Get Exception : " + ex.ToString());
//}
}
}
class Program
{
static void StartT1(System.Threading.CancellationToken ct)
{
ThreadWork1 work1 = new ThreadWork1();
work1.run_with_cancel(ct);
}
static void StartT2(System.Threading.CancellationToken ct)
{
ThreadWork2 work2 = new ThreadWork2();
work2.run_with_cancel(ct);
}
static void Main(string[] args)
{
System.Threading.CancellationTokenSource cts =
new System.Threading.CancellationTokenSource();
System.Threading.CancellationToken ct = cts.Token;
Task t1 = new Task(() => StartT1(ct));
Task t2 = new Task(() => StartT2(ct));
Console.WriteLine("Sample 3-1 Main {");
Console.WriteLine("Main t1 t2 started {");
t1.Start();
t2.Start();
Console.WriteLine("Main t1 t2 started }");
Console.WriteLine("Main sleep 2 seconds and CANCEL {");
System.Threading.Thread.Sleep(2000);
cts.Cancel();
Console.WriteLine("Main sleep 2 seconds and CANCEL }");
try
{
Console.WriteLine("Main wait t1 t2 end {");
if (!Task.WaitAll(new Task[] { t1, t2 }, 5000))
{
Console.WriteLine("Worker1 and Worker2 NOT complete within 5 seconds");
Console.WriteLine("Worker1 Status: " + t1.Status);
Console.WriteLine("Worker2 Status: " + t2.Status);
}
else
{
Console.WriteLine("Worker1 and Worker2 complete within 5 seconds");
}
Console.WriteLine("Main wait t1 t2 end }");
}
catch (AggregateException agg_ex)
{
foreach (Exception ex in agg_ex.InnerExceptions)
{
Console.WriteLine("Agg Exceptions: " + ex.ToString());
Console.WriteLine("");
}
}
if (t1.IsCanceled)
{
Console.WriteLine("Worker 1 is CANCELED");
}
if (t2.IsCanceled)
{
Console.WriteLine("Worker 2 is CANCELED");
}
Console.WriteLine("Sample 3-1 Main }");
Console.ReadKey();
}
}
}
线程的返回值
Task可以提供返回值服务,这个与Thread还是有些区别的,在Thread中,无论是C#还是C++会通过共享的队列等方式返回测试结果。
使用返回值的步骤:
1) 第一步当然要给工作函数定义返回值,并且在实现的过程中返回有效值。
2) 定义Task时,要使用var而不能使用Task了,而且还得使用Task.Factory。 var t1 = Task.Factory.StartNew(() => StartT1()); 。后面取得结果时需要用到t1.Result,如果使用把t1定义为Task它是没有Result属性的。
3) 运行Task。
4) 取得结果,主要使用的是 t1.Result 属性。
5) 关于 TaskCreationOptions.LongRunning,我们可以指定操作系统运行任务时的一些方式,当然这些方式还是由系统控制的,未必真的能够满足我们定义任务的需要。
https://msdn.microsoft.com/zh-tw/library/system.threading.tasks.taskcreationoptions(v=vs.110).aspx
Sample 3 线程返回值
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace CSParallel_Program
{
class ThreadWork1
{
public ThreadWork1()
{ }
public List<string> run()
{
List<string> RetList = new List<string>();
System.Console.WriteLine("ThreadWork1 run { ");
System.Console.WriteLine("ThreadWork1 running ... ... ");
for (int i = 0; i < 100; i++)
{
RetList.Add("ThreadWork1 : " + i);
//System.Console.WriteLine("ThreadWork1 : " + i);
}
System.Console.WriteLine("ThreadWork1 run } ");
return RetList;
}
}
class ThreadWork2
{
public ThreadWork2()
{ }
public List<string> run()
{
List<string> RetList = new List<string>();
System.Console.WriteLine("ThreadWork2 run { ");
System.Console.WriteLine("ThreadWork2 running ... ... ");
for (int i = 0; i < 100; i++)
{
RetList.Add("ThreadWork2 : " + i);
//System.Console.WriteLine("ThreadWork2 : " + i * i);
}
System.Console.WriteLine("ThreadWork2 run } ");
return RetList;
}
}
class Program
{
static List<string> StartT1()
{
ThreadWork1 work1 = new ThreadWork1();
return work1.run();
}
static List<string> StartT2()
{
ThreadWork2 work2 = new ThreadWork2();
return work2.run();
}
static void Main(string[] args)
{
// For return value we can‘t use this one : new Task(() => StartT2());
// The problem is the compiler will not know there is a return value.
// if we use t2.Result after that, there will be a compiler error.
var t1 = Task.Factory.StartNew(() => StartT1());
var t2 = Task.Factory.StartNew(() => StartT2());
Console.WriteLine("Sample 3-3 Main {");
// If we use Task.Factory.StartNew, it‘s no need to use t1.start()
//Console.WriteLine("Main t1 t2 started {");
//t1.Start();
//t2.Start();
//Console.WriteLine("Main t1 t2 started }");
Console.WriteLine("Main wait t1 t2 end {");
Task.WaitAll(t1, t2);
Console.WriteLine("Main wait t1 t2 end }");
var t3 = Task.Factory.StartNew(() =>
{
Console.WriteLine("============= T1 Result =============");
for (int i = 0; i < t1.Result.Count; i++)
{
Console.WriteLine(t1.Result[i]);
}
Console.WriteLine("============= ========= =============\n\n");
Console.WriteLine("============= T2 Result =============");
for (int i = 0; i < t2.Result.Count; i++)
{
Console.WriteLine(t2.Result[i]);
}
Console.WriteLine("============= ========= =============\n\n");
}, TaskCreationOptions.LongRunning
);
Console.WriteLine("Sample 3-3 Main }");
Console.ReadKey();
}
}
}
控制任务的执行顺序
Task还提供了简单的控制任务执行顺序的方式 ContinueWith()。
用ContinueWith和Wait函数组合使用便可以控制任务的运行顺序。
Sample 4 控制线程的运行顺序
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace CSParallel_Program
{
class ThreadWork1
{
public ThreadWork1()
{ }
public List<string> run()
{
List<string> RetList = new List<string>();
System.Console.WriteLine("ThreadWork1 run { ");
System.Console.WriteLine("ThreadWork1 running ... ... ");
for (int i = 0; i < 100; i++)
{
RetList.Add("ThreadWork1 : " + i);
//System.Console.WriteLine("ThreadWork1 : " + i);
}
System.Console.WriteLine("ThreadWork1 run } ");
return RetList;
}
}
class ThreadWork2
{
public ThreadWork2()
{ }
public List<string> run()
{
List<string> RetList = new List<string>();
System.Console.WriteLine("ThreadWork2 run { ");
System.Console.WriteLine("ThreadWork2 running ... ... ");
for (int i = 0; i < 100; i++)
{
RetList.Add("ThreadWork2 : " + i);
//System.Console.WriteLine("ThreadWork2 : " + i * i);
}
System.Console.WriteLine("ThreadWork2 run } ");
return RetList;
}
}
class Program
{
static void StartT0()
{
System.Console.WriteLine("Hello I am T0 Task, sleep 3 seconds. when I am ready others GO!");
for (int i = 0; i < 3; i++)
{
Console.WriteLine("StartT0 sleeping ... ... " + i);
System.Threading.Thread.Sleep(1000);
}
}
static List<string> StartT1()
{
ThreadWork1 work1 = new ThreadWork1();
return work1.run();
}
static List<string> StartT2()
{
ThreadWork2 work2 = new ThreadWork2();
return work2.run();
}
static void Main(string[] args)
{
Console.WriteLine("Sample 3-4 Main {");
// The sequence of the task is:
// T0 (Wait 3s) --> |
// | --> T1 (Cacluate) |
// | --> T2 (Cacluate) |
// | --> T3 (Print)
var t0 = Task.Factory.StartNew(() => StartT0());
var t1 = t0.ContinueWith((t) => StartT1());
var t2 = t0.ContinueWith((t) => StartT2());
Console.WriteLine("Main wait t1 t2 end {");
Task.WaitAll(t1, t2);
Console.WriteLine("Main wait t1 t2 end }");
var t3 = Task.Factory.StartNew(() =>
{
Console.WriteLine("============= T1 Result =============");
for (int i = 0; i < t1.Result.Count; i++)
{
Console.WriteLine(t1.Result[i]);
}
Console.WriteLine("============= ========= =============\n\n");
Console.WriteLine("============= T2 Result =============");
for (int i = 0; i < t2.Result.Count; i++)
{
Console.WriteLine(t2.Result[i]);
}
Console.WriteLine("============= ========= =============\n\n");
}, TaskCreationOptions.LongRunning);
Console.WriteLine("Sample 3-4 Main }");
Console.ReadKey();
}
}
}