python 协程库gevent学习--gevent源码学习(二)

在进行gevent源码学习一分析之后,我还对两个比较核心的问题抱有疑问:

  1. gevent.Greenlet.join()以及他的list版本joinall()的原理和使用。

  2. 关于在使用monkey_patchall()之后隐式切换的问题。

下面我将继续通过分析源码及其行为来加以理解和掌握。

1. 关于gevent.Greenlet.join()(以下简称join)先来看一个例子:

import gevent

def xixihaha(msg):
    print(msg)
    gevent.sleep(0)
    print msg

g1 = gevent.spawn(xixihaha, ‘xixi‘)
gevent.sleep(0)

先分析一波

1. 初始化一个Greenlet实例g1,将该Greenlet.switch注册到hub上。

2. 然后调用gevent.sleep(0)将当前greenlet保存下来放在Waiter()中并向hub注册该回调。这里再贴一次实现的代码跟着走一遍强调一下实现,这对一会儿理解join实现非常有帮助:

    hub = get_hub()
    loop = hub.loop
    if seconds <= 0:
        waiter = Waiter()
        loop.run_callback(waiter.switch)
        waiter.get()
    else:
        hub.wait(loop.timer(seconds, ref=ref))

这里我们将second设置为0,所以走第一层判断,初始化一个Waiter()对象给waiter,随后注册waiter.switch方法到hub,调用waiter.get()去调用hub的switch()方法(注意这里的self.hub.switch()方法并没有切换这个概念。这只是Hub类中自己实现的一个switch方法而已):

    def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            else:
                getcurrent().throw(*self._exception)
        else:
            if self.greenlet is not None:
                raise ConcurrentObjectUseError(‘This Waiter is already used by %r‘ % (self.greenlet, ))
            self.greenlet = getcurrent()
            try:
                return self.hub.switch()
            finally:
                self.greenlet = None

然后hub.switch(self)会返回一个greenlet.switch(self)这里才是切换 然后他会调用自己的run方法(greenlet底层实现)。

    def switch(self):
        switch_out = getattr(getcurrent(), ‘switch_out‘, None)
        if switch_out is not None:
            switch_out()
        return greenlet.switch(self)
    def run(self):
        """
        Entry-point to running the loop. This method is called automatically
        when the hub greenlet is scheduled; do not call it directly.

        :raises LoopExit: If the loop finishes running. This means
           that there are no other scheduled greenlets, and no active
           watchers or servers. In some situations, this indicates a
           programming error.
        """
        assert self is getcurrent(), ‘Do not call Hub.run() directly‘
        while True:
            loop = self.loop
            loop.error_handler = self
            try:
                loop.run()
            finally:
                loop.error_handler = None  # break the refcount cycle
            self.parent.throw(LoopExit(‘This operation would block forever‘, self))
        # this function must never return, as it will cause switch() in the parent greenlet
        # to return an unexpected value
        # It is still possible to kill this greenlet with throw. However, in that case
        # switching to it is no longer safe, as switch will return immediatelly

这里开始就进入事件loop循环了,run()会调用到注册过来的回调。这里开始g1注册过来的回调就会被调用了。

之后的流程就是运行g1注册的回调,然后运行Waiter()注册的回调,然后回到外层最后结束掉。打印结果:

xixi

那还有一个msg没有打印呢!怎么就退出来了!!这不科学。所以这就是join可以办到的事情了,来看源码:

    def join(self, timeout=None):
        """Wait until the greenlet finishes or *timeout* expires.
        Return ``None`` regardless.
        """
        if self.ready():
            return

        switch = getcurrent().switch
        self.rawlink(switch)
        try:
            t = Timeout._start_new_or_dummy(timeout)
            try:
                result = self.parent.switch()
                if result is not self:
                    raise InvalidSwitchError(‘Invalid switch into Greenlet.join(): %r‘ % (result, ))
            finally:
                t.cancel()
        except Timeout as ex:
            self.unlink(switch)
            if ex is not t:
                raise
        except:
            self.unlink(switch)
            raise

将当前的greenlet.switch方法赋值给switch然后调用rawlink方法:

    def rawlink(self, callback):
        """Register a callable to be executed when the greenlet finishes execution.

        The *callback* will be called with this instance as an argument.

        .. caution:: The callable will be called in the HUB greenlet.
        """
        if not callable(callback):
            raise TypeError(‘Expected callable: %r‘ % (callback, ))
        self._links.append(callback)
        if self.ready() and self._links and not self._notifier:
            self._notifier = self.parent.loop.run_callback(self._notify_links)

rawlink其实也没做什么,他将当前greenlet的switch存进了一个双端链表中,就是self._links.append(callback)这一句。保存了起来并没有像sleep那样像hub上注册回调,所以hub在回调链里是没有这家伙的。

然后还是跟上面一样的流程用self.parent.switch()回到hub中调用greenlet.switch(self)运行run函数进入loop循环。然后运行第一个注册进来的回调也就是运行xixihaha并打印第一个msg。这个时候调用gevent.sleep注册一个Waiter()事件到hub,然后依然会回来。然后再执行最后一个msg 因为整个回调链上只有他自己只能又切回来。当运行完之后我们来看下如何回到最外面main:

    def run(self):
        try:
            self.__cancel_start()
            self._start_event = _start_completed_event

            try:
                result = self._run(*self.args, **self.kwargs)
            except:
                self._report_error(sys.exc_info())
                return
            self._report_result(result)
        finally:
            self.__dict__.pop(‘_run‘, None)
            self.__dict__.pop(‘args‘, None)
            self.__dict__.pop(‘kwargs‘, None)

当xixihaha回调也结束之后也就是第二个msg也运行完了之后会返回调用他的那个Greenlet.run方法继续向下执行,然后会执行到self._report_result(result)

    def _report_result(self, result):
        self._exc_info = (None, None, None)
        self.value = result
        if self._has_links() and not self._notifier:
            self._notifier = self.parent.loop.run_callback(self._notify_links)

这里我们直接看判断这里,会去hub上注册self._notify_links。 self._notify_links方法是什么来看:

    def _notify_links(self):
        while self._links:
            link = self._links.popleft()
            try:
                link(self)
            except:
                self.parent.handle_error((link, self), *sys.exc_info())

self._links.popleft()会让你把前面赋值给self._links的回调吐出来赋值给link然后运行这个回调。结果当然是愉快的回到了main里面。

然后弹出乱七八糟的东西最后结束run。

总结:

可以看到join和joinall()类似的都是没有把自己注册到hub主循环之中,而是等所有的greenlet都运行完了之后,再调用自己回到原来注册过去的greenlet中,就不会在因为提前切到主函数main中导致整个过程提前结束。

2. 关于在使用monkey_patchall()之后隐式切换的问题:

这个我不准备再拿特别大篇幅来分析讲解了,大致说下我的理解,首先必须要知道一点就是gevent的底层是libev,这也是为什么他如此高效的原因。hub里面的loop下就封装了各种各样对应的libev事件。就拿gevent.sleep()来举例子,里面使用的self.parent.loop.timer就是注册timer事件,而网络请求的切换只是将时间到期事件变成了io读写事件。每当io读取事件,写事件发生的时候,就会触发对应的事件gevent就是通过这些事件的触发来决定自身什么时候该切换到哪里进行哪些事件的处理。理解了这个,也就明白了隐式切换真正的实现原理。然后再去看源码可能就没有那么一脸萌比的感觉了。

对gevent的源码分析到这里,目前也足够我使用了。下一篇关于gevent的文章将分析和实践一些高级应用和特性,毕竟我们学库都是拿来使用的。

时间: 2024-10-19 02:12:18

python 协程库gevent学习--gevent源码学习(二)的相关文章

python协程的实现(greenlet源码分析)

基本上读完了greenlet的源代码,代码不多,就2000行C语言的代码,其中有一部分栈寄存器的修改的代码是由汇编实现的... 一句话来说明greenlet的实现原理:通过栈的复制切换来实现不同协程之间的切换... 那么接下里来具体的来看看greenlet的代码到底是怎么实现的... 好了,先来看看greenlet对象对应的C语言结构体: /** States: stack_stop == NULL && stack_start == NULL: did not start yet sta

python 协程库gevent学习--gevent数据结构及实战(三)

gevent学习系列第三章,前面两章分析了大量常用几个函数的源码以及实现原理.这一章重点偏向实战了,按照官方给出的gevent学习指南,我将依次分析官方给出的7个数据结构.以及给出几个相应使用他们的例子. 1.事件: 事件是一个可以让我们在Greenlet之间异步通信的形式贴上一个gevent指南上面的例子: import gevent from gevent.event import Event ''' Illustrates the use of events ''' evt = Event

python协程io自动切换--gevent

1.gevent执行 import gevent def func1(): print('func1 start') gevent.sleep(2) print('func1 end') def func2(): print('func2 start') gevent.sleep(1) print('func2 end') def func3(): print('func3 start') gevent.sleep(0) print('func3 end') if __name__=='__ma

Python:线程、进程与协程(3)——Queue模块及源码分析

Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式.该模块提供了三种队列: Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列 Queue.LifoQueue(maxsize):后进先出,相当于栈 Queue.PriorityQueue(maxsize):优先级队列. 其中LifoQueue,PriorityQueue是Queue的子类.三者拥有以下共同的方法: qsize():返回近似的队列大小.为什么要加"近似&q

菜鸟学习Fabric源码学习 — 背书节点和链码容器交互

Fabric 1.4 源码分析 背书节点和链码容器交互 本文档主要介绍背书节点和链码容器交互流程,在Endorser背书节点章节中,无论是deploy.upgrade或者调用链码,最后都会调用ChaincodeSupport.LaunchInit()/Launch()以及ChaincodeSupport.execute()方法.其中Launch()方法启动链码容器,execute()方法调用链码. 1. 准备 ChaincodeSupport.Launch()首先进行判断,根据peer侧该版本链

学习JDK源码(二):Integer

最近没有好好保持学习的好习惯,该打. 天天忙,感觉都不知道在干嘛.真的厌倦了普通的Java代码,还是想学点新技术. 用了这么久的Java,最常用的数据类型肯定是Int了,而他的包装类Integer用的其实也不少.但是问问我们自己,当我们创建一个数字对象时,你们是直接new int x =1的多吧,或者说用new Integer(1),其实这样写的已经就很少了,我就没怎么这么写过.直到最近,我才知道一个能提升性能的方法Integer.valueOf(1),他可以使用系统缓存,既能减少可能的内存占用

前端学习-jQuery源码学习

jQuery 2 的版本与jQuery 1的版本相比,没有再考虑IE6,7,8的兼容问题,因此使用时如果不用考虑IE6,7,8就用jQuery 2的版本,如果需要考虑IE6,7,8就使用jQuery 1的版本. jQuery对象是一个以DOM为对象的特殊数组,并包含大量方法,简单可以理解为: function jQuery(){     var jquery = [dom1, dom2, dom3];     jquery.fn1 = function(){-};     jquery.fn2

Weka学习 -- StringToWordVector 源码学习(1)

代码整个执行流程 参数设置 input数据,设置数据格式 batchFinished(),处理数据(Tokenzier,Stemming,Stopwords) determineDictionary();  统计计算(TF,IDF) 归一化 output 一些变量和方法的作用 m_Dictionary , m_DocsCounts  变量与 m_OutputCounts变量 意义 public TreeMap m_Dictionary = new TreeMap(); //TreeMap类型成员

Android FM模块学习之四源码学习(2)

前几章我们分析了FM模块的几个主要的类文件,今天要分析的是:FMTransceiver.java   // 某些工程中名称为FMRadioService.java public class FmTransceiver { /* Primary FM States : * FM will be in one of the 4 states at any point of time * '0' - FMState_Turned_Off * '1' - FMState_Rx_Turned_On * '