协程的概念,就我而言,来源自当初学习Go,它可以用一句话来总结,“单线程无阻塞异步处理”,也就是说,首先,它的范围是针对单个线程来说的,一个线程可以运行多个代码片段,当运行期间遇到IO等待(包括网络IO、磁盘IO等,常见的数据库操作、web服务调用都属于IO等待)时,自动切换到其他代码片段上执行,当执行完毕或再次遇到IO等待时,再回到之前已经完成IO等待的代码片段继续执行,这样,就保证了线程无阻塞,并且多个片段都能被逐步处理(前提是代码中存在IO等待,否则还是顺序处理),以上就是协程的定义,从描述可以看到,这种处理方式特别适合用于高IO、低CPU计算的程序,现实中,大部分的Web应用都是属于这种模式,这就是为什么现在Nodejs, Go这类语言越来越火的原因。
下面的图描述了协程的运行
协程的实际处理顺序(实际取决与协程调度程序)
在.net中,web处理(asp.net)的模式为多线程异步,其实也很好,也可以做到无阻塞,充分利用CPU资源,这里不谈这种模式,只谈协程模式。
那么.net如何实现协程呢?这里首先必须介绍一个关键字,yield,.net协程就是利用它来实现的
Yield是用来处理迭代器(IEnumerator)的,利用迭代器的特性,间接提供了一种可以中断恢复的处理方式,以代码说话
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 var result=TestYield(); 6 7 result.MoveNext(); 8 Console.WriteLine(result.Current); 9 10 result.MoveNext(); 11 Console.WriteLine(result.Current); 12 13 result.MoveNext(); 14 Console.WriteLine(result.Current); 15 16 result.MoveNext(); 17 18 Console.Read(); 19 20 } 21 22 23 static IEnumerator<string> TestYield() 24 { 25 yield return "A"; 26 Console.WriteLine("执行完成A"); 27 yield return "B"; 28 Console.WriteLine("执行完成B"); 29 yield return "C"; 30 Console.WriteLine("执行完成C"); 31 } 32 }
执行结果为
从上面的代码可以看出,每当使用MoveNext方法,代码都从yield return语句之后继续执行,当遇到yield return或方法完成,再次返回调用方,这种迭代器模式(迭代器模式是本身就是23种设计模式之一),就提供了分段执行代码的能力,我们通过这种模式,就能用来完成协程,这里有一个特别需要注意的地方,就是,当我们调用TestYield方法时,你会发现它其实并没有被执行,直到我们第一次调用MoveNext方法,该方法才真正开始被执行,还记得Linq吗,Linq说了,只有调用了诸如ToList()、Count()之类的方法,才真正开始计算,和这个是不是很像?其实,在Linq内部,就是使用了迭代器。
好了,现在,已经具备了实现协程的基本元素了,接下来就可以开始构建了,由两个部分组成,协程容器和协程单元,协程容器用来负责调度和执行,协程单元用来处理实际的业务,协程容器提供Register方法来注册每个协程单元,继续用代码来说话
1 /// <summary> 2 /// 协程容器接口 3 /// </summary> 4 public interface ICoroutineContainer 5 { 6 /// <summary> 7 /// 注册协程单元 8 /// </summary> 9 /// <param name="unit">协程单元</param> 10 void Register(ICoroutineUnit unit); 11 /// <summary> 12 /// 执行 13 /// </summary> 14 void Run(); 15 } 16 /// <summary> 17 /// 协程单元接口 18 /// </summary> 19 public interface ICoroutineUnit 20 { 21 /// <summary> 22 /// 处理业务 23 /// </summary> 24 /// <returns></returns> 25 IEnumerator<Task> Do(); 26 }
这两个接口为整个实现的核心接口,下面的协程容器的基本实现
/// <summary> /// 协程容器的基本实现 /// </summary> public class CoroutineContainerBase : ICoroutineContainer { /// <summary> /// 存储协程单元的列表 /// </summary> private List<UnitItem> _units = new List<UnitItem>(); /// <summary> /// 存储新注册的协程单元,与协程单元列表分开,实现注册与执行互不影响 /// </summary> private List<UnitItem> _addUnits = new List<UnitItem>(); /// <summary> /// 错误处理 /// </summary> private Action<ICoroutineUnit, Exception> _errorHandle; /// <summary> /// 构造函数 /// </summary> /// <param name="errorHandle">错误处理</param> public CoroutineContainerBase(Action<ICoroutineUnit, Exception> errorHandle) { _errorHandle = errorHandle; } public void Register(ICoroutineUnit unit) { lock (_addUnits) { _addUnits.Add(new UnitItem() { Unit = unit, UnitResult = null }); } } public void Run() { //开启一个单独任务执行 Task.Run(() => { //循环处理协程单元 while (true) { //将新注册的协程单元加入到列表中 lock (_addUnits) { foreach (var addItem in _addUnits) { _units.Add(addItem); } _addUnits.Clear(); } //依次处理协程单元 foreach (var item in _units) { if (item.UnitResult == null) { var result = item.Unit.Do(); //运行到下一个断点 try { result.MoveNext(); } catch (Exception ex) { _errorHandle(item.Unit, ex); _units.Remove(item); break; } item.UnitResult = result; } else { //检查等待是否已经完成,如果已经完成则继续运行 if (item.UnitResult.Current.IsCanceled || item.UnitResult.Current.IsCompleted || item.UnitResult.Current.IsFaulted) { var nextResult = true; try { nextResult = item.UnitResult.MoveNext(); } catch (Exception ex) { _errorHandle(item.Unit, ex); _units.Remove(item); break; } if (!nextResult) { _units.Remove(item); break; } } } } } }); } /// <summary> /// 协程单元存储格式 /// </summary> private class UnitItem { /// <summary> /// 协程单元 /// </summary> public ICoroutineUnit Unit { get; set; } /// <summary> /// 协程单元使用的迭代器 /// </summary> public IEnumerator<Task> UnitResult { get; set; } } }
实现两个协程单元
/// <summary> /// 协程单元1 /// 执行一个网络IO,访问163站点 /// </summary> public class Action1 : ICoroutineUnit { public IEnumerator<Task> Do() { Console.WriteLine("开始执行Action1"); HttpClient client = new HttpClient(); yield return innerDo(); Console.WriteLine("结束执行Action1"); } private Task innerDo() { HttpClient client = new HttpClient(); return client.GetAsync("http://www.163.com"); } } /// <summary> /// 协程单元2 /// 执行一个网络IO,访问163站点 /// </summary> public class Action2 : ICoroutineUnit { public IEnumerator<Task> Do() { Console.WriteLine("开始执行Action2"); yield return innerDo(); Console.WriteLine("结束执行Action2"); } private Task innerDo() { HttpClient client = new HttpClient(); return client.GetAsync("http://www.163.com"); } }
主程序调用执行
static void Main(string[] args) { //错误处理仅仅是将错误显示在控制台里 Action<ICoroutineUnit,Exception> errorHandle = (unit, ex) => { Console.WriteLine(ex.ToString()); }; //初始化协程容器 ICoroutineContainer coroutineContainerBase = new CoroutineContainerBase(errorHandle); //注册Action1 coroutineContainerBase.Register(new Action1()); //注册Action2 coroutineContainerBase.Register(new Action2()); //运行容器 coroutineContainerBase.Run(); Console.Read(); }
执行结果
注意,每次执行的顺序可能不一样,取决于网络速度,但可以明显看到代码的分段执行。
至此,一个最基本的协程框架已经完成