深入tornado中的协程

tornado使用了单进程(当然也可以多进程) + 协程 + I/O多路复用的机制,解决了C10K中因为过多的线程(进程)的上下文切换 而导致的cpu资源的浪费。

tornado中的I/O多路复用前面已经讲过了。本文不做详细解释。

来看一下tornado中的协程模块:tornado.gen:

tornado.gen是根据生成器(generator)实现的,用来更加简单的实现异步。

先来说一下tornado.gen.coroutine的实现思路:

  我们知道generator中的yield语句可以使函数暂停执行,而send()方法则可以恢复函数的执行。

  tornado将那些异步操作放置到yield语句后,当这些异步操作完成后,tornado会将结果send()至generator中恢复函数执行。

在tornado的官方文档中有这么一句话:

Most asynchronous functions in Tornado return a Future; yielding this object returns its result.

就是说:在tornado中大多数的异步操作返回一个Future对象,yield Future对象 会返回该异步操作的结果。

那么,Future对象到底是什么?

一  Future对象

先来说说Future对象:

Future对象可以概括为: 一个异步操作的占位符,当然这个占位符有些特殊,它特殊在:

  1 这个占位符是一个对象

  2 这个对象包含了很多属性,包括_result 以及 _callbacks,分别用来存储异步操作的结果以及回调函数

  3 这个对象包含了很多方法,比如添加回调函数,设置异步操作结果等。

  4 当这个对象对应的异步操作完成后,该对象会被set_done,然后遍历并运行_callbacks中的回调函数

来看一下Future的简化版

class Future(object):
    ‘‘‘
        Future对象主要保存一个回调函数列表_callbacks与一个执行结果_result,当我们set_result时,就会执行_callbacks中的函数
        如果set_result或者set_done,就会遍历_callbacks列表并执行callback(self)函数
    ‘‘‘
    def __init__(self):
        self._result = None    # 执行的结果
        self._callbacks = []    # 用来保存该future对象的回调函数

    def result(self, timeout=None):
        # 如果操作成功,返回结果。如果失败则抛出异常
        self._clear_tb_log()
        if self._result is not None:
            return self._result
        if self._exc_info is not None:
            raise_exc_info(self._exc_info)
        self._check_done()
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def _set_done(self):
        # 执行结束(成功)后的操作。
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                app_log.exception(‘Exception in callback %r for %r‘, cb, self)
        self._callbacks = None

完整源码:

 Future源码

二  gen.coroutine装饰器

tornado中的协程是通过tornado.gen中的coroutine装饰器实现的:

def coroutine(func, replace_callback=True):
    return _make_coroutine_wrapper(func, replace_callback=True)
_make_coroutine_wrapper :

def _make_coroutine_wrapper(func, replace_callback):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        ‘‘‘
            大体过程:
            future = TracebackFuture()
            result = func(*args, **kwargs)
            if isinstance(result, GeneratorType):
                yielded = next(result)
                Runner(result, future, yielded)
            return future
        ‘‘‘
        future = TracebackFuture()                   # TracebackFuture = Future

        if replace_callback and ‘callback‘ in kwargs:
            callback = kwargs.pop(‘callback‘)
            IOLoop.current().add_future(future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)           # 执行func,若func中包含yield,则返回一个generator对象
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):      # 判断其是否为generator对象
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)            # 第一次执行
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                ‘stack_context inconsistency (probably caused ‘
                                ‘by yield within a "with StackContext" block)‘))
                except (StopIteration, Return) as e:
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)  # Runner(result, future, yield)
                try:
                    return future            
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper

先来看一下大体过程:

  1  首先生成一个Future对象

  2  运行该被装饰函数并将结果赋值给result。 在这里因为tornado的‘异步‘实现是基于generator的,所以一般情况下 result是一个generator对象

  3  yielded = next(result)  执行到被装饰函数的第一次yield,将结果赋值给yielded。一般情况下,yielded很大情况下是一个Future对象。

  4  Runner(result, future, yielded)

  5  return future

除了第4步以外其他都很好理解,所以来了解一下第四步Runner()干了些啥:

三  Runner()类

1 为什么要有Runner()?或者说Runner()的作用是什么?

Runner()可以自动的将异步操作的结果send()至生成器中止的地方

tornado的协程或者说异步是基于generator实现的,generator较为常用的有两个方法:send() next() ,关于这两个方法的流程分析在这

很多情况下会有generator的嵌套。比如说经常会yield 一个generator。当A生成器yield B生成器时,分两步:

  1 我们首先中止A的执行转而执行B

  2 当B执行完成后,我们需要将B的结果send()至A中止的地方,继续执行A

Runner()主要就是来做这些的,也就是控制生成器的执行与中止,并在合适的情况下使用send()方法同时传入B生成器的结果唤醒A生成器。

来看一个简单例子:

 

上例中的Runner()仅仅完成了第一步,我们还需要手动的执行第二步,而tornado的gen的Runner()则做了全套奥!

2 剖析Runner()

在Runner()中主要有三个方法__init__  handle_yield  run:

 Runner()

2.1 __init__方法

__init__ 里面执行了一些初始化的操作,最主要是最后两句:

if self.handle_yield(first_yielded): # 运行
    self.run()

2.2 handle_yield方法

handle_yield(self, yielded) 函数,这个函数顾名思义,就是用来处理yield返回的对象的。

首先我们假设yielded是一个Future对象(因为这是最常用的情况),这样的话代码就缩减了很多

def handle_yield(self, yielded):
        self.future = convert_yielded(yielded)                         # 如果yielded是Future对象则原样返回
        if not self.future.done() or self.future is moment:            # moment是tornado初始化时就建立的一个Future对象,且被set_result(None)
            self.io_loop.add_future(self.future, lambda f: self.run()) # 为该future添加callback
            return False
        return True

也就是干了三步:

  首先解析出self.future

  然后判断self.future对象是否已经被done(完成),如果没有的话为其添加回调函数,这个回调函数会执行self.run()

  返回self.future对象是否被done

总体来说,handle_yield返回yielded对象是否被set_done,如果没有则为yielded对象添加回调函数,这个回调函数执行self.run()

还有一个有趣的地方,就是上面代码的第四行:  self.io_loop.add_future(self.future, lambda f: self.run())

def add_future(self, future, callback):
    # 为future添加一个回调函数,这个回调函数的作用是:将参数callback添加至self._callbacks中
    # 大家思考一个问题: 如果某个Future对象被set_done,那么他的回调函数应该在什么时候执行?
    # 是立即执行亦或者是将回调函数添加到IOLoop实例的_callbacks中进行统一执行?
    # 虽然前者更简单,但导致回调函数的执行过于混乱,我们应该让所有满足执行条件的回调函数统一执行。显然后者更合理
    # 而add_future()的作用就是这样
    future.add_done_callback(lambda future: self.add_callback(callback, future))

def add_callback(self, callback, *args, **kwargs):
    # 将callback添加至_callbacks列表中
    self._callbacks.append(functools.partial(callback, *args, **kwargs))

2.3 run方法

再来看self.run()方法。这个方法实际上就是一个循环,不停的执行generator的send()方法,发送的值就是yielded的result。

我们可以将run()方法简化一下:

    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready. 循环向generator中传递值,直到某个yield返回的yielded还没有被done
        """
        try:
            self.running = True
            while True:
                future = self.future  
                if not future.done():
                    return
                self.future = None      # 清空self.future
                value = future.result()   # 获取future对象的结果
                try:
                    yielded = self.gen.send(value)  # send该结果,并将self.gen返回的值赋值给yielded(一般情况下这也是个future对象)
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                if not self.handle_yield(yielded):  # 运行self.handler_yield(yielded),如果yielded对象没有被done,则直接返回;否则继续循环
                    return
        finally:
            self.running = False

总结:

  1 每一个Future对应一个异步操作

  2 该Future对象可以添加回调函数,当该异步操作完成后,需要对该Future对象设置set_done或者set_result,然后执行其所有的回调函数

  3 凡是使用了coroutine装饰器的generator函数都会返回一个Future对象,同时会不断为该generator,该generator每一次运行send()或者next()的返回结果yielded以及future对象运行Runner()

  4 Runner()会对generator不断进行send()或者next()操作。具体步骤是:上一个next()或者send()操作返回的yielded(一般是一个Future对象)被set_done后,将该yielded对象的结果send()至generator中,不断循环该操作,直到产生StopIteration或者Return异常(这表示该generator执行结束),这时会为该generator对应的Future对象set_result。

我们可以看到tornado的协程是基于generator的,generator可以通过yield关键字暂停执行,也可以通过next()或者send()恢复执行,同时send()可以向generator中传递值。

而将协程连接起来的纽带则是Future对象,每一个Future对象都对应着一个异步操作,我们可以为该对象添加许多回调函数,当异步操作完成后通过对Future对象进行set_done或者set_result就可以执行相关的回调函数。

提供动力的则是Runner(),他不停的将generator所yield的每一个future对象的结果send()至generator,当generator运行结束,他会进行最后的包装工作,对该generator所对应的Future对象执行set_result操作。

参考:

  http://blog.csdn.net/wyx819/article/details/45420017

  http://www.cnblogs.com/apexchu/p/4226784.html

时间: 2024-08-02 15:14:28

深入tornado中的协程的相关文章

在PHP中使用协程实现多任务调度

PHP5.5一个比较好的新功能是加入了对迭代生成器和协程的支持.对于生成器,PHP的文档和各种其他的博客文章已经有了非常详细的讲解.协程相对受到的关注就少了,因为协程虽然有很强大的功能但相对比较复杂, 也比较难被理解,解释起来也比较困难. 这篇文章将尝试通过介绍如何使用协程来实施任务调度, 来解释在PHP中的协程. 我将在前三节做一个简单的背景介绍.如果你已经有了比较好的基础,可以直接跳到“协同多任务处理”一节. 迭代生成器 生成器也是一个函数,不同的是这个函数的返回值是依次输出,而不是只返回一

fasthttp中的协程池实现

fasthttp中的协程池实现 协程池可以控制并行度,复用协程.fasthttp 比 net/http 效率高很多倍的重要原因,就是利用了协程池.实现并不复杂,我们可以参考他的设计,写出高性能的应用. 入口 // server.go func (s *Server) Serve(ln net.Listener) error { var lastOverflowErrorTime time.Time var lastPerIPErrorTime time.Time var c net.Conn v

Unity中的协程(一)

这篇文章很不错的问题,推荐阅读英文原版: Introduction to Coroutines Scripting with Coroutines   这篇文章转自:http://blog.csdn.net/huang9012/article/details/38492937 协程介绍 在Unity中,协程(Coroutines)的形式是我最喜欢的功能之一,几乎在所有的项目中,我都会使用它来控制运动,序列,以及对象的行为.在这个教程中,我将会说明协程是如何工作的,并且会附上一些例子来介绍它的用法

Python 中的协程 (5) 无阻塞

1 异步程序依然会假死 freezing 1)一般程序的调用方 freezing import asyncio import time import threading #定义一个异步操作 async def hello1(a,b): print(f"异步函数开始执行") await asyncio.sleep(3) print("异步函数执行结束") return a+b #在一个异步操作里面调用另一个异步操作 async def main(): c=await

Unity中使用协程进行服务端数据验证手段

近期在做项目中的个人中心的一些事情,用户头像上传,下载,本地缓存,二级缓存,压缩,这些都要做,麻雀虽小五脏俱全啊,也是写的浑浑噩噩的, 当我们在上传用户头像的时候,向服务端发送上传头像请求之前,一般都会做一次验证,向服务端获取token验证信息,来确保非法上传,如果不做这个那么会有非法用户上传非法图像,使你的服务器 带来未知的灾难. 而验证的逻辑很好写,并没有什么难度,比如: Server.SendMessage("获取token"); Client.Receive(string to

【Unity3D基础教程】(五):详解Unity3D中的协程(Coroutine)

[狗刨学习网] 为什么需要协程 在游戏中有许多过程(Process)需要花费多个逻辑帧去计算. 你会遇到"密集"的流程,比如说寻路,寻路计算量非常大,所以我们通常会把它分割到不同的逻辑帧去进行计算,以免影响游戏的帧率. 你会遇到"稀疏"的流程,比如说游戏中的触发器,这种触发器大多数时候什么也不做,但是一旦被调用会做非常重要的事情(比图说游戏中自动开启的门就是在门前放了一个Empty Object作为trigger,人到门前就会触发事件). 不管什么时候,如果你想创建

【Unity优化】如何实现Unity编辑器中的协程

Unity编辑器中何时需要协程 当我们定制Unity编辑器的时候,往往需要启动额外的协程或者线程进行处理.比如当执行一些界面更新的时候,需要大量计算,如果用户在不断修正一个参数,比如从1变化到2,这种变化过程要经历无数中间步骤,调用N多次Update,如果直接在Update中不断刷新,界面很容易直接卡死.所以在一个协程中进行一些优化,只保留用户最后一次参数修正,省去中间步骤,就会好很多.这属于Unity编辑器的内容,也属于优化的内容,还是放在优化中吧. 解决问题思路 Unity官网的questi

Unity中使用协程实现倒计时功能

unity中协程的功能很强大,能够充分发挥unity协程功能的地方就是游戏的倒计时,今天我们就来实现一个简易版本的倒计时. 新建一个场景,给camera添加一个脚本,脚本内容如下: using UnityEngine; using System.Collections; public class ShowNumber : MonoBehaviour { private int tmp = 10; // Use this for initialization void Start () { //开

python中的协程(协同程序)

协程:将函数编写为一个能处理输入参数的任务 使用yield语句并以表达式yield的形式创建协程 #匹配器案例: def print_info(data):    print('Looking for',data);    while True:      line = (yield)      if data in line:        print(line); 上面这个函数 就是一个协程程序 要使用这个函数 首先需用调用它 并且 向前执行到第一条yield语句 info = print_