从无到有实现.net协程(一)

协程的概念,就我而言,来源自当初学习Go,它可以用一句话来总结,“单线程无阻塞异步处理”,也就是说,首先,它的范围是针对单个线程来说的,一个线程可以运行多个代码片段,当运行期间遇到IO等待(包括网络IO、磁盘IO等,常见的数据库操作、web服务调用都属于IO等待)时,自动切换到其他代码片段上执行,当执行完毕或再次遇到IO等待时,再回到之前已经完成IO等待的代码片段继续执行,这样,就保证了线程无阻塞,并且多个片段都能被逐步处理(前提是代码中存在IO等待,否则还是顺序处理),以上就是协程的定义,从描述可以看到,这种处理方式特别适合用于高IO、低CPU计算的程序,现实中,大部分的Web应用都是属于这种模式,这就是为什么现在Nodejs, Go这类语言越来越火的原因。

下面的图描述了协程的运行

协程的实际处理顺序(实际取决与协程调度程序)

在.net中,web处理(asp.net)的模式为多线程异步,其实也很好,也可以做到无阻塞,充分利用CPU资源,这里不谈这种模式,只谈协程模式。

那么.net如何实现协程呢?这里首先必须介绍一个关键字,yield,.net协程就是利用它来实现的

Yield是用来处理迭代器(IEnumerator)的,利用迭代器的特性,间接提供了一种可以中断恢复的处理方式,以代码说话

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             var result=TestYield();
 6
 7             result.MoveNext();
 8             Console.WriteLine(result.Current);
 9
10             result.MoveNext();
11             Console.WriteLine(result.Current);
12
13             result.MoveNext();
14             Console.WriteLine(result.Current);
15
16             result.MoveNext();
17
18             Console.Read();
19
20         }
21
22
23         static IEnumerator<string> TestYield()
24         {
25             yield return "A";
26             Console.WriteLine("执行完成A");
27             yield return "B";
28             Console.WriteLine("执行完成B");
29             yield return "C";
30             Console.WriteLine("执行完成C");
31         }
32     }

执行结果为

从上面的代码可以看出,每当使用MoveNext方法,代码都从yield return语句之后继续执行,当遇到yield return或方法完成,再次返回调用方,这种迭代器模式(迭代器模式是本身就是23种设计模式之一),就提供了分段执行代码的能力,我们通过这种模式,就能用来完成协程,这里有一个特别需要注意的地方,就是,当我们调用TestYield方法时,你会发现它其实并没有被执行,直到我们第一次调用MoveNext方法,该方法才真正开始被执行,还记得Linq吗,Linq说了,只有调用了诸如ToList()、Count()之类的方法,才真正开始计算,和这个是不是很像?其实,在Linq内部,就是使用了迭代器。

好了,现在,已经具备了实现协程的基本元素了,接下来就可以开始构建了,由两个部分组成,协程容器和协程单元,协程容器用来负责调度和执行,协程单元用来处理实际的业务,协程容器提供Register方法来注册每个协程单元,继续用代码来说话

 1     /// <summary>
 2     /// 协程容器接口
 3     /// </summary>
 4     public interface ICoroutineContainer
 5     {
 6         /// <summary>
 7         /// 注册协程单元
 8         /// </summary>
 9         /// <param name="unit">协程单元</param>
10         void Register(ICoroutineUnit unit);
11         /// <summary>
12         /// 执行
13         /// </summary>
14         void Run();
15 }
16     /// <summary>
17     /// 协程单元接口
18     /// </summary>
19     public interface ICoroutineUnit
20     {
21         /// <summary>
22         /// 处理业务
23         /// </summary>
24         /// <returns></returns>
25         IEnumerator<Task> Do();
26 }

这两个接口为整个实现的核心接口,下面的协程容器的基本实现

/// <summary>
    /// 协程容器的基本实现
    /// </summary>
    public class CoroutineContainerBase : ICoroutineContainer
    {
        /// <summary>
        /// 存储协程单元的列表
        /// </summary>
        private List<UnitItem> _units = new List<UnitItem>();
        /// <summary>
        /// 存储新注册的协程单元,与协程单元列表分开,实现注册与执行互不影响
        /// </summary>
        private List<UnitItem> _addUnits = new List<UnitItem>();
        /// <summary>
        /// 错误处理
        /// </summary>
        private Action<ICoroutineUnit, Exception> _errorHandle;

        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="errorHandle">错误处理</param>
        public CoroutineContainerBase(Action<ICoroutineUnit, Exception> errorHandle)
        {
            _errorHandle = errorHandle;
        }

        public void Register(ICoroutineUnit unit)
        {
            lock (_addUnits)
            {
                _addUnits.Add(new UnitItem() { Unit = unit, UnitResult = null });
            }

        }

        public void Run()
        {
  	    //开启一个单独任务执行
            Task.Run(() =>
            {
                //循环处理协程单元
                while (true)
                {
                    //将新注册的协程单元加入到列表中
                    lock (_addUnits)
                    {
                        foreach (var addItem in _addUnits)
                        {
                            _units.Add(addItem);
                        }
                        _addUnits.Clear();
                    }

                    //依次处理协程单元
                    foreach (var item in _units)
                    {
                        if (item.UnitResult == null)
                        {
                            var result = item.Unit.Do();

                            //运行到下一个断点
                            try
                            {
                                result.MoveNext();
                            }
                            catch (Exception ex)
                            {
                                _errorHandle(item.Unit, ex);

                                _units.Remove(item);

                                break;
                            }

                            item.UnitResult = result;
                        }
                        else
                        {
                            //检查等待是否已经完成,如果已经完成则继续运行
                            if (item.UnitResult.Current.IsCanceled || item.UnitResult.Current.IsCompleted || item.UnitResult.Current.IsFaulted)
                            {
                                var nextResult = true;
                                try
                                {
                                    nextResult = item.UnitResult.MoveNext();
                                }
                                catch (Exception ex)
                                {
                                    _errorHandle(item.Unit, ex);
                                    _units.Remove(item);

                                    break;
                                }
                                if (!nextResult)
                                {

                                    _units.Remove(item);

                                    break;
                                }
                            }
                        }
                    }

                }
            });

        }

        /// <summary>
        /// 协程单元存储格式
        /// </summary>
        private class UnitItem
        {
            /// <summary>
            /// 协程单元
            /// </summary>
            public ICoroutineUnit Unit { get; set; }
            /// <summary>
            /// 协程单元使用的迭代器
            /// </summary>
            public IEnumerator<Task> UnitResult { get; set; }
        }
    }

实现两个协程单元

/// <summary>
    /// 协程单元1
    /// 执行一个网络IO,访问163站点
    /// </summary>
    public class Action1 : ICoroutineUnit
    {
        public IEnumerator<Task> Do()
        {
            Console.WriteLine("开始执行Action1");
            HttpClient client = new HttpClient();

            yield return innerDo();

            Console.WriteLine("结束执行Action1");
        }

        private Task innerDo()
        {
            HttpClient client = new HttpClient();
            return client.GetAsync("http://www.163.com");
        }
    }

    /// <summary>
    /// 协程单元2
    /// 执行一个网络IO,访问163站点
    /// </summary>
    public class Action2 : ICoroutineUnit
    {
        public IEnumerator<Task> Do()
        {
            Console.WriteLine("开始执行Action2");
            yield return innerDo();
            Console.WriteLine("结束执行Action2");
        }

        private Task innerDo()
        {
            HttpClient client = new HttpClient();
            return client.GetAsync("http://www.163.com");
        }
}

主程序调用执行

        static void Main(string[] args)
        {
            //错误处理仅仅是将错误显示在控制台里
            Action<ICoroutineUnit,Exception> errorHandle = (unit, ex) =>
              {
                  Console.WriteLine(ex.ToString());
              };
            //初始化协程容器
            ICoroutineContainer coroutineContainerBase = new CoroutineContainerBase(errorHandle);
            //注册Action1
            coroutineContainerBase.Register(new Action1());
            //注册Action2
            coroutineContainerBase.Register(new Action2());
            //运行容器
            coroutineContainerBase.Run();

            Console.Read();

        }

执行结果

注意,每次执行的顺序可能不一样,取决于网络速度,但可以明显看到代码的分段执行。

至此,一个最基本的协程框架已经完成

时间: 2024-10-26 10:33:07

从无到有实现.net协程(一)的相关文章

python-进程池与线程池,协程

一.进程池与线程池 实现并发的手段有两种,多线程和多进程.注:并发是指多个任务看起来是同时运行的.主要是切换+保存状态. 当我们需要执行的并发任务大于cpu的核数时,我们需要知道一个操作系统不能无限的开启进程和线程,通常有几个核就开几个进程,如果进程开启过多,就无法充分利用cpu多核的优势,效率反而会下降.这个时候就引入了进程池线程池的概念. 池的功能就是限制启动的进程数或线程数 concurent.future模块: concurrent.futures模块提供了高度封装的异步调用接口 Pro

关于Unity协程(Coroutine)

协程官方doc解释A coroutine is a function that can suspend its execution(yield) until the given given YieldInstruction finishes. StartCoroutine开启协程 先执行协程中的代码 碰到yield return时控制权交给unity引擎 引擎继续做接下来的工作例如第一次yield return之后执行StartCoroutine下一行代码 直到满足yield指令的要求才会重新进

Gevent的socket协程安全性分析

一般讨论socket的并发安全性,都是指线程的安全性...而且绝大多数的情况下socket都不是线程安全的.. 当然一些框架可能会对socket进行一层封装,让其成为线程安全的...例如java的netty框架就是如此,将socket封装成channel,然后让channel封闭到一个线程中,那么这个channel的所有的读写都在它所在的线程中串行的进行,那么自然也就是线程安全的了..... 其实很早看Gevent的源码的时候,就已经看过这部分的东西了,当时就已经知道gevent的socket不

PHP实现协程

在服务器编程当中,为了实现异步,经常性的需要回调函数,例如以下这段代码 function send($value) { $data = process($value); onReceive($data); } function onReceive($recv_value) { var_dump($recv_value); } function process($value) { return $value+1; } $send_value = 1; send($send_value); 实现的东

python并发编程之---协程

1.什么是协程 协程:是单线程下的并发,又称微线程,纤程. 协程是一种用户态的轻量级线程,协程是由用户程序自己控制调度的. 2.需要注意的点: 需要强调的是: #1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行) #2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关) 对比操作系统控制线程的切换,用户在单线程内控制协程的切换 优点

python协程有多厉害?

爬一个××网站上的东西,测算了一下协程的速度提升到底有多大,网站链接就不放了... import requests from bs4 import BeautifulSoup as sb import lxml import time url = 'http://www.××××.com/html/part/index27_' url_list = [] start = time.time() for i in range(2,47): print('get page '+str(i)) hea

进程、线程和协程的区别

进程: 进程之间不共享任何状态,进程的调度由操作系统完成,每个进程都有自己独立的内存空间,进程间通讯主要是通过信号传递的方式来实现的,实现方式有多种,信号量.管道.事件等,任何一种方式的通讯效率都需要过内核,导致通讯效率比较低.由于是独立的内存空间,上下文切换的时候需要保存先调用栈的信息.cpu各寄存器的信息.虚拟内存.以及打开的相关句柄等信息,所以导致上下文进程间切换开销很大,通讯麻烦. 线程: 线程之间共享变量,解决了通讯麻烦的问题,但是对于变量的访问需要锁,线程的调度主要也是有操作系统完成

爬虫协程比线程爬取速度更快?

先做个小示例,不用废话谈理论,没有实践的空谈都是扯蛋误导人. # coding=utf-8 import requests,time count=0 urlx= 'http://www.xxsy.net/' # 'http://www.danmeila.com/' http://www.sina.com.cn/ 'http://www.qingkan9.com/' # # 'http://www.qingkan9.com/' def fun(url): try: print url resp=r

【Unity笔记】协程Coroutine的简单优化

一个最简单的协程,也至少需要2帧才能完成.第一帧走到yield return null停止,第二帧从此处接着执行完下面的操作.需求:如果缓存中存在某数据则直接使用,否则联网异步下载. private bool cached; // 该数据是否已有缓存 void Start(){ StartCoroutine(Download()); } IEnumerator WorkWhenDownload() { if(cached){ // 直接使用缓存 }else{ // 没有缓存,联网下载 WWW w