Coroutines
Coroutines are the recommended way to write asynchronous code in Tornado. Coroutines use the Python yield keyword to suspend and resume execution instead of a chain of callbacks (cooperative lightweight threads as seen in frameworks like gevent are sometimes called coroutines as well, but in Tornado all coroutines use explicit context switches and are called as asynchronous functions).
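Tornado recommends coroutines. A coroutine uses the yield keyword to suspend and resume execution instead of chaining callbacks. The passage then mentions gevent, which also implements coroutines but works on a different principle from Tornado.

So what exactly is a coroutine? I couldn't follow Wikipedia's definition, so here is my own rough one. I don't know precisely what a coroutine is, but I know a few things about it:
- Coroutines enable asynchrony.
- A single thread can run a large number of concurrent tasks.
- All those tasks share the same context, so I don't have to worry about whether a data structure is thread-safe, or about synchronization and locks. This holds at least between yield points, because other coroutines can run whenever one yields.

Broadly, that is what coroutines are about.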
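gevent implements all this with lightweight threads (greenlets). Greenlets are not a finer subdivision of OS threads. They are coroutines scheduled in user space, all running on one OS thread. gevent switches between them implicitly whenever one would block on I/O, which is why it monkey-patches blocking modules such as socket. gevent manages the switching, not us. What we see is single-threaded, synchronous-style code whose performance resembles asynchronous code.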
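Tornado uses explicit context switches. Picture a high-concurrency scenario where execution switches back and forth between the concurrent tasks. Roughly speaking, the event loop drives these switches, and a coroutine gives up control only at a yield.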
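Last year I even read the source of a simple event-loop demo and came away baffled. I understand the event loop itself, but who actually performs the I/O, and who sends the notification when it's done? Surely not the event-loop thread? Switching via yield is complicated, though I roughly get it. But who notifies the yield? I read a lot of articles, but with my limited brainpower, none of them seemed to explain it clearly.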
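A rough answer to that question is sketched below with the standard library only. This is not Tornado's code. The operating system performs the I/O and tracks when sockets become ready. The event loop's job is to ask the OS which sockets are ready, using `select`/`epoll` (here via the `selectors` module), and then run the callbacks registered for them. A `socket.socketpair()` stands in for a network connection. The names `toy_loop`, `main` and `on_readable` are hypothetical.

```python
import selectors
import socket

def toy_loop(sel, results, expected):
    # The loop blocks in select() until the OS reports a registered socket
    # is ready, then runs that socket's callback ("notifies" the waiter).
    while len(results) < expected:
        for key, _events in sel.select(timeout=1):
            callback = key.data
            callback(key.fileobj)

def main():
    sel = selectors.DefaultSelector()
    reader, writer = socket.socketpair()
    reader.setblocking(False)
    results = []

    def on_readable(sock):
        # Runs only after the OS said data is available, so recv won't block.
        results.append(sock.recv(1024))
        sel.unregister(sock)

    sel.register(reader, selectors.EVENT_READ, on_readable)
    writer.send(b"hello")  # the kernel buffers this; reader becomes readable
    toy_loop(sel, results, expected=1)
    reader.close()
    writer.close()
    sel.close()
    return results

print(main())  # [b'hello']
```

In Tornado, such a callback would resolve a Future. Resolving the Future is what lets the coroutine machinery resume the suspended generator.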
Coroutines are almost as simple as synchronous code, but without the expense of a thread. They also make concurrency easier to reason about by reducing the number of places where a context switch can happen.
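In other words, coroutine code looks almost synchronous and costs no thread. Because a context switch can happen only at a yield, concurrency becomes easier to reason about.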
Let's review the coroutine code once more:
```python
from tornado import gen
from tornado.httpclient import AsyncHTTPClient

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    # In Python versions prior to 3.3, returning a value from
    # a generator is not allowed and you must use
    #   raise gen.Return(response.body)
    # instead.
    return response.body
```
I'm skipping Python 3.5's async and await for now. I don't need them yet, and besides, they've left me a little scarred.
How it works
A function containing yield is a generator. All generators are asynchronous; when called they return a generator object instead of running to completion. The @gen.coroutine decorator communicates with the generator via the yield expressions, and with the coroutine's caller by returning a Future.
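Put more simply:
- A function with yield becomes a generator.
- Calling it only produces a generator object; the body does not run to the end.
- @gen.coroutine talks to the generator through its yield expressions and returns a Future to the caller.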
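I can now guess that the worker behind the scenes is @gen.coroutine: when the I/O finishes, it notifies the generator through yield. More precisely, the chain runs as follows:
1. The OS reports that the socket is ready.
2. The IOLoop runs the callback that resolves the Future.
3. The decorator's Runner resumes the generator by calling send() with the result.

I still don't have an intuitive feel for the context switch itself, though.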
A simplified version of the inner loop that drives the generator:
```python
# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)
```
The decorator receives a Future from the generator, waits (without blocking) for that Future to complete, then “unwraps” the Future and sends the result back into the generator as the result of the yield expression. Most asynchronous code never touches the Future class directly except to immediately pass the Future returned by an asynchronous function to a yield expression.
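So the decorator takes the Future that the generator yields and waits for it without blocking. It then unwraps the Future and sends the result back into the generator as the value of the yield expression. Most asynchronous code never touches the Future class, apart from passing a Future returned by an asynchronous function straight to yield.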
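To make the Runner loop above concrete, here is a self-contained toy version. It is a sketch under stated assumptions, not Tornado's real implementation:
- `toy_coroutine`, `Runner`, `ToyLoop` and `fake_fetch` are hypothetical names.
- `concurrent.futures.Future` stands in for Tornado's Future.
- `fake_fetch` returns a Future that the toy loop resolves later, playing the role of the IOLoop noticing that I/O has finished.
- Exception handling is left out for brevity.

```python
import collections
import functools
from concurrent.futures import Future

class ToyLoop:
    """Hypothetical stand-in for the IOLoop: runs queued callbacks in order."""
    def __init__(self):
        self.ready = collections.deque()

    def call_soon(self, fn, *args):
        self.ready.append((fn, args))

    def run(self):
        while self.ready:
            fn, args = self.ready.popleft()
            fn(*args)

loop = ToyLoop()

def fake_fetch(url):
    # Pretend I/O: the result arrives on a later loop iteration.
    future = Future()
    loop.call_soon(future.set_result, "body of " + url)
    return future

class Runner:
    """Drives a generator, sending each resolved result back into it."""
    def __init__(self, gen, result_future):
        self.gen = gen
        self.result_future = result_future
        self.next = None

    def run(self):
        try:
            # send(x) makes the current yield return x and runs the
            # generator until its next yield, which hands us a Future.
            future = self.gen.send(self.next)
        except StopIteration as stop:
            # The generator returned: that value is the coroutine's result.
            self.result_future.set_result(stop.value)
            return

        def callback(f):
            self.next = f.result()  # "unwrap" the Future
            self.run()              # resume the generator

        future.add_done_callback(callback)

def toy_coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        result_future = Future()
        Runner(func(*args, **kwargs), result_future).run()
        return result_future
    return wrapper

@toy_coroutine
def fetch_two(url1, url2):
    body1 = yield fake_fetch(url1)
    body2 = yield fake_fetch(url2)
    return [body1, body2]

result = fetch_two("a", "b")
print(result.done())    # False: still waiting on the first fake fetch
loop.run()
print(result.result())  # ['body of a', 'body of b']
```

Nothing here runs on a separate thread. The generator is resumed only when the loop resolves a Future and that Future's done-callback calls `run()` again, which is the "who notifies the yield" chain in miniature.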
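Looking back, this is where I went off track. I wanted to understand exactly what the code above meant, so I dug into the source and hunted for blog posts. Along the way I forgot that my original goal was simply to write services with Tornado.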
How to call a coroutine
This part doesn't actually touch on how coroutines work. It just calmly tells you: here is a function like this, and here is how to use it.
Coroutines do not raise exceptions in the normal way: any exception they raise will be trapped in the Future until it is yielded. This means it is important to call coroutines in the right way, or you may have errors that go unnoticed:
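In other words, coroutines must be called correctly. An exception raised inside a coroutine is wrapped in its Future and stays there until the Future is yielded. If the Future is never yielded, the exception never surfaces.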
```python
from tornado import gen

@gen.coroutine
def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)
```
In nearly all cases, any function that calls a coroutine must be a coroutine itself, and use the yield keyword in the call. When you are overriding a method defined in a superclass, consult the documentation to see if coroutines are allowed (the documentation should say that the method “may be a coroutine” or “may return a Future”):
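In almost all cases, any function that calls a coroutine must itself be a coroutine and must call it with yield. As mentioned above, yield unwraps the Future: it returns the result or re-raises the exception.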
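When you override a method defined in a superclass, check the documentation to see whether it may be a coroutine. The documentation will say something like "this method may be a coroutine" or "may return a Future".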
```python
@gen.coroutine
def good_call():
    # yield will unwrap the Future returned by divide() and raise
    # the exception.
    yield divide(1, 0)
```
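The difference between bad_call and good_call can be reproduced without Tornado. In this sketch, `toy_coroutine` is a hypothetical stand-in for @gen.coroutine that models only one behavior: it runs the function and stores the return value or the exception in a `concurrent.futures.Future` instead of letting it propagate. Calling `.result()` plays the role of `yield`: it unwraps the Future and re-raises the stored exception.

```python
import functools
from concurrent.futures import Future

def toy_coroutine(func):
    """Hypothetical stand-in for @gen.coroutine: never raises directly,
    returns a Future holding either the result or the exception."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()
        try:
            future.set_result(func(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)  # trapped until someone unwraps it
        return future
    return wrapper

@toy_coroutine
def divide(x, y):
    return x / y

def bad_call():
    # The ZeroDivisionError is stored in the returned Future, which is
    # discarded, so nothing is raised and nobody notices.
    divide(1, 0)

def good_call():
    # result() unwraps the Future and re-raises, like yield in a coroutine.
    return divide(1, 0).result()

bad_call()  # silently "succeeds"
try:
    good_call()
except ZeroDivisionError as exc:
    print("caught:", exc)  # caught: division by zero
```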
Sometimes you may want to “fire and forget” a coroutine without waiting for its result. In this case it is recommended to use IOLoop.spawn_callback, which makes the IOLoop responsible for the call. If it fails, the IOLoop will log a stack trace:
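Sometimes you want to fire off a coroutine without waiting for its result. For that, use IOLoop.spawn_callback, which makes the IOLoop responsible for the call. If the call fails, the IOLoop logs the stack trace. Calling the coroutine directly would instead swallow the exception, as bad_call above shows.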
```python
from tornado.ioloop import IOLoop

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)
```
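For comparison, here is the same fire-and-forget idea in plain asyncio, which Tornado 5 and later runs on top of under Python 3. This is a sketch, not Tornado's implementation. `spawn` is a hypothetical helper that schedules the coroutine as a task and attaches a done-callback that logs any exception. A failure therefore leaves a stack trace in the logs instead of vanishing. As with spawn_callback, it takes the function object plus its arguments.

```python
import asyncio
import logging

async def divide(x, y):
    return x / y

def spawn(coro_func, *args):
    """Hypothetical spawn_callback-like helper: run coro_func(*args) on the
    running loop without waiting for it, logging any exception it raises."""
    task = asyncio.get_running_loop().create_task(coro_func(*args))

    def log_failure(t):
        if not t.cancelled() and t.exception() is not None:
            logging.error("spawned coroutine failed", exc_info=t.exception())

    task.add_done_callback(log_failure)
    return task

async def main():
    task = spawn(divide, 1, 0)  # function object plus arguments, not a call
    await asyncio.sleep(0.01)   # give the loop time to run the spawned task
    return task

task = asyncio.run(main())
print(type(task.exception()).__name__)  # ZeroDivisionError
```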
Finally, at the top level of a program, if the IOLoop is not yet running, you can start the IOLoop, run the coroutine, and then stop the IOLoop with the IOLoop.run_sync method. This is often used to start the main function of a batch-oriented program:
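Finally, at the top level of a program, if the IOLoop isn't running yet, IOLoop.run_sync starts the IOLoop, runs the coroutine, and stops the IOLoop once the coroutine finishes. This is often used to run the main function of a batch-oriented program.

In short:
- To fire off a coroutine on a running loop without waiting for it, use IOLoop.spawn_callback.
- To run a program built from coroutines to completion from the top level, use IOLoop.run_sync.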
```python
# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))
```
Coroutine patterns
Interaction with callbacks
To interact with asynchronous code that uses callbacks instead of Future, wrap the call in a Task. This will add the callback argument for you and return a Future which you can yield:
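When a coroutine needs to call an asynchronous function that uses callbacks, call that function through Task. The second half of the sentence, "add the callback argument", explains the mechanism. I think it would be simpler and clearer to skip that and just say that it returns a Future.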
```python
@gen.coroutine
def call_task():
    # Note that there are no parens on some_function.
    # This will be translated by Task into
    #   some_function(other_args, callback=callback)
    yield gen.Task(some_function, other_args)
```
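gen.Task no longer exists in Tornado 6.0, which dropped the callback-based interfaces. Newer code wraps callback-style functions by hand. Below is a minimal sketch of that idea in plain asyncio:
- `callback_style_add` is a hypothetical callback-based function.
- `as_future` (also hypothetical) creates a Future, passes a callback that resolves it, and returns the Future so the caller can await it. In a gen.coroutine you would yield it instead.
- The callback may be invoked from another thread, so it hands the result over with `loop.call_soon_threadsafe`.

```python
import asyncio
import threading

def callback_style_add(a, b, callback):
    """Hypothetical callback-based API: computes a + b on another thread
    and reports the result by calling callback(result)."""
    threading.Thread(target=lambda: callback(a + b)).start()

def as_future(func, *args):
    """Call func(*args, callback=...) and return a Future for its result."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def callback(result):
        # May run on a foreign thread, so resolve the Future via the loop.
        loop.call_soon_threadsafe(future.set_result, result)

    func(*args, callback=callback)
    return future

async def main():
    return await as_future(callback_style_add, 2, 3)

print(asyncio.run(main()))  # 5
```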
Calling blocking functions
The simplest way to call a blocking function from a coroutine is to use a ThreadPoolExecutor, which returns Futures that are compatible with coroutines:
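When calling a blocking function, the simplest approach is a ThreadPoolExecutor. Tornado runs a single-threaded event loop, so anything that blocks it hurts performance badly, whether the work is I/O-bound or CPU-bound. For CPU-bound pure-Python code, the GIL means the pool buys responsiveness rather than extra throughput. ThreadPoolExecutor returns a Future object.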
```python
from concurrent.futures import ThreadPoolExecutor

from tornado import gen

thread_pool = ThreadPoolExecutor(4)

@gen.coroutine
def call_blocking():
    yield thread_pool.submit(blocking_func, args)
```
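The same pattern in asyncio terms, as a sketch. `loop.run_in_executor` submits the blocking call to a `ThreadPoolExecutor` and returns an awaitable Future, so the event loop keeps running while the pool thread blocks. `blocking_func` here is a hypothetical stand-in that just sleeps. `heartbeat` (also hypothetical) records timestamps to show that the loop stays free during the call. If `blocking_func` were called directly inside the coroutine, it would record only one tick.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

thread_pool = ThreadPoolExecutor(4)

def blocking_func(seconds):
    """Hypothetical blocking call, e.g. a synchronous database driver."""
    time.sleep(seconds)
    return seconds

async def heartbeat(ticks):
    # Records a timestamp every 0.05 s for as long as the event loop is free.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)

async def call_blocking():
    loop = asyncio.get_running_loop()
    ticks = []
    beat = asyncio.ensure_future(heartbeat(ticks))
    # blocking_func runs on a pool thread; the loop keeps serving heartbeat.
    result = await loop.run_in_executor(thread_pool, blocking_func, 0.3)
    beat.cancel()
    return result, len(ticks)

result, tick_count = asyncio.run(call_blocking())
print(result, tick_count)  # 0.3, plus a tick count well above 1
```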
Parallelism
The coroutine decorator recognizes lists and dicts whose values are Futures, and waits for all of those Futures in parallel:
So if you yield a list, or a dict whose values are Futures, the decorator waits for all of them concurrently.
```python
from tornado import gen
from tornado.httpclient import AsyncHTTPClient

http_client = AsyncHTTPClient()

@gen.coroutine
def parallel_fetch(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

@gen.coroutine
def parallel_fetch_many(urls):
    responses = yield [http_client.fetch(url) for url in urls]
    # responses is a list of HTTPResponses in the same order

@gen.coroutine
def parallel_fetch_dict(urls):
    responses = yield {url: http_client.fetch(url)
                        for url in urls}
    # responses is a dict {url: HTTPResponse}
```
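In asyncio, the equivalent of yielding a list of Futures is `asyncio.gather`. There is no built-in dict form, but it is easy to rebuild by gathering the values and zipping the keys back on. This is a sketch in which `fake_fetch` is a hypothetical stand-in for `http_client.fetch` that simply sleeps. The waits overlap, and the results come back in argument order regardless of which fetch finishes first.

```python
import asyncio

async def fake_fetch(url, delay):
    """Hypothetical stand-in for http_client.fetch: waits, then returns a body."""
    await asyncio.sleep(delay)
    return "body of " + url

async def parallel_fetch_many(jobs):
    # Like `yield [fetch(u) for u in urls]`: results keep the input order.
    return await asyncio.gather(*(fake_fetch(u, d) for u, d in jobs))

async def parallel_fetch_dict(jobs):
    # Like `yield {u: fetch(u) for u in urls}`: gather values, re-attach keys.
    keys = [u for u, _ in jobs]
    bodies = await asyncio.gather(*(fake_fetch(u, d) for u, d in jobs))
    return dict(zip(keys, bodies))

jobs = [("a", 0.03), ("b", 0.01), ("c", 0.02)]  # (url, simulated delay)
print(asyncio.run(parallel_fetch_many(jobs)))
# ['body of a', 'body of b', 'body of c']
print(asyncio.run(parallel_fetch_dict(jobs)))
# {'a': 'body of a', 'b': 'body of b', 'c': 'body of c'}
```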
Interleaving
Sometimes it is useful to save a Future instead of yielding it immediately, so you can start another operation before waiting:
Sometimes you want to keep a Future instead of yielding it right away, so that you can start another operation before waiting on it.
```python
@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()
```
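A sketch of why saving the Future helps, in asyncio terms. `fetch_next_chunk` and `write_and_flush` are hypothetical stand-ins that each take 0.05 s. Starting the next fetch before waiting on the flush lets the two waits overlap. Three chunks then take about 0.2 s instead of the roughly 0.35 s a strictly sequential version needs. One difference from Tornado: a gen.coroutine starts running as soon as it is called, but an asyncio coroutine object does nothing until awaited. It is therefore wrapped in `asyncio.ensure_future` to start it right away.

```python
import asyncio
import time

async def fetch_next_chunk(source):
    """Hypothetical chunk source: next chunk after 0.05 s, None at the end."""
    await asyncio.sleep(0.05)
    return next(source, None)

async def write_and_flush(chunk, written):
    """Hypothetical slow write + flush, also 0.05 s."""
    await asyncio.sleep(0.05)
    written.append(chunk)

async def get(chunks):
    source, written = iter(chunks), []
    start = time.monotonic()
    fetch_future = asyncio.ensure_future(fetch_next_chunk(source))
    while True:
        chunk = await fetch_future
        if chunk is None:
            break
        # Start the next fetch before waiting on the flush, so both
        # 0.05 s waits overlap instead of running back to back.
        fetch_future = asyncio.ensure_future(fetch_next_chunk(source))
        await write_and_flush(chunk, written)
    return written, time.monotonic() - start

written, elapsed = asyncio.run(get(["c1", "c2", "c3"]))
print(written)  # ['c1', 'c2', 'c3']
```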
Looping
Looping is tricky with coroutines since there is no way in Python to yield on every iteration of a for or while loop and capture the result of the yield. Instead, you’ll need to separate the loop condition from accessing the results, as in this example from Motor:
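In other words, Python offers no way to yield on every iteration of a for or while loop and capture the result of that yield. You have to separate the loop condition from accessing the results. At first I didn't understand this at all.

My reading now: a list or dict can drive a for loop directly (for i in some_list). A coroutine that produces results asynchronously cannot. The for loop's own step of fetching the next item has no way to yield to the coroutine runner, so you are stuck.

The example gets around this by making yield cursor.fetch_next the loop condition. It waits for data and evaluates to whether another document is available. The data itself is read synchronously with doc = cursor.next_object().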
```python
import motor
from tornado import gen

db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    # collection is a Motor collection, e.g. db.test_collection
    cursor = collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()
```
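A self-contained sketch of the Motor pattern. A hypothetical `ToyCursor` stands in for a Motor cursor and mirrors only the two names used above; it is not Motor's API:
- `fetch_next` is an awaitable that loads a batch when needed and resolves to True while documents remain.
- `next_object()` returns the next document synchronously.

Python 3.5's `async for` later removed the need for this split by letting the iteration step itself await. The second function shows that form through `__aiter__`/`__anext__`.

```python
import asyncio

class ToyCursor:
    """Hypothetical cursor over a list, fetching documents in batches."""
    def __init__(self, docs, batch_size=2):
        self._pending = list(docs)
        self._batch = []
        self._batch_size = batch_size

    @property
    def fetch_next(self):
        # Returns an awaitable, so it can sit in the loop condition.
        return self._fetch_next()

    async def _fetch_next(self):
        if not self._batch and self._pending:
            await asyncio.sleep(0.01)  # pretend to fetch a batch remotely
            self._batch = self._pending[:self._batch_size]
            self._pending = self._pending[self._batch_size:]
        return bool(self._batch)

    def next_object(self):
        return self._batch.pop(0)

    # async for support: here the iteration step itself can await.
    def __aiter__(self):
        return self

    async def __anext__(self):
        if not await self.fetch_next:
            raise StopAsyncIteration
        return self.next_object()

async def loop_example(docs):
    cursor = ToyCursor(docs)
    seen = []
    # The loop condition does the waiting; reading the data is synchronous.
    while await cursor.fetch_next:
        seen.append(cursor.next_object())
    return seen

async def async_for_example(docs):
    return [doc async for doc in ToyCursor(docs)]

print(asyncio.run(loop_example([1, 2, 3])))       # [1, 2, 3]
print(asyncio.run(async_for_example([1, 2, 3])))  # [1, 2, 3]
```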
Running in the background
PeriodicCallback is not normally used with coroutines. Instead, a coroutine can contain a while True: loop and use tornado.gen.sleep:
To run a coroutine periodically, say every 60 seconds, use a while True loop together with tornado.gen.sleep.
Sometimes a more complicated loop may be desirable. For example, the previous loop runs every 60+N seconds, where N is the running time of do_something(). To run exactly every 60 seconds, use the interleaving pattern from above:
minute_loop: the interval between two runs of do_something is 60 s plus the running time of do_something.
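minute_loop2: the interval between two runs of do_something is 60 s, provided do_something takes less than 60 s. If it takes longer, running every 60 s is impossible anyway, so the question doesn't arise.

From this we can infer that inside one coroutine, the yields run sequentially. Each yield suspends the coroutine until its Future resolves, and only then does the next statement run. What runs concurrently is whatever was started before the yield, such as the gen.sleep(60) timer that keeps ticking while do_something runs.

Without @gen.coroutine, the yields would not become "asynchronous". The function would be a plain generator, and calling it would merely create a generator object without running its body.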
```python
from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def minute_loop():
    while True:
        yield do_something()
        yield gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

@gen.coroutine
def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        yield do_something()  # Run while the clock is ticking.
        yield nxt             # Wait for the timer to run out.
```
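The difference between the two loops is easy to measure with shorter intervals. The sketch below uses asyncio, scaled down from 60 s to 0.1 s:
- `do_something` is a hypothetical task taking 0.04 s.
- Each loop records the start time of every run.
- The loops stop after a few runs instead of using `while True`, so the demo ends.

In `minute_loop`, the gap between runs is about 0.1 + 0.04 s. In `minute_loop2`, the gap is about 0.1 s. Tornado's `gen.sleep(60)` returns a Future whose timer starts immediately. `asyncio.sleep()` returns a coroutine that does nothing until awaited, so the sketch wraps it in `asyncio.ensure_future` to start the clock. Exact timings depend on the machine.

```python
import asyncio
import time

INTERVAL = 0.1  # stands in for 60 seconds
WORK = 0.04     # hypothetical running time N of do_something()

async def do_something(starts):
    starts.append(time.monotonic())
    await asyncio.sleep(WORK)

async def minute_loop(starts, runs):
    for _ in range(runs):  # bounded instead of `while True` so the demo ends
        await do_something(starts)
        await asyncio.sleep(INTERVAL)

async def minute_loop2(starts, runs):
    for _ in range(runs):
        nxt = asyncio.ensure_future(asyncio.sleep(INTERVAL))  # start the clock
        await do_something(starts)  # run while the clock is ticking
        await nxt                   # wait for the timer to run out

def average_gap(loop_func, runs=4):
    starts = []
    asyncio.run(loop_func(starts, runs))
    gaps = [b - a for a, b in zip(starts, starts[1:])]
    return sum(gaps) / len(gaps)

print(f"minute_loop:  {average_gap(minute_loop):.2f}s between runs")
print(f"minute_loop2: {average_gap(minute_loop2):.2f}s between runs")
```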