本篇将讨论gevent的两架马车-libev和greenlet如何协同工作的。
gevent事件驱动底层使用了libev,我们先看看如何单独使用gevent中的事件循环。
#coding=utf8 import socket import gevent from gevent.core import loop def f(): s, address = sock.accept() print address s.send("hello world\r\n") loop = loop() sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.bind(("localhost",8000)) sock.listen(10) io = loop.io(sock.fileno(),1) #1代表read io.start(f) loop.run()
代码很简单,使用core.loop新建了一个loop实例,通过io加入socket读事件,通过start设置回调,然后run启动事件循环,一个简单的helloworld服务器搭建好了,可以通过telnet localhost 8000看响应结果。
gevent的整个事件循环是在hub.run中启动的,
def run(self): assert self is getcurrent(), ‘Do not call Hub.run() directly‘ while True: loop = self.loop loop.error_handler = self try: loop.run() finally: loop.error_handler = None # break the refcount cycle self.parent.throw(LoopExit(‘This operation would block forever‘))
上面的self.loop和我们上面自己新建的loop对象是一样的,下面我们通过socket的recv函数看时间是如何注册到loop中。
gevent的socket对象被gevent重新封装,原始socket就是下面的self._sock
我们来看看gevent的socket一次recv做了什么操作。
gevent/socket.py
def recv(self, *args): sock = self._sock # keeping the reference so that fd is not closed during waiting while True: try: return sock.recv(*args) # 1.如果此时socket已经有数据,则直接return except error: #没有数据将会抛出异常,且errno为EWOULDBLOCK ex = sys.exc_info()[1] if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0: raise # QQQ without clearing exc_info test__refcount.test_clean_exit fails sys.exc_clear() #此时将该文件描述符的”读事件“加入到loop中 self._wait(self._read_event) """self._wait会调用hub.wait, def wait(self, watcher): waiter = Waiter() unique = object() watcher.start(waiter.switch, unique) #这个watcher就是上面说的loop.io()实例,waiter.switch就是回调函数 try: result = waiter.get() assert result is unique, ‘Invalid switch into %s: %r (expected %r)‘ % (getcurrent(), result, unique) finally: watcher.stop() 当loop捕获到”可读事件“时,将会回调waiter.switch方法,此时将回到这里(因为while循环)继续执行sock.recv(*args) 一般来说当重新recv时肯定是可以读到数据的,将直接返回 """
上面的self._read_event = io(fileno, 1),再次回到while大循环中,将直接return sock.recv的结果。我们知道socke.recv(1024)可能返回的并没有1024字节,这要看此时缓冲区已接受多少字节,所以说数据可能一次没有读完,所以可能会触发多次
EWOULDBLOCK,多次读取,只有recv为空字符串时才代表读取结束。典型的读取整个数据一般如下所示:
buff = [] while 1: s = socket.recv(1024) if not s: break else: buff.append(s) buff = "".jon(buff)
你可能有点好奇,在gevent中有多处使用了assert判断waiter的返回值,如:hub.wait
class Hub(greenlet): def wait(self, watcher): waiter = Waiter() unique = object() watcher.start(waiter.switch, unique) try: result = waiter.get() assert result is unique, ‘Invalid switch into %s: %r (expected %r)‘ % (getcurrent(), result, unique) #这里为什么要assert? #因为正常肯定是loop调用waiter.switch(unique),那么waiter.get()获取的肯定是unique, #如果不是unique,肯定是有其它地方调用waiter.switch,这很不正常 finally: watcher.stop()
这主要是为了防止回调函数被其它greenlet调用,因为greenlet通过switch传递参数,看下面代码:
def f(t): gevent.sleep(t) p = gevent.spawn(f,2) gevent.sleep(0) # 2s后libev将回调f,所以下面p.get获取的是2 switcher = gevent.spawn(p.switch, ‘hello‘) #强先回调p.switch,传递参数hello result = p.get()
将返回以下异常:
将报如下异常: AssertionError: Invalid switch into <Greenlet at 0x252c2b0: f(2)>: ‘hello‘ (expected <object object at 0x020414E0>) <Greenlet at 0x252c2b0: f(2)> failed with AssertionError
我们再看看gevent封装的greenlet,
class Greenlet(greenlet): """A light-weight cooperatively-scheduled execution unit.""" def __init__(self, run=None, *args, **kwargs): hub = get_hub() greenlet.__init__(self, parent=hub) if run is not None: self._run = run
我们看到所有的Greenlet的parent都是hub,这有什么好处呢?
因为当一个greenlet死掉的时候将回到父greenlet中,也就是hub中,hub将从运行上次回调的地方继续开始事件循环,这也就是为什么事件循环是在hub中运行的理由。
我们来看一个一个Greenlet的生命周期
启动Greenlet需要调用start()方法,
def start(self): """Schedule the greenlet to run in this loop iteration""" if self._start_event is None: self._start_event = self.parent.loop.run_callback(self.switch)
也就是将当前的switch加入到loop事件循环中。当loop回调self.switch时将运行run方法(这是底层greenlet提供的),
继承时我们可以提供_run方法。
def run(self): try: if self._start_event is None: self._start_event = _dummy_event else: self._start_event.stop() #取消之前添加的回调函数,loop将会从回调链中剔除该函数。 #libev提供了一系列的对象封装,如io,timer,都有start,stop方法 #而回调是通过loop.run_callback开启的,和其它有所不同 try: result = self._run(*self.args, **self.kwargs) #运行自定义_run方法 except: self._report_error(sys.exc_info()) return self._report_result(result) #设置返回结果,这是个比较重要的方法,下面会单独看看 finally: pass
一切顺利,没有异常将调用_report_result方法,我们具体看看:
def _report_result(self, result): self._exception = None self.value = result #设置返回结果,可通过get()获取,注意要获取value时 #不要直接通过.value,一定要用get方法,因为get()会获取到真正的运行后结果, #而.value那是该Greenlet可能还没结束 if self._links and not self._notifier: #这个是干什么的? self._notifier = self.parent.loop.run_callback(self._notify_links)
为什么说一定要通过get()才能获取最后返回结果呢,因为get()相当于异步的结果返回,那么很有可能Greenlet还没结果我们就调用
get()想获取结果,如果不是异步,肯定是获取不到的。我们看看get()操作,
def get(self, block=True, timeout=None): """Return the result the greenlet has returned or re-raise the exception it has raised. If block is ``False``, raise :class:`gevent.Timeout` if the greenlet is still alive. If block is ``True``, unschedule the current greenlet until the result is available or the timeout expires. In the latter case, :class:`gevent.Timeout` is raised. """ if self.ready(): #该Greenlet已经运行结束,直接返回结果 if self.successful(): return self.value else: raise self._exception if block: #到这里说明该Greenlet并没有结束 switch = getcurrent().switch self.rawlink(switch) #将当前Greenlet.switch加到自己的回调链中 """ self._links.append(callback) """ try: t = Timeout.start_new(timeout) try: result = self.parent.switch() #切换到hub,可以理解为当前get()阻塞了,当再次回调刚刚注册的switch将回到这里 #可问题是好像我们没有将switch注册到hub中,那是谁去回调的呢? #幕后黑手其实就是上面的_report_result,当Greenlet结束最后会调用_report_result, #而_report_result把将_notify_links注册到loop的回调中,最后由_notify_links回调我们刚注册的switch # def _notify_links(self): # while self._links: # link = self._links.popleft() # try: # link(self) #就是这里了,我们看到还把self传给了switch,所以result结果就是self(greenlet通过switch传递结果) # except: # self.parent.handle_error((link, self), *sys.exc_info()) assert result is self, ‘Invalid switch into Greenlet.get(): %r‘ % (result, ) #知道为什么result是self的原因了吧 finally: t.cancel() except: self.unlink(switch) raise #运行到这里,其实Greenlet已经结束了,换句话说self.ready()肯定为True if self.ready(): if self.successful(): return self.value else: raise self._exception else: #还没结束,你又不等待,没有值返回啊,只能抛出异常了 raise Timeout
通过上面我们知道其实get()就是异步返回结果的方式,当Greenelt要结束时通过run()函数最后的_report_result返回,所以_report_result还是很重要的。
其实Greenlet还提供了一个switch_out的方法,在gevent中switch_out是和switch相对应的一个概念,当切换到Greenlet时将
调用switch方法,切换到hub时将调用Greenlet的switch_out方法,也就是给Greenlet一个保存恢复的功能。
gevent中backdoor.py(提供了一个python解释器的后门)使用了switch,我们来看看
class SocketConsole(Greenlet): def switch(self, *args, **kw): self.saved = sys.stdin, sys.stderr, sys.stdout sys.stdin = sys.stdout = sys.stderr = self.desc Greenlet.switch(self, *args, **kw) def switch_out(self): sys.stdin, sys.stderr, sys.stdout = self.saved
switch_out用的非常漂亮,因为交换环境需要使用sys.stdin,sys.stdout,sys.stderr,所以当切换到我们Greenlet时,
把这三个变量都替换成我们自己的socket描述符,但当要切换到hub时需要恢复这三个变量,所以在switch中先保存,在switch_out中再恢复,switch_out是切换到hub时,与hub的switch调用实现:
class Hub(Greenlet): def switch(self): #我们看到的确是先调用先前的Greenlet.switch_out switch_out = getattr(getcurrent(), ‘switch_out‘, None) if switch_out is not None: switch_out() return greenlet.switch(self)
可以通过下面两句话就启动一个python后门解释器,感兴趣的童鞋可以玩玩。
from gevent.backdoor import BackdoorServer BackdoorServer((‘127.0.0.1‘, 9000)).serve_forever()
通过telnet,你可以为所欲为。
在gevent中基本上每个函数都有timeout参数,这主要是通过libev的timer实现。
使用如下:
Timeout对象有pending属性,判断是是否还未运行
t=Timeout(1) t.start() try: print ‘aaa‘ import time assert t.pending == True time.sleep(2) gevent.sleep(0.1) #注意这里不可以是sleep(0),虽然sleep(0)也切换到hub,定时器也到了,但gevent注册的回调 #是优先级是高于定时器的(在libev事件循环中先调用callback,然后才是timer) except Timeout,e: assert t.pending == False assert e is t #判断是否是我的定时器,和上面的assert一致,防止不是hub调用t.switch print sys.exc_info() finally: #取消定时器,不管定时器是否可用,都可取消 t.cancel()
Timout对象还提供了with上下文支持:
with Timeout(1) as t: assert t.pending gevent.sleep(0.5) assert not t.pending
Timeout第二个参数可以自定义异常,如果是Fasle,with上下文将不传递异常
with Timeout(1,False) as t: assert t.pending gevent.sleep(2) assert not sys.exc_info()[1] 我们看到并没有抛出异常
还有一个with_timeout快捷方式:
def f(): import time time.sleep(2) gevent.sleep(0.1) #不能使用gevent.sleep(0) print ‘fff‘ t = with_timeout(1,f,timeout_value=10) assert t == 10
注意with_timeout必须有timeout_value参数时才不会抛Timeout异常。
到这里我们对gevnet的底层应该都很熟悉了,对gevent还未介绍到的就是一些高层的东西,如Event,Pool等,后期也会单独拿出来
讲讲。我觉得还需要关注的就是libev的使用,不过这就需要我们深入分析core.pyx的libev cython扩展了,这需要cython的知识,最近我也一直在看源码,后期也会和大家分享。
至于为什么要分析libev的扩展呢?主要是在游戏中有一些定时执行的任务,通过gevent现有的实现比较蹩脚,其实libev提供的timer有两个参数,一个after,一个repeat,after是多久以后启动该定时器,repeat是多次以后再次启动,这刚好满足我的需求,
下面就是我写的一个简单的定时任务脚本,通过gfirefly启动,还提供了web接口。
#coding:utf-8 ‘‘‘ Created on 2014-9-5 @author: http://blog.csdn.net/yueguanghaidao ‘‘‘ import traceback import datetime from flask import request from gevent.hub import get_hub from gtwisted.utils import log from gfirefly.server.globalobject import webserviceHandle from app.models.role import Role ‘‘‘ 定时任务 任务名 (运行时间(0-24),每次间隔)单位为小时,回调函数均为do_name ‘‘‘ CRONTAB = { "energy": (0, 1), #恢复体力 "god_surplustime": (0, 24), "arena_surplustime": (22, 24), "arena_rewrad": (21, 24), "sign_reset": (1, 24) } def log_except(fun): def wrapper(*args): try: log.msg(fun.__name__) return fun(args) except: log.msg(traceback.format_exc()) return wrapper class Task(object): """所有定时任务 """ @classmethod @log_except def do_energy(cls): """每一个小时增加1体力(体力小于8) """ Role.objects(energy__lt=8).update(inc__energy=1) @classmethod @log_except def do_god_surplustime(cls): """财神剩余次数 """ Role.objects(god__exists=True).update(set__god__surplustime=10) @webserviceHandle("/cron", methods=[‘GET‘, ‘POST‘]) def cron(): """提供web接口调用 """ action = request.args.get("action") if not action: return "action:<br/><br/>"+"<br/>".join(( a for a in CRONTAB)) else: try: f = getattr(Task, "do_"+action) try: f() except: return traceback.format_exc() return "success" except AttributeError: return "action:<br/><br/>"+"<br/>".join(( a for a in CRONTAB)) def timer(after, repeat): return get_hub().loop.timer(after, repeat) def run(): log.msg("cron start") #配置mongodb mongoconfig.init_Mongo() for action, t in CRONTAB.items(): log.msg("%s start" % action) f = getattr(Task, "do_"+action) now = datetime.datetime.now() other = now.replace(hour=t[0],minute=0,second=0) if other > now: after = (other-now).seconds else: after = 24*3600-(now-other).seconds #after = t[0]*3600 timer(after, t[1]*3600).start(f) run()