C# 并行编程 之 命令式任务并行 (.Net Framework 4.0)

此文为个人学习《C#并行编程高级教程》的笔记,总结并调试了一些文章中的代码示例。 在以后开发过程中可以加以运用。

最基本的使用,并行任务的创建

在 .Net Framework 4 中出现了Task 的概念。在以往的多线程程序中虽然使用的是thread,但大多数的时候我们还是会把业务处理划分为Task。这样更接近于人类的思考方式,毕竟Thread不能说明它和业务的关联。这里C#直接提供了task,也算是对开发者简便了一些。

Task 的使用非常简单,定义工作函数,创建Task,Task开始运行,Wait/WaitAll 等待Task结束。

Sample 1 并行任务的创建

using System;
using System.Collections.Generic;
using System.Text;

using System.Threading.Tasks;

namespace CSParallel_Program
{
    class ThreadWork1
    {
        public ThreadWork1()
        { }

        public void run()
        {
            System.Console.WriteLine("ThreadWork1 run { ");
            for (int i = 0; i < 100; i++)
            {
                System.Console.WriteLine("ThreadWork1 : " + i);
            }
            System.Console.WriteLine("ThreadWork1 run } ");
        }
    }

    class ThreadWork2
    {
        public ThreadWork2()
        { }

        public void run()
        {
            System.Console.WriteLine("ThreadWork2 run { ");
            for (int i = 0; i < 100; i++)
            {
                System.Console.WriteLine("ThreadWork2 : " + i*i);
            }
            System.Console.WriteLine("ThreadWork2 run } ");
        }
    }

    class Program
    {
        static void StartT1()
        {
            ThreadWork1 work1 = new ThreadWork1();
            work1.run();
        }

        static void StartT2()
        {
            ThreadWork2 work2 = new ThreadWork2();
            work2.run();
        }
        static void Main(string[] args)
        {
            Task t1 = new Task(() => StartT1());
            Task t2 = new Task(() => StartT2());

            Console.WriteLine("Sample 3-1 Main {");

            Console.WriteLine("Main t1 t2 started {");
            t1.Start();
            t2.Start();
            Console.WriteLine("Main t1 t2 started }");

            Console.WriteLine("Main wait t1 t2 end {");
            Task.WaitAll(t1,t2);
            Console.WriteLine("Main wait t1 t2 end }");
            Console.WriteLine("Sample 3-1 Main }");
            Console.ReadKey();
        }
    }
}

线程的取消

Task 提供了线程取消的操作,使用起来也是非常的简单。它本质上是靠异常来打断线程的执行,并且把Task的状态置为Cancel状态(但这一点在测试程序中,Task的状态并没有置为Cancel,而是Faild,也许换个PC能解决这个问题也不一定)。

要实现线程的取消需要一下步骤。

1) 首先要在线程中加入取消检查点(ThrowIfCancellationRequested),这里是在工作函数的起始处和while循环的运行中。

2) 在Main函数中定义CancellationTokenSource对象并把它作为参数传递给工作Task。

3) 在运行时调用CancellationTokenSource.Cancel(),触发线程的取消。

4) 在Main函数中捕获取消异常(OperationCanceledException),线程终止,取消成功。

Sample 2 线程的取消

using System;
using System.Collections.Generic;
using System.Text;

using System.Threading.Tasks;

namespace CSParallel_Program
{
    class ThreadWork1
    {
        public ThreadWork1()
        { }

        public void run_with_cancel(System.Threading.CancellationToken ct)
        {
            //try
            //{
                System.Console.WriteLine("ThreadWork1 run { ");
                ct.ThrowIfCancellationRequested();
                for (int i = 0; i < 100; i++)
                {
                    System.Console.WriteLine("ThreadWork1 : " + i);
                    System.Threading.Thread.Sleep(200);
                    ct.ThrowIfCancellationRequested();
                }
                System.Console.WriteLine("ThreadWork1 run } ");
            //}
            //catch (OperationCanceledException ex)
            //{
            //    System.Console.WriteLine("ThreadWork1 Get Exception : " + ex.ToString());
            //}
        }
    }

    class ThreadWork2
    {
        public ThreadWork2()
        { }

        public void run_with_cancel(System.Threading.CancellationToken ct)
        {
            //try
            //{
                ct.ThrowIfCancellationRequested();
                System.Console.WriteLine("ThreadWork2 run { ");
                for (int i = 0; i < 100; i++)
                {
                    System.Console.WriteLine("ThreadWork2 : " + i * i);
                    System.Threading.Thread.Sleep(300);
                    ct.ThrowIfCancellationRequested();
                }
                System.Console.WriteLine("ThreadWork2 run } ");
            //}
            //catch (OperationCanceledException ex)
            //{
            //    System.Console.WriteLine("ThreadWork2 Get Exception : " + ex.ToString());
            //}
        }
    }

    class Program
    {
        static void StartT1(System.Threading.CancellationToken ct)
        {
            ThreadWork1 work1 = new ThreadWork1();
            work1.run_with_cancel(ct);
        }

        static void StartT2(System.Threading.CancellationToken ct)
        {
            ThreadWork2 work2 = new ThreadWork2();
            work2.run_with_cancel(ct);
        }
        static void Main(string[] args)
        {
            System.Threading.CancellationTokenSource cts =
                new System.Threading.CancellationTokenSource();
            System.Threading.CancellationToken ct = cts.Token;

            Task t1 = new Task(() => StartT1(ct));
            Task t2 = new Task(() => StartT2(ct));

            Console.WriteLine("Sample 3-1 Main {");

            Console.WriteLine("Main t1 t2 started {");
            t1.Start();
            t2.Start();
            Console.WriteLine("Main t1 t2 started }");

            Console.WriteLine("Main sleep 2 seconds and CANCEL {");
            System.Threading.Thread.Sleep(2000);
            cts.Cancel();
            Console.WriteLine("Main sleep 2 seconds and CANCEL }");

            try
            {
                Console.WriteLine("Main wait t1 t2 end {");
                if (!Task.WaitAll(new Task[] { t1, t2 }, 5000))
                {
                    Console.WriteLine("Worker1 and Worker2 NOT complete within 5 seconds");
                    Console.WriteLine("Worker1 Status: " + t1.Status);
                    Console.WriteLine("Worker2 Status: " + t2.Status);
                }
                else
                {
                    Console.WriteLine("Worker1 and Worker2 complete within 5 seconds");
                }
                Console.WriteLine("Main wait t1 t2 end }");
            }
            catch (AggregateException agg_ex)
            {
                foreach (Exception ex in agg_ex.InnerExceptions)
                {
                    Console.WriteLine("Agg Exceptions: " + ex.ToString());
                    Console.WriteLine("");
                }
            }

            if (t1.IsCanceled)
            {
                Console.WriteLine("Worker 1 is CANCELED");
            }

            if (t2.IsCanceled)
            {
                Console.WriteLine("Worker 2 is CANCELED");
            }

            Console.WriteLine("Sample 3-1 Main }");

            Console.ReadKey();
        }
    }
}

线程的返回值

Task可以提供返回值服务,这个与Thread还是有些区别的,在Thread中,无论是C#还是C++会通过共享的队列等方式返回测试结果。

使用返回值的步骤:

1) 第一步当然要给工作函数定义返回值,并且在实现的过程中返回有效值。

2) 定义Task时,要使用var而不能使用Task了,而且还得使用Task.Factory。 var t1 = Task.Factory.StartNew(() => StartT1()); 。后面取得结果时需要用到t1.Result,如果使用把t1定义为Task它是没有Result属性的。

3) 运行Task。

4) 取得结果,主要使用的是 t1.Result 属性。

5) 关于 TaskCreationOptions.LongRunning,我们可以指定操作系统运行任务时的一些方式,当然这些方式还是由系统控制的,未必真的能够满足我们定义任务的需要。

https://msdn.microsoft.com/zh-tw/library/system.threading.tasks.taskcreationoptions(v=vs.110).aspx

Sample 3 线程返回值

using System;
using System.Collections.Generic;
using System.Text;

using System.Threading.Tasks;

namespace CSParallel_Program
{
    class ThreadWork1
    {
        public ThreadWork1()
        { }

        public List<string> run()
        {
            List<string> RetList = new List<string>();
            System.Console.WriteLine("ThreadWork1 run { ");
            System.Console.WriteLine("ThreadWork1 running ... ... ");
            for (int i = 0; i < 100; i++)
            {
                RetList.Add("ThreadWork1 : " + i);
                //System.Console.WriteLine("ThreadWork1 : " + i);
            }
            System.Console.WriteLine("ThreadWork1 run } ");

            return RetList;
        }
    }

    class ThreadWork2
    {
        public ThreadWork2()
        { }

        public List<string> run()
        {
            List<string> RetList = new List<string>();

            System.Console.WriteLine("ThreadWork2 run { ");
            System.Console.WriteLine("ThreadWork2 running ... ... ");
            for (int i = 0; i < 100; i++)
            {
                RetList.Add("ThreadWork2 : " + i);
                //System.Console.WriteLine("ThreadWork2 : " + i * i);
            }
            System.Console.WriteLine("ThreadWork2 run } ");
            return RetList;
        }
    }

    class Program
    {
        static List<string> StartT1()
        {
            ThreadWork1 work1 = new ThreadWork1();
            return work1.run();
        }

        static List<string> StartT2()
        {
            ThreadWork2 work2 = new ThreadWork2();
            return work2.run();
        }
        static void Main(string[] args)
        {
            // For return value we can‘t use this one :  new Task(() => StartT2());
            // The problem is the compiler will not know there is a return value.
            // if we use t2.Result after that, there will be a compiler error.
            var t1 = Task.Factory.StartNew(() => StartT1());
            var t2 = Task.Factory.StartNew(() => StartT2());

            Console.WriteLine("Sample 3-3 Main {");

            // If we use Task.Factory.StartNew, it‘s no need to use t1.start()
            //Console.WriteLine("Main t1 t2 started {");
            //t1.Start();
            //t2.Start();
            //Console.WriteLine("Main t1 t2 started }");

            Console.WriteLine("Main wait t1 t2 end {");
            Task.WaitAll(t1, t2);
            Console.WriteLine("Main wait t1 t2 end }");

            var t3 = Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("============= T1 Result =============");
                    for (int i = 0; i < t1.Result.Count; i++)
                    {
                        Console.WriteLine(t1.Result[i]);
                    }
                    Console.WriteLine("============= ========= =============\n\n");

                    Console.WriteLine("============= T2 Result =============");
                    for (int i = 0; i < t2.Result.Count; i++)
                    {
                        Console.WriteLine(t2.Result[i]);
                    }
                    Console.WriteLine("============= ========= =============\n\n");
                }, TaskCreationOptions.LongRunning
                );

            Console.WriteLine("Sample 3-3 Main }");

            Console.ReadKey();
        }
    }
}

控制任务的执行顺序

Task还提供了简单的控制任务执行顺序的方式 ContinueWith()。

用ContinueWith和Wait函数组合使用便可以控制任务的运行顺序。

Sample 4 控制线程的运行顺序

using System;
using System.Collections.Generic;
using System.Text;

using System.Threading.Tasks;

namespace CSParallel_Program
{
    class ThreadWork1
    {
        public ThreadWork1()
        { }

        public List<string> run()
        {
            List<string> RetList = new List<string>();
            System.Console.WriteLine("ThreadWork1 run { ");
            System.Console.WriteLine("ThreadWork1 running ... ... ");
            for (int i = 0; i < 100; i++)
            {
                RetList.Add("ThreadWork1 : " + i);
                //System.Console.WriteLine("ThreadWork1 : " + i);
            }
            System.Console.WriteLine("ThreadWork1 run } ");

            return RetList;
        }
    }

    class ThreadWork2
    {
        public ThreadWork2()
        { }

        public List<string> run()
        {
            List<string> RetList = new List<string>();

            System.Console.WriteLine("ThreadWork2 run { ");
            System.Console.WriteLine("ThreadWork2 running ... ... ");
            for (int i = 0; i < 100; i++)
            {
                RetList.Add("ThreadWork2 : " + i);
                //System.Console.WriteLine("ThreadWork2 : " + i * i);
            }
            System.Console.WriteLine("ThreadWork2 run } ");
            return RetList;
        }
    }

    class Program
    {
        static void StartT0()
        {
            System.Console.WriteLine("Hello I am T0 Task, sleep 3 seconds. when I am ready others GO!");
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine("StartT0 sleeping  ... ... " + i);
                System.Threading.Thread.Sleep(1000);
            }
        }

        static List<string> StartT1()
        {
            ThreadWork1 work1 = new ThreadWork1();
            return work1.run();
        }

        static List<string> StartT2()
        {
            ThreadWork2 work2 = new ThreadWork2();
            return work2.run();
        }
        static void Main(string[] args)
        {
            Console.WriteLine("Sample 3-4 Main {");
            // The sequence of the task is:
            // T0 (Wait 3s) --> |
            //                  | --> T1 (Cacluate) |
            //                  | --> T2 (Cacluate) |
            //                                      |  --> T3 (Print)

            var t0 = Task.Factory.StartNew(() => StartT0());
            var t1 = t0.ContinueWith((t) => StartT1());
            var t2 = t0.ContinueWith((t) => StartT2());

            Console.WriteLine("Main wait t1 t2 end {");
            Task.WaitAll(t1, t2);
            Console.WriteLine("Main wait t1 t2 end }");

            var t3 = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("============= T1 Result =============");
                for (int i = 0; i < t1.Result.Count; i++)
                {
                    Console.WriteLine(t1.Result[i]);
                }
                Console.WriteLine("============= ========= =============\n\n");

                Console.WriteLine("============= T2 Result =============");
                for (int i = 0; i < t2.Result.Count; i++)
                {
                    Console.WriteLine(t2.Result[i]);
                }
                Console.WriteLine("============= ========= =============\n\n");
            }, TaskCreationOptions.LongRunning);

            Console.WriteLine("Sample 3-4 Main }");

            Console.ReadKey();
        }
    }
}

代码示例工程下载

时间: 2024-10-10 21:23:48

C# 并行编程 之 命令式任务并行 (.Net Framework 4.0)的相关文章

.Net并行编程系列之一:并行基础

现在普通PC平台上面多核处理器的普及,让我们领教了能够利用多核进行并行计算的软件的处理能力,同时继承更多地核心正是当前处理器发展的趋势. 但是作为一个.NET开发人员,是否有时候会发现你的程序占用了其中一个核心的大部分运行时间,甚至达到了100%,除了继续优化处理问题的算法. 那么还有方法能够利用CPU的其它核心为你的应用程序所用么?答案是显然可以的. 你所要做的,就是把应用程序的任务分割成块,使各块在同一时间运行. .Net 4.0引入了一种新的编程模型,结合VS2010使得开发并行应用程序变

C#并行编程 z

目录 C#并行编程-相关概念 C#并行编程-Parallel C#并行编程-Task C#并行编程-并发集合 C#并行编程-线程同步原语 C#并行编程-PLINQ:声明式数据并行 背景 基于任务的程序设计.命令式数据并行和任务并行都要求能够支持并发更新的数组.列表和集合. 在.NET Framework 4 以前,为了让共享的数组.列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作. 如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制

并行编程入门

目录 1. 并行编程简介 2. MapReduce 2.1 MapReduce简介 2.2 MapReduce框架 2.3 Hadoop介绍 2.4 Hadoop基本类 2.5 Hadoop编程实例 1.并行编程简介 1.1.并行编程作用,用途 商业用途,科学计算,大数据分析 1.2.并行编程兴起原因 目前的串行编程的局限性 使用的流水线等隐式并行模式的局限性 硬件的发展 1.3.并行算法设计原则步骤 a.分析问题 b.分解问题 其中分解方法有: 数据分解 递归分解 探测性分解 推测性分解 混合

Java:并行编程及同步使用方法

知道java可以使用java.util.concurrent包下的 CountDownLatch ExecutorService Future Callable 实现并行编程,并在并行线程同步时,用起来十分简单的一种 .实现原理: 1.CountDownLatch 统计并行线程完成数,并提供了await()方法,实现等待所有并行线程完成,或者指定最大等待时间.2.ExecutorService提供了execute(Callable)执行线程方法,还提供了submit(Callable)提交线程.

理解并行编程

并行编程从业务实现的角度可分为数据并行与任务并行,也就是要解决的问题是以数据为核心还是以要处理的事情为核心.基于任务的并行编程模型TPL(任务并行库)是从业务角度实现的并行模型,它以System.Threading.Tasks命名空间下的Parallel类为实现核心类,优点是不需要我们考虑不同的硬件差异,只需要重点关注所实现的任务. 1.任务并行库TPL TPL主要包括数据并行和任务并行,无论是数据并行还是任务并行,都可以使用并行查询PLINQ提高数据查询的效率.数据并行是对数据集合中的元素同时

C#并行编程-Task

原文:C#并行编程-Task 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 任务简介 TPL引入新的基于任务的编程模型,通过这种编程模型可以发挥多核的功效,提升应用程序的性能,不需要编写底层复杂且重量级的线程代码. 但需要注意:任务并不是线程(任务运行的时候需要使用线程,但并不是说任务取代了线程,任务代码是使用底层的线程(软件线程,调度在特定的硬件线程或逻辑内核上)运行的,任务与线程之间并没有一对一的关系.) 创建一个新的任务时,调度器(调度器依赖于底层的线程池

C#并行编程-Parallel

原文:C#并行编程-Parallel 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. TPL中引入了一个新命名空间System.Threading.Tasks,在该命名空间下Task是主类,表示一个类的异步的并发的操作,创建并行代码的时候不一定要直接使用Task类,在某些情况下可以直接使用Parallel静态类(System.Threading.Tasks.Parallel)下所提供的方法,而不用底层的Task实例. Parallel.Invoke  试图将很多方

.NET 并行编程

本文内容 并行编程 数据并行 最近,对多线程编程,并行编程,异步编程,这三个概念有点晕了,怎么突然觉得,自己有点不明白这三者之间有什么联系和区别了呢? 因此,回顾了一下个人经历,屡屡思路~我刚接触计算机时,还是学校的 DOS 和 win 3.x,之后,学校换了 Windows 95,再之后,我有自己的台式机--但是无论如何,那时的电脑 CPU 都是单核的,即便采用多线程,无论看上多么像"同时"执行的,本质上还是顺序的,因为代码段是独占 CPU 的:之后,我把台式机卖了,买了个笔记本电脑

.NET 4.0 任务和并行编程系列

8天玩转并行开发 8天玩转并行开发——第一天 Parallel的使用 8天玩转并行开发——第二天 Task的使用 8天玩转并行开发——第三天 plinq的使用 8天玩转并行开发——第四天 同步机制(上) 8天玩转并行开发——第五天 同步机制(下) 8天玩转并行开发——第六天 异步编程模型 8天玩转并行开发——第七天 简要分析任务与线程池 8天玩转并行开发——第八天 用VS性能向导解剖你的程序 并行编程系列 .NET 4 并行(多核)编程系列之一入门介绍 .NET 4 并行(多核)编程系列之二 从