在进行gevent源码学习一分析之后,我还对两个比较核心的问题抱有疑问:
1. gevent.Greenlet.join()以及他的list版本joinall()的原理和使用。
2. 关于在使用monkey_patchall()之后隐式切换的问题。
下面我将继续通过分析源码及其行为来加以理解和掌握。
1. 关于gevent.Greenlet.join()(以下简称join)先来看一个例子:
import gevent def xixihaha(msg): print(msg) gevent.sleep(0) print msg g1 = gevent.spawn(xixihaha, ‘xixi‘) gevent.sleep(0)
先分析一波
1. 初始化一个Greenlet实例g1,将该Greenlet.switch注册到hub上。
2. 然后调用gevent.sleep(0)将当前greenlet保存下来放在Waiter()中并向hub注册该回调。这里再贴一次实现的代码跟着走一遍强调一下实现,这对一会儿理解join实现非常有帮助:
hub = get_hub() loop = hub.loop if seconds <= 0: waiter = Waiter() loop.run_callback(waiter.switch) waiter.get() else: hub.wait(loop.timer(seconds, ref=ref))
这里我们将second设置为0,所以走第一层判断,初始化一个Waiter()对象给waiter,随后注册waiter.switch方法到hub,调用waiter.get()去调用hub的switch()方法(注意这里的self.hub.switch()方法并没有切换这个概念。这只是Hub类中自己实现的一个switch方法而已):
def get(self): """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called.""" if self._exception is not _NONE: if self._exception is None: return self.value else: getcurrent().throw(*self._exception) else: if self.greenlet is not None: raise ConcurrentObjectUseError(‘This Waiter is already used by %r‘ % (self.greenlet, )) self.greenlet = getcurrent() try: return self.hub.switch() finally: self.greenlet = None
然后hub.switch(self)会返回一个greenlet.switch(self)这里才是切换 然后他会调用自己的run方法(greenlet底层实现)。
def switch(self): switch_out = getattr(getcurrent(), ‘switch_out‘, None) if switch_out is not None: switch_out() return greenlet.switch(self)
def run(self): """ Entry-point to running the loop. This method is called automatically when the hub greenlet is scheduled; do not call it directly. :raises LoopExit: If the loop finishes running. This means that there are no other scheduled greenlets, and no active watchers or servers. In some situations, this indicates a programming error. """ assert self is getcurrent(), ‘Do not call Hub.run() directly‘ while True: loop = self.loop loop.error_handler = self try: loop.run() finally: loop.error_handler = None # break the refcount cycle self.parent.throw(LoopExit(‘This operation would block forever‘, self)) # this function must never return, as it will cause switch() in the parent greenlet # to return an unexpected value # It is still possible to kill this greenlet with throw. However, in that case # switching to it is no longer safe, as switch will return immediatelly
这里开始就进入事件loop循环了,run()会调用到注册过来的回调。这里开始g1注册过来的回调就会被调用了。
之后的流程就是运行g1注册的回调,然后运行Waiter()注册的回调,然后回到外层最后结束掉。打印结果:
xixi
那还有一个msg没有打印呢!怎么就退出来了!!这不科学。所以这就是join可以办到的事情了,来看源码:
def join(self, timeout=None): """Wait until the greenlet finishes or *timeout* expires. Return ``None`` regardless. """ if self.ready(): return switch = getcurrent().switch self.rawlink(switch) try: t = Timeout._start_new_or_dummy(timeout) try: result = self.parent.switch() if result is not self: raise InvalidSwitchError(‘Invalid switch into Greenlet.join(): %r‘ % (result, )) finally: t.cancel() except Timeout as ex: self.unlink(switch) if ex is not t: raise except: self.unlink(switch) raise
将当前的greenlet.switch方法赋值给switch然后调用rawlink方法:
def rawlink(self, callback): """Register a callable to be executed when the greenlet finishes execution. The *callback* will be called with this instance as an argument. .. caution:: The callable will be called in the HUB greenlet. """ if not callable(callback): raise TypeError(‘Expected callable: %r‘ % (callback, )) self._links.append(callback) if self.ready() and self._links and not self._notifier: self._notifier = self.parent.loop.run_callback(self._notify_links)
rawlink其实也没做什么,他将当前greenlet的switch存进了一个双端链表中,就是self._links.append(callback)这一句。保存了起来并没有像sleep那样像hub上注册回调,所以hub在回调链里是没有这家伙的。
然后还是跟上面一样的流程用self.parent.switch()回到hub中调用greenlet.switch(self)运行run函数进入loop循环。然后运行第一个注册进来的回调也就是运行xixihaha并打印第一个msg。这个时候调用gevent.sleep注册一个Waiter()事件到hub,然后依然会回来。然后再执行最后一个msg 因为整个回调链上只有他自己只能又切回来。当运行完之后我们来看下如何回到最外面main:
def run(self): try: self.__cancel_start() self._start_event = _start_completed_event try: result = self._run(*self.args, **self.kwargs) except: self._report_error(sys.exc_info()) return self._report_result(result) finally: self.__dict__.pop(‘_run‘, None) self.__dict__.pop(‘args‘, None) self.__dict__.pop(‘kwargs‘, None)
当xixihaha回调也结束之后也就是第二个msg也运行完了之后会返回调用他的那个Greenlet.run方法继续向下执行,然后会执行到self._report_result(result)
def _report_result(self, result): self._exc_info = (None, None, None) self.value = result if self._has_links() and not self._notifier: self._notifier = self.parent.loop.run_callback(self._notify_links)
这里我们直接看判断这里,会去hub上注册self._notify_links。 self._notify_links方法是什么来看:
def _notify_links(self): while self._links: link = self._links.popleft() try: link(self) except: self.parent.handle_error((link, self), *sys.exc_info())
self._links.popleft()会让你把前面赋值给self._links的回调吐出来赋值给link然后运行这个回调。结果当然是愉快的回到了main里面。
然后弹出乱七八糟的东西最后结束run。
总结:
可以看到join和joinall()类似的都是没有把自己注册到hub主循环之中,而是等所有的greenlet都运行完了之后,再调用自己回到原来注册过去的greenlet中,就不会在因为提前切到主函数main中导致整个过程提前结束。
2. 关于在使用monkey_patchall()之后隐式切换的问题:
这个我不准备再拿特别大篇幅来分析讲解了,大致说下我的理解,首先必须要知道一点就是gevent的底层是libev,这也是为什么他如此高效的原因。hub里面的loop下就封装了各种各样对应的libev事件。就拿gevent.sleep()来举例子,里面使用的self.parent.loop.timer就是注册timer事件,而网络请求的切换只是将时间到期事件变成了io读写事件。每当io读取事件,写事件发生的时候,就会触发对应的事件gevent就是通过这些事件的触发来决定自身什么时候该切换到哪里进行哪些事件的处理。理解了这个,也就明白了隐式切换真正的实现原理。然后再去看源码可能就没有那么一脸萌比的感觉了。
对gevent的源码分析到这里,目前也足够我使用了。下一篇关于gevent的文章将分析和实践一些高级应用和特性,毕竟我们学库都是拿来使用的。