Gevent源码之loop的实现

gevent之所以性能好,最主要就得益于对libev的封装,这里就来看看这部分具体的实现。。。

稍微看一下libev的用法就知道,libev将各种事件都定义为了watcher,这里包括了定时,io等等。。

在gevent主要就是对libev的loop以及watcher进行了封装。。这部分采用的是cython来写的。。

通过以前看的gevent的代码可以知道,所有建立的协程都有一个共同的parent协程,也就是hub协程,他有一个loop对象,其实就可以理解为gevnet通过hub协程来管理整个loop的运行。。。

loop是采用cython来编写的,代码在core.pyx里面。。。

先来看看它的一些属性的定义:

    cdef libev.ev_loop* _ptr    #libev的loop的引用
    cdef public object error_handler
    cdef libev.ev_prepare _prepare  #这个prepare事件,每次在loop之前,都会调用它的回调
    cdef public list _callbacks   #当前loop对象上面挂起的所有回调的链表
    cdef libev.ev_timer _timer0   #一个超时为0的timer,用于让loop立即返回

对于这几个属性,上面的注释应该很清楚了。。。嗯,这里要感叹一下,用cython来写python的扩展还真的很方便。。。

好了,这里来看看构造函数吧:

    #构造函数
    def __init__(self, object flags=None, object default=None, size_t ptr=0):
        cdef unsigned int c_flags
        cdef object old_handler = None
        #这个是注册每次event loop 之前的回调,每次loop开始之前,就会执行这里
        #其实最终调用的时_run_callbacks方法
        libev.ev_prepare_init(&self._prepare, <void*>gevent_run_callbacks)
#ifdef _WIN32
        libev.ev_timer_init(&self._periodic_signal_checker, <void*>gevent_periodic_signal_check, 0.3, 0.3)
#endif
        #注册这个timer的回调,这个回调其实什么都不做,主要的目的就是让event loop的循环立即退出,去处理回调
        libev.ev_timer_init(&self._timer0, <void*>gevent_noop, 0.0, 0.0)
        if ptr:
            self._ptr = <libev.ev_loop*>ptr
        else:
            c_flags = _flags_to_int(flags)
            _check_flags(c_flags)
            c_flags |= libev.EVFLAG_NOENV
            if default is None:
                default = True
                if _default_loop_destroyed:
                    default = False
            if default:
                self._ptr = libev.gevent_ev_default_loop(c_flags)
                if not self._ptr:
                    raise SystemError("ev_default_loop(%s) failed" % (c_flags, ))
#ifdef _WIN32
                libev.ev_timer_start(self._ptr, &self._periodic_signal_checker)
                libev.ev_unref(self._ptr)
#endif
            else:  #创建loop的引用,一般情况下都是在这里创建的
                self._ptr = libev.ev_loop_new(c_flags)
                if not self._ptr:
                    raise SystemError("ev_loop_new(%s) failed" % (c_flags, ))
            if default or __SYSERR_CALLBACK is None:
                set_syserr_cb(self._handle_syserr)
            #在loop上面启动prepare,这样可以保证每次loop之前都执行了prepare的回调,也就是执行在loop上面注册的callback
            libev.ev_prepare_start(self._ptr, &self._prepare)
            libev.ev_unref(self._ptr)
        self._callbacks = []  #初始化回调链表

这个还是很简单的,关于libev方面的东西这里就不细说了,代码首先初始化了prepare事件,并将其的回调设置为了gevent_run_callbacks方法,这个方法其实最终又是调用的当前loop对象的_run_callbacks方法来处理所有在当前loop上面挂起的回调。。。这样也就是说,每一次在运行loop的wait之前,都会先处理在当前loop上面挂起的回调。。这里就包括很多协程的switch啥的。。。

另外这里可以看到初始化了一个超时为0的timer,这个事干嘛用的呢,嗯,例如我们在运行当前loop上面挂起的所有的回调之后,这些回调中有可能又在当前loop上面挂起回调,为了让eventloop可以理解返回,接下来处理这些挂起的回调,那么就在当前eventloop上面挂起一个超时为0的timer,可以让loop立刻返回。

接下来就是构建libev的loop的过程,最后可以看到调用了ev_prepare_start方法来启动了prepare事件。。。

嗯,接下来看看loop对象是怎么处理挂在当前loop上的回调的,也就是_run_callbacks方法:

    #在gevent_run_callbacks中其实也是调用这个方法来具体的运行所有的回调
    #在每一次运行loop之前,都会执行这些回调
    cdef _run_callbacks(self):
        cdef callback cb
        cdef object callbacks
        cdef int count = 1000
        libev.ev_timer_stop(self._ptr, &self._timer0)
        while self._callbacks and count > 0:
            callbacks = self._callbacks
            self._callbacks = []
            for cb in callbacks:
                libev.ev_unref(self._ptr)
                gevent_call(self, cb)
                count -= 1
        #在运行回调的时候有可能又加入了回调,所以这里进行定时
        #因为这个定时的超时是0,所以让loop立即返回,可以处理回调
        if self._callbacks:
            libev.ev_timer_start(self._ptr, &self._timer0)

其实代码比较简单吧,无非就是遍历回调的队列,然后执行,然后判断是否加入了新的回调,如果有的话,那么就启动那个超时为0的timer,让loop可以立即返回接着处理回调。。。

接下来再来看看loop的run方法吧。。。在hub协程中每次也就是调用这个方法来处理的:

    #运行loop对象
    def run(self, nowait=False, once=False):
        CHECK_LOOP2(self)
        cdef unsigned int flags = 0
        if nowait:
            flags |= libev.EVRUN_NOWAIT
        if once:
            flags |= libev.EVRUN_ONCE
        with nogil:
            #这里主要就是运行libev的loop
            libev.ev_run(self._ptr, flags)

这里代码也算是简单吧,其实就是运行libev的事件循环。。。。

好了。。。其实到这里loop的基本上的东西 就说的差不多了。。。。

那么接下来来看看loop上面是如何封装事件的吧。。。这里就拿IO事件来举例子了。。。。

对于I/O watcher的创建,一般情况下是在loop上面调用io方法:

#创建I/O watcher
#ifdef _WIN32
    def io(self, libev.vfd_socket_t fd, int events, ref=True, priority=None):
        return io(self, fd, events, ref, priority)
#else
    def io(self, int fd, int events, ref=True, priority=None):
        return io(self, fd, events, ref, priority)
#endif

这里根据系统的不同分成两种,不过我们只需要关心下面一种就好了,windows的就暂时不考虑了。。。这里其实就是构建了一个I/O类型的watcher,构造函数,第一个是当前loop的引用,第二个是文件描述符,第三个是感兴趣的事件(读或者写)。。。

这里来看看IO watcher的源码定义吧:

#I/Owatcher的定义
cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:

    WATCHER_BASE(io)  #通过这个宏定义了一些基本的属性,例如libev的 watcher 引用等

    #这个其实其实就是启动watcher
    def start(self, object callback, *args, pass_events=False):
        CHECK_LOOP2(self.loop)
        if callback is None:
            raise TypeError('callback must be callable, not None')
        self.callback = callback
        if pass_events:
            self.args = (GEVENT_CORE_EVENTS, ) + args
        else:
            self.args = args
        LIBEV_UNREF
        libev.ev_io_start(self.loop._ptr, &self._watcher)  #在libev的loop上面启动这个io watcher
        PYTHON_INCREF

    ACTIVE

    PENDING

#ifdef _WIN32

    #io watcher的构造
    def __init__(self, loop loop, libev.vfd_socket_t fd, int events, ref=True, priority=None):
        if events & ~(libev.EV__IOFDSET | libev.EV_READ | libev.EV_WRITE):
            raise ValueError('illegal event mask: %r' % events)
        cdef int vfd = libev.vfd_open(fd)
        libev.vfd_free(self._watcher.fd)
        #初始化,并设置回调为gevent_callback_io
        #在回调里面,会执行最开始watcher注册的回调,并会取消读写事件的注册
        libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, events)
        self.loop = loop
        if ref:
            self._flags = 0
        else:
            self._flags = 4
        if priority is not None:
            libev.ev_set_priority(&self._watcher, priority)

#else
    #一般就考虑这里的构造就好了,三个重要的参数分别是loop对象的引用,文件描述符,事件类型,一般情况下都不会带有优先级的
    def __init__(self, loop loop, int fd, int events, ref=True, priority=None):
        if fd < 0:
            raise ValueError('fd must be non-negative: %r' % fd)
        if events & ~(libev.EV__IOFDSET | libev.EV_READ | libev.EV_WRITE):
            raise ValueError('illegal event mask: %r' % events)
        #调用libev的方法来初始化I/O watcher
        libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, fd, events)
        self.loop = loop
        if ref:
            self._flags = 0
        else:
            self._flags = 4
        if priority is not None:
            libev.ev_set_priority(&self._watcher, priority)

#endif

    property fd:

        def __get__(self):
            return libev.vfd_get(self._watcher.fd)

        def __set__(self, long fd):
            if libev.ev_is_active(&self._watcher):
                raise AttributeError("'io' watcher attribute 'fd' is read-only while watcher is active")
            cdef int vfd = libev.vfd_open(fd)
            libev.vfd_free(self._watcher.fd)
            libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, self._watcher.events)

    property events:

        def __get__(self):
            return self._watcher.events

        def __set__(self, int events):
            if libev.ev_is_active(&self._watcher):
                raise AttributeError("'io' watcher attribute 'events' is read-only while watcher is active")
            libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, self._watcher.fd, events)

    property events_str:

        def __get__(self):
            return _events_to_str(self._watcher.events)

    def _format(self):
        return ' fd=%s events=%s' % (self.fd, self.events_str)

这里要注意WATCHER_BASE(io)是一个宏定义,他定义了一些基本的属性,例如libev的watcher引用等。。。

从构造函数中可以看到调用了libev的ev_io_init方法来初始化内部的I/O watcher引用,并设置了其回调为gevent_callback_io方法,在start方法中则调用了libev的ev_io_start方法来启动这个watcher,并记录了回调的方法。。。这里如果对libev有基本了解的话,通过上述代码就已经了解了I/O watcher的运行原理。。。

不过这里感觉有必要来说一下gevent_callback_io这个回调函数,也就是我们感兴趣的I/O事件发生了之后,会调用这个方法来处理。。。。

它的定义在callbacks.h以及callbacks.c中。。。而且是通过宏定义的方式来定义了很多类似的方法,例如gevent_callback_timer啥的。。。关于这部分的内容就不细说了。。直接来看看究竟调用的时什么代码吧:

//首先获取wather对象,然后调用回调
#define DEFINE_CALLBACK(WATCHER_LC, WATCHER_TYPE)     static void gevent_callback_##WATCHER_LC(struct ev_loop *_loop, void *c_watcher, int revents) {                          struct PyGevent##WATCHER_TYPE##Object* watcher = GET_OBJECT(PyGevent##WATCHER_TYPE##Object, c_watcher, _watcher);            gevent_callback(watcher->loop, watcher->_callback, watcher->args, (PyObject*)watcher, c_watcher, revents);     }

首先获取对应的watcher对象,然后调用gevent_callback方法来调用在watcher上面注册的回调。。。

//对于所有的事件的回调,其实最终都是通过这个函数来处理的
static void gevent_callback(struct PyGeventLoopObject* loop, PyObject* callback, PyObject* args, PyObject* watcher, void *c_watcher, int revents) {
    GIL_DECLARE;
    PyObject *result, *py_events;
    long length;
    py_events = 0;
    GIL_ENSURE;
    Py_INCREF(loop);
    Py_INCREF(callback);
    Py_INCREF(args);
    Py_INCREF(watcher);
    gevent_check_signals(loop);
    if (args == Py_None) {
        args = __pyx_empty_tuple;
    }
    length = PyTuple_Size(args);
    if (length < 0) {
        gevent_handle_error(loop, watcher);
        goto end;
    }
    if (length > 0 && PyTuple_GET_ITEM(args, 0) == GEVENT_CORE_EVENTS) {
        py_events = PyInt_FromLong(revents);
        if (!py_events) {
            gevent_handle_error(loop, watcher);
            goto end;
        }
        PyTuple_SET_ITEM(args, 0, py_events);
    }
    else {
        py_events = NULL;
    }
    result = PyObject_Call(callback, args, NULL); //执行回调
    if (result) {
        Py_DECREF(result);
    }
    else {
        gevent_handle_error(loop, watcher);
        if (revents & (EV_READ|EV_WRITE)) { //这里可以看出,如果注册了读写事件,那么会取消,也就标示每一次读写,都需要重写注册事件
            /* io watcher: not stopping it may cause the failing callback to be called repeatedly */
            gevent_stop(watcher, loop);
            goto end;
        }
    }
    if (!ev_is_active(c_watcher)) {
        /* Watcher was stopped, maybe by libev. Let's call stop() to clean up
         * 'callback' and 'args' properties, do Py_DECREF() and ev_ref() if necessary.
         * BTW, we don't need to check for EV_ERROR, because libev stops the watcher in that case. */
        gevent_stop(watcher, loop);
    }
end:
    if (py_events) {
        Py_DECREF(py_events);
        PyTuple_SET_ITEM(args, 0, GEVENT_CORE_EVENTS);
    }
    Py_DECREF(watcher);
    Py_DECREF(args);
    Py_DECREF(callback);
    Py_DECREF(loop);
    GIL_RELEASE;
}

这里吧gevent_callback方法的代码列出来了。。。是因为有一部分需要注意,对于I/O事件类型,处理了回调之后,将会取消在loop上面挂起的I/O事件。。。

这样的话,对于每一次读写,都需要在loop上面注册。。。。嗯。。不过好处就是。。提供了同步的变成范式。。。

好了,到这里gevent的loop的实现就差不多了。。。

时间: 2024-11-14 12:02:11

Gevent源码之loop的实现的相关文章

python 协程库gevent学习--gevent源码学习(二)

在进行gevent源码学习一分析之后,我还对两个比较核心的问题抱有疑问: 1. gevent.Greenlet.join()以及他的list版本joinall()的原理和使用. 2. 关于在使用monkey_patchall()之后隐式切换的问题. 下面我将继续通过分析源码及其行为来加以理解和掌握. 1. 关于gevent.Greenlet.join()(以下简称join)先来看一个例子: import gevent def xixihaha(msg): print(msg) gevent.sl

转:[gevent源码分析] 深度分析gevent运行流程

[gevent源码分析] 深度分析gevent运行流程 http://blog.csdn.net/yueguanghaidao/article/details/24281751 一直对gevent运行流程比较模糊,最近看源码略有所得,不敢独享,故分享之. gevent是一个高性能网络库,底层是libevent,1.0版本之后是libev,核心是greenlet.gevent和eventlet是亲近,唯一不同的是eventlet是自己实现的事件驱动,而gevent是使用libev.两者都有广泛的应

[gevent源码分析] gevent两架马车-libev和greenlet

本篇将讨论gevent的两架马车-libev和greenlet如何协同工作的. gevent事件驱动底层使用了libev,我们先看看如何单独使用gevent中的事件循环. #coding=utf8 import socket import gevent from gevent.core import loop def f(): s, address = sock.accept() print address s.send("hello world\r\n") loop = loop()

[gevent源码分析] libev cython绑定core.pyx

gevent core就是封装了libev,使用了cython的语法,感兴趣童鞋可以好好研究研究.其实libev是有python的封装 pyev(https://pythonhosted.org/pyev/),不过pyev是使用C来写扩展的,代码巨复杂.在看core.pyx代码之前先学习一下 core.pyx用到的cython知识. 一: cython基础知识 1.cdef, def, cpdef的区别 cdef用于定义C中的函数,变量,如cdef int i;而def知识python中的函数定

[gevent源码分析] c-ares异步DNS请求

c-ares是异步DNS请求库,libcurl,libevent,wireshark都使用了c-ares,gevent1.0版本前使用的是libevent, 所以它的DNS请求也是使用c-ares,1.0版本后使用cython封装了c-ares. c-ares官方文档,http://c-ares.haxx.se/docs.html. gevent中DNS默认使用的是线程池版本的,可通过设置GEVENT_RESOLVER=ares环境变量使用c-ares异步库. 如何证明的确是异步呢,试着跑一遍你

Android之Handler源码深入解析

闲着没事,就来看看源码,看看源码的各种原理,会用只是简单的,知道为什么才是最牛逼的. Handler源码分析那,从使用的步骤来边用边分析: 1.创建一个Handler对象:new Handler(getMainLooper(),this); 这是我常用的一个方式,getMainLooper是获取主线程的Looper,this则是实现CallBack的接口 看一下Handler的构造函数 public Handler() { this(null, false); } public Handler(

Android消息机制源码分析

本篇主要介绍Android中的消息机制,即Looper.Handler是如何协同工作的: Looper:主要用来管理当前线程的消息队列,每个线程只能有一个Looper Handler:用来将消息(Message)插入到当前线程的消息队列,并负责分发Looper中的消息,将消息发送到当前线程执行 具体关系图如下所示: 接下来我们来分析一下Looper和Handler的源码,了解一下其中的奥妙. 首先我们从一个程序运行的入口来分析,源码如下: public static void main(Stri

Android之Handler源码深入分析

闲着没事,就来看看源码,看看源码的各种原理,会用只是简单的,知道为什么才是最牛逼的. Handler源码分析那,从使用的步骤来边用边分析: 1.创建一个Handler对象:new Handler(getMainLooper(),this); 这是我常用的一个方式,getMainLooper是获取主线程的Looper,this则是实现CallBack的接口 看一下Handler的构造函数 public Handler() { this(null, false); } public Handler(

Spark Streaming源码解读之Job详解

一:Spark Streaming Job生成深度思考 1. 做大数据例如Hadoop,Spark等,如果不是流处理的话,一般会有定时任务.例如10分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理. 2. JobGenerator构造的时候有一个核心的参数是jobScheduler, jobScheduler是整个作业的生成和提交给集群的核心,JobGenerator会基于DStream生