Gevent简明教程

Gevent简明教程

发表于 2015-11-28 |  分类于 技术| |  阅读次数 5159

前述

进程 线程 协程 异步

并发编程(不是并行)目前有四种方式:多进程、多线程、协程和异步。

  • 多进程编程在python中有类似C的os.fork,更高层封装的有multiprocessing标准库
  • 多线程编程python中有Thread和threading
  • 异步编程在linux下主+要有三种实现select,poll,epoll
  • 协程在python中通常会说到yield,关于协程的库主要有greenlet,stackless,gevent,eventlet等实现。

进程

  • 不共享任何状态
  • 调度由操作系统完成
  • 有独立的内存空间(上下文切换的时候需要保存栈、cpu寄存器、虚拟内存、以及打开的相关句柄等信息,开销大)
  • 通讯主要通过信号传递的方式来实现(实现方式有多种,信号量、管道、事件等,通讯都需要过内核,效率低)

线程

  • 共享变量(解决了通讯麻烦的问题,但是对于变量的访问需要加锁)
  • 调度由操作系统完成(由于共享内存,上下文切换变得高效)
  • 一个进程可以有多个线程,每个线程会共享父进程的资源(创建线程开销占用比进程小很多,可创建的数量也会很多)
  • 通讯除了可使用进程间通讯的方式,还可以通过共享内存的方式进行通信(通过共享内存通信比通过内核要快很多)

协程

  • 调度完全由用户控制
  • 一个线程(进程)可以有多个协程
  • 每个线程(进程)循环按照指定的任务清单顺序完成不同的任务(当任务被堵塞时,执行下一个任务;当恢复时,再回来执行这个任务;任务间切换只需要保存任务的上下文,没有内核的开销,可以不加锁的访问全局变量)
  • 协程需要保证是非堵塞的且没有相互依赖
  • 协程基本上不能同步通讯,多采用异步的消息通讯,效率比较高

总结

  • 进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度
  • 线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度(标准线程是的)
  • 协程和线程一样共享堆,不共享栈,协程由程序员在协程的代码里显示调度

聊聊协程

协程,又称微线程,纤程。
Python的线程并不是标准线程,是系统级进程,线程间上下文切换有开销,而且Python在执行多线程时默认加了一个全局解释器锁(GIL),因此Python的多线程其实是串行的,所以并不能利用多核的优势,也就是说一个进程内的多个线程只能使用一个CPU。

def coroutine(func):
    def ret():
        f = func()
        f.next()
        return f
    return ret

@coroutine
def consumer():
    print "Wait to getting a task"
    while True:
        n = (yield)
        print "Got %s",n

import time
def producer():
    c = consumer()
    task_id = 0
    while True:
        time.sleep(1)
        print "Send a task to consumer" % task_id
        c.send("task %s" % task_id)

if __name__ == "__main__":
    producer()

结果:

Wait to getting a task
Send a task 0 to consumer
Got task 0
Send a task 1 to consumer
Got task 1
Send a task 2 to consumer
Got task 2
...

传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但容易死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。

Gevent

介绍

gevent是基于协程的Python网络库。特点:

  • 基于libev的快速事件循环(Linux上epoll,FreeBSD上kqueue)。
  • 基于greenlet的轻量级执行单元。
  • API的概念和Python标准库一致(如事件,队列)。
  • 可以配合socket,ssl模块使用。
  • 能够使用标准库和第三方模块创建标准的阻塞套接字(gevent.monkey)。
  • 默认通过线程池进行DNS查询,也可通过c-are(通过GEVENT_RESOLVER=ares环境变量开启)。
  • TCP/UDP/HTTP服务器
  • 子进程支持(通过gevent.subprocess)
  • 线程池

安装和依赖

依赖于greenlet library
支持python 2.6+ 、3.3+

核心部分

  • Greenlets
  • 同步和异步执行
  • 确定性
  • 创建Greenlets
  • Greenlet状态
  • 程序停止
  • 超时
  • 猴子补丁

####Greenlets
gevent中的主要模式, 它是以C扩展模块形式接入Python的轻量级协程。 全部运行在主程序操作系统进程的内部,但它们被程序员协作式地调度。

在任何时刻,只有一个协程在运行。

区别于multiprocessing、threading等提供真正并行构造的库, 这些库轮转使用操作系统调度的进程和线程,是真正的并行。

同步和异步执行

并发的核心思想在于,大的任务可以分解成一系列的子任务,后者可以被调度成 同时执行或异步执行,而不是一次一个地或者同步地执行。两个子任务之间的 切换也就是上下文切换。

在gevent里面,上下文切换是通过yielding来完成的.


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16


import gevent

def foo():

print(‘Running in foo‘)

gevent.sleep(0)

print(‘Explicit context switch to foo again‘)

def bar():

print(‘Explicit context to bar‘)

gevent.sleep(0)

print(‘Implicit context switch back to bar‘)

gevent.joinall([

gevent.spawn(foo),

gevent.spawn(bar),

])

执行结果:


1

2

3

4


Running in foo

Explicit context to bar

Explicit context switch to foo again

Implicit context switch back to bar

代码执行过程:

网络延迟或IO阻塞隐式交出greenlet上下文的执行权。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26


import time

import gevent

from gevent import select

start = time.time()

tic = lambda: ‘at %1.1f seconds‘ % (time.time() - start)

def gr1():

print(‘Started Polling: %s‘ % tic())

select.select([], [], [], 1)

print(‘Ended Polling: %s‘ % tic())

def gr2():

print(‘Started Polling: %s‘ % tic())

select.select([], [], [], 2)

print(‘Ended Polling: %s‘ % tic())

def gr3():

print("Hey lets do some stuff while the greenlets poll, %s" % tic())

gevent.sleep(1)

gevent.joinall([

gevent.spawn(gr1),

gevent.spawn(gr2),

gevent.spawn(gr3),

])

执行结果:


1

2

3

4

5


Started Polling: at 0.0 seconds

Started Polling: at 0.0 seconds

Hey lets do some stuff while the greenlets poll, at 0.0 seconds

Ended Polling: at 1.0 seconds

Ended Polling: at 2.0 seconds

同步vs异步


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20


import gevent

import random

def task(pid):

gevent.sleep(random.randint(0,2)*0.001)

print(‘Task %s done‘ % pid)

def synchronous():

for i in xrange(5):

task(i)

def asynchronous():

threads = [gevent.spawn(task, i) for i in xrange(5)]

gevent.joinall(threads)

print(‘Synchronous:‘)

synchronous()

print(‘Asynchronous:‘)

asynchronous()

执行结果:


1

2

3

4

5

6

7

8

9

10

11

12


Synchronous:

Task 0 done

Task 1 done

Task 2 done

Task 3 done

Task 4 done

Asynchronous:

Task 2 done

Task 0 done

Task 1 done

Task 3 done

Task 4 done

确定性

greenlet具有确定性。在相同配置相同输入的情况下,它们总是会产生相同的输出。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27


import time

def echo(i):

time.sleep(0.001)

return i

# Non Deterministic Process Pool

from multiprocessing.pool import Pool

p = Pool(10)

run1 = [a for a in p.imap_unordered(echo, xrange(10))]

run2 = [a for a in p.imap_unordered(echo, xrange(10))]

run3 = [a for a in p.imap_unordered(echo, xrange(10))]

run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)

# Deterministic Gevent Pool

from gevent.pool import Pool

p = Pool(10)

run1 = [a for a in p.imap_unordered(echo, xrange(10))]

run2 = [a for a in p.imap_unordered(echo, xrange(10))]

run3 = [a for a in p.imap_unordered(echo, xrange(10))]

run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)

执行结果:


1

2


False

True

即使gevent通常带有确定性,当开始与如socket或文件等外部服务交互时, 不确定性也可能溜进你的程序中。因此尽管gevent线程是一种“确定的并发”形式, 使用它仍然可能会遇到像使用POSIX线程或进程时遇到的那些问题。

涉及并发长期存在的问题就是竞争条件(race condition)(当两个并发线程/进程都依赖于某个共享资源同时都尝试去修改它的时候, 就会出现竞争条件),这会导致资源修改的结果状态依赖于时间和执行顺序。 这个问题,会导致整个程序行为变得不确定。

解决办法: 始终避免所有全局的状态.

创建Greenlets

gevent对Greenlet初始化提供了一些封装.


1

2

3

4

5

6

7

8

9

10

11

12


import gevent

from gevent import Greenlet

def foo(message, n):

gevent.sleep(n)

print(message)

thread1 = Greenlet.spawn(foo, "Hello", 1)

thread2 = gevent.spawn(foo, "I live!", 2)

thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

gevent.joinall(threads)

执行结果:


1

2


Hello

I live!

除使用基本的Greenlet类之外,你也可以子类化Greenlet类,重载它的_run方法。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17


import gevent

from gevent import Greenlet

class MyGreenlet(Greenlet):

def __init__(self, message, n):

Greenlet.__init__(self)

self.message = message

self.n = n

def _run(self):

print(self.message)

gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)

g.start()

g.join()

执行结果:


1

Hi there!

Greenlet状态

greenlet的状态通常是一个依赖于时间的参数:

  • started – Boolean, 指示此Greenlet是否已经启动
  • ready() – Boolean, 指示此Greenlet是否已经停止
  • successful() – Boolean, 指示此Greenlet是否已经停止而且没抛异常
  • value – 任意值, 此Greenlet代码返回的值
  • exception – 异常, 此Greenlet内抛出的未捕获异常

程序停止

程序
当主程序(main program)收到一个SIGQUIT信号时,不能成功做yield操作的 Greenlet可能会令意外地挂起程序的执行。这导致了所谓的僵尸进程, 它需要在Python解释器之外被kill掉。

通用的处理模式就是在主程序中监听SIGQUIT信号,调用gevent.shutdown退出程序。


1

2

3

4

5

6

7

8

9

10


import gevent

import signal

def run_forever():

gevent.sleep(1000)

if __name__ == ‘__main__‘:

gevent.signal(signal.SIGQUIT, gevent.shutdown)

thread = gevent.spawn(run_forever)

thread.join()

超时

通过超时可以对代码块儿或一个Greenlet的运行时间进行约束。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15


import gevent

from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)

timeout.start()

def wait():

gevent.sleep(10)

try:

gevent.spawn(wait).join()

except Timeout:

print(‘Could not complete‘)

超时类


1

2

3

4

5

6

7

8

9

10


import gevent

from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):

pass

with Timeout(time_to_wait, TooLong):

gevent.sleep(10)

另外,对各种Greenlet和数据结构相关的调用,gevent也提供了超时参数。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30


import gevent

from gevent import Timeout

def wait():

gevent.sleep(2)

timer = Timeout(1).start()

thread1 = gevent.spawn(wait)

try:

thread1.join(timeout=timer)

except Timeout:

print(‘Thread 1 timed out‘)

# --

timer = Timeout.start_new(1)

thread2 = gevent.spawn(wait)

try:

thread2.get(timeout=timer)

except Timeout:

print(‘Thread 2 timed out‘)

# --

try:

gevent.with_timeout(1, wait)

except Timeout:

print(‘Thread 3 timed out‘)

执行结果:


1

2

3


Thread 1 timed out

Thread 2 timed out

Thread 3 timed out

猴子补丁(Monkey patching)

gevent的死角.


1

2

3

4

5

6

7

8

9

10

11

12

13


import socket

print(socket.socket)

print("After monkey patch")

from gevent import monkey

monkey.patch_socket()

print(socket.socket)

import select

print(select.select)

monkey.patch_select()

print("After monkey patch")

print(select.select)

执行结果:


1

2

3

4

5

6

7


class ‘socket.socket‘

After monkey patch

class ‘gevent.socket.socket‘

built-in function select

After monkey patch

function select at 0x1924de8

Python的运行环境允许我们在运行时修改大部分的对象,包括模块,类甚至函数。 这是个一般说来令人惊奇的坏主意,因为它创造了“隐式的副作用”,如果出现问题 它很多时候是极难调试的。虽然如此,在极端情况下当一个库需要修改Python本身 的基础行为的时候,猴子补丁就派上用场了。在这种情况下,gevent能够修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行。

例如,Redis的python绑定一般使用常规的tcp socket来与redis-server实例通信。 通过简单地调用gevent.monkey.patch_all(),可以使得redis的绑定协作式的调度 请求,与gevent栈的其它部分一起工作。

这让我们可以将一般不能与gevent共同工作的库结合起来,而不用写哪怕一行代码。 虽然猴子补丁仍然是邪恶的(evil),但在这种情况下它是“有用的邪恶(useful evil)”。

数据结构

  • 事件
  • 队列
  • 组和池
  • 锁和信号量
  • 线程局部变量
  • 子进程
  • Actors

    事件

    事件(event)是一个在Greenlet之间异步通信的形式。


    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26


    import gevent

    from gevent.event import Event

    evt = Event()

    def setter():

    print(‘A: Hey wait for me, I have to do something‘)

    gevent.sleep(3)

    print("Ok, I‘m done")

    evt.set()

    def waiter():

    print("I‘ll wait for you")

    evt.wait() # blocking

    print("It‘s about time")

    def main():

    gevent.joinall([

    gevent.spawn(setter),

    gevent.spawn(waiter),

    gevent.spawn(waiter),

    gevent.spawn(waiter)

    ])

    if __name__ == ‘__main__‘:

    main()

执行结果:


1

2

3

4

5

6

7

8


A: Hey wait for me, I have to do something

I‘ll wait for you

I‘ll wait for you

I‘ll wait for you

Ok, I‘m done

It‘s about time

It‘s about time

It‘s about time

事件对象的一个扩展是AsyncResult,它允许你在唤醒调用上附加一个值。 它有时也被称作是future或defered,因为它持有一个指向将来任意时间可设置为任何值的引用。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15


import gevent

from gevent.event import AsyncResult

a = AsyncResult()

def setter():

gevent.sleep(3)

a.set(‘Hello!‘)

def waiter():

print(a.get())

gevent.joinall([

gevent.spawn(setter),

gevent.spawn(waiter),

])

队列

队列是一个排序的数据集合,它有常见的put / get操作, 但是它是以在Greenlet之间可以安全操作的方式来实现的。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23


import gevent

from gevent.queue import Queue

tasks = Queue()

def worker(n):

while not tasks.empty():

task = tasks.get()

print(‘Worker %s got task %s‘ % (n, task))

gevent.sleep(0)

print(‘Quitting time!‘)

def boss():

for i in xrange(1,10):

tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([

gevent.spawn(worker, ‘steve‘),

gevent.spawn(worker, ‘john‘),

gevent.spawn(worker, ‘nancy‘),

])

执行结果:


1

2

3

4

5

6

7

8

9

10

11

12


Worker steve got task 1

Worker john got task 2

Worker nancy got task 3

Worker steve got task 4

Worker john got task 5

Worker nancy got task 6

Worker steve got task 7

Worker john got task 8

Worker nancy got task 9

Quitting time!

Quitting time!

Quitting time!

put和get操作都是阻塞的,put_nowait和get_nowait不会阻塞, 然而在操作不能完成时抛出gevent.queue.Empty或gevent.queue.Full异常。

组和池

组(group)是一个运行中greenlet集合,集合中的greenlet像一个组一样会被共同管理和调度。 它也兼饰了像Python的multiprocessing库那样的平行调度器的角色,主要用在在管理异步任务的时候进行分组。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18


import gevent

from gevent.pool import Group

def talk(msg):

for i in xrange(2):

print(msg)

g1 = gevent.spawn(talk, ‘bar‘)

g2 = gevent.spawn(talk, ‘foo‘)

g3 = gevent.spawn(talk, ‘fizz‘)

group = Group()

group.add(g1)

group.add(g2)

group.join()

group.add(g3)

group.join()

执行结果:


1

2

3

4

5

6


bar

bar

foo

foo

fizz

fizz

池(pool)是一个为处理数量变化并且需要限制并发的greenlet而设计的结构。


1

2

3

4

5

6

7

8

9


import gevent

from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):

print(‘Size of pool %s‘ % len(pool))

pool.map(hello_from, xrange(3))

执行结果:


1

2

3


Size of pool 2

Size of pool 2

Size of pool 1

构造一个socket池的类,在各个socket上轮询。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20


from gevent.pool import Pool

class SocketPool(object):

def __init__(self):

self.pool = Pool(10)

self.pool.start()

def listen(self, socket):

while True:

socket.recv()

def add_handler(self, socket):

if self.pool.full():

raise Exception("At maximum pool size")

else:

self.pool.spawn(self.listen, socket)

def shutdown(self):

self.pool.kill()

锁和信号量

信号量是一个允许greenlet相互合作,限制并发访问或运行的低层次的同步原语。 信号量有两个方法,acquire和release。在信号量是否已经被 acquire或release,和拥有资源的数量之间不同,被称为此信号量的范围 (the bound of the semaphore)。如果一个信号量的范围已经降低到0,它会 阻塞acquire操作直到另一个已经获得信号量的greenlet作出释放。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21


from gevent import sleep

from gevent.pool import Pool

from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):

sem.acquire()

print(‘Worker %i acquired semaphore‘ % n)

sleep(0)

sem.release()

print(‘Worker %i released semaphore‘ % n)

def worker2(n):

with sem:

print(‘Worker %i acquired semaphore‘ % n)

sleep(0)

print(‘Worker %i released semaphore‘ % n)

pool = Pool()

pool.map(worker1, xrange(0,2))

执行结果:


1

2

3

4


Worker 0 acquired semaphore

Worker 1 acquired semaphore

Worker 0 released semaphore

Worker 1 released semaphore

锁(lock)是范围为1的信号量。它向单个greenlet提供了互斥访问。 信号量和锁常被用来保证资源只在程序上下文被单次使用。

线程局部变量

Gevent允许程序员指定局部于greenlet上下文的数据。 在内部,它被实现为以greenlet的getcurrent()为键, 在一个私有命名空间寻址的全局查找。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22


import gevent

from gevent.local import local

stash = local()

def f1():

stash.x = 1

print(stash.x)

def f2():

stash.y = 2

print(stash.y)

try:

stash.x

except AttributeError:

print("x is not local to f2")

g1 = gevent.spawn(f1)

g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])

执行结果:


1

2

3


1

2

x is not local to f2

很多集成了gevent的web框架将HTTP会话对象以线程局部变量的方式存储在gevent内。 例如使用Werkzeug实用库和它的proxy对象,我们可以创建Flask风格的请求对象。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32


from gevent.local import local

from werkzeug.local import LocalProxy

from werkzeug.wrappers import Request

from contextlib import contextmanager

from gevent.wsgi import WSGIServer

_requests = local()

request = LocalProxy(lambda: _requests.request)

@contextmanager

def sessionmanager(environ):

_requests.request = Request(environ)

yield

_requests.request = None

def logic():

return "Hello " + request.remote_addr

def application(environ, start_response):

status = ‘200 OK‘

with sessionmanager(environ):

body = logic()

headers = [

(‘Content-Type‘, ‘text/html‘)

]

start_response(status, headers)

return [body]

WSGIServer((‘‘, 8000), application).serve_forever()

子进程

从gevent 1.0起,支持gevent.subprocess,支持协作式的等待子进程。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15


import gevent

from gevent.subprocess import Popen, PIPE

def cron():

while True:

print("cron")

gevent.sleep(0.2)

g = gevent.spawn(cron)

sub = Popen([‘sleep 1; uname‘], stdout=PIPE, shell=True)

out, err = sub.communicate()

g.kill()

print(out.rstrip())

```

执行结果:

cron
cron
cron
cron
cron
Linux

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43


很多人也想将gevent和multiprocessing一起使用。最明显的挑战之一 就是multiprocessing提供的进程间通信默认不是协作式的。由于基于 multiprocessing.Connection的对象(例如Pipe)暴露了它们下面的 文件描述符(file descriptor),gevent.socket.wait_read和wait_write 可以用来在直接读写之前协作式的等待ready-to-read/ready-to-write事件。

```python

import gevent

from multiprocessing import Process, Pipe

from gevent.socket import wait_read, wait_write

# To Process

a, b = Pipe()

# From Process

c, d = Pipe()

def relay():

for i in xrange(5):

msg = b.recv()

c.send(msg + " in " + str(i))

def put_msg():

for i in xrange(5):

wait_write(a.fileno())

a.send(‘hi‘)

def get_msg():

for i in xrange(5):

wait_read(d.fileno())

print(d.recv())

if __name__ == ‘__main__‘:

proc = Process(target=relay)

proc.start()

g1 = gevent.spawn(get_msg)

g2 = gevent.spawn(put_msg)

gevent.joinall([g1, g2], timeout=1)

```

执行结果:

```

hi in 0

hi in 1

hi in 2

hi in 3

hi in 4

然而要注意,组合multiprocessing和gevent必定带来 依赖于操作系统(os-dependent)的缺陷,其中有:

在兼容POSIX的系统创建子进程(forking)之后, 在子进程的gevent的状态是不适定的(ill-posed)。一个副作用就是, multiprocessing.Process创建之前的greenlet创建动作,会在父进程和子进程两方都运行。

上例的put_msg()中的a.send()可能依然非协作式地阻塞调用的线程:一个 ready-to-write事件只保证写了一个byte。在尝试写完成之前底下的buffer可能是满的。

上面表示的基于wait_write()/wait_read()的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported)),因为Windows不能监视 pipe事件。

Python包gipc以大体上透明的方式在 兼容POSIX系统和Windows上克服了这些挑战。它提供了gevent感知的基于 multiprocessing.Process的子进程和gevent基于pipe的协作式进程间通信。

Actors

actor模型是一个由于Erlang变得普及的更高层的并发模型。 简单的说它的主要思想就是许多个独立的Actor,每个Actor有一个可以从 其它Actor接收消息的收件箱。Actor内部的主循环遍历它收到的消息,并根据它期望的行为来采取行动。

Gevent没有原生的Actor类型,但在一个子类化的Greenlet内使用队列, 我们可以定义一个非常简单的。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21


import gevent

from gevent.queue import Queue

class Actor(gevent.Greenlet):

def __init__(self):

self.inbox = Queue()

Greenlet.__init__(self)

def receive(self, message):

"""

Define in your subclass.

"""

raise NotImplemented()

def _run(self):

self.running = True

while self.running:

message = self.inbox.get()

self.receive(message)

下面是一个使用的例子:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24


import gevent

from gevent.queue import Queue

from gevent import Greenlet

class Pinger(Actor):

def receive(self, message):

print(message)

pong.inbox.put(‘ping‘)

gevent.sleep(0)

class Ponger(Actor):

def receive(self, message):

print(message)

ping.inbox.put(‘pong‘)

gevent.sleep(0)

ping = Pinger()

pong = Ponger()

ping.start()

pong.start()

ping.inbox.put(‘start‘)

gevent.joinall([ping, pong])

实际应用

  • Gevent ZeroMQ
  • 简单server
  • WSGI Servers
  • 流式server
  • Long Polling
  • Websockets

简单server


1

2

3

4

5

6

7

8

9

10

11

12

13


# On Unix: Access with ``$ nc 127.0.0.1 5000``

# On Window: Access with ``$ telnet 127.0.0.1 5000``

from gevent.server import StreamServer

def handle(socket, address):

socket.send("Hello from a telnet!\n")

for i in range(5):

socket.send(str(i) + ‘\n‘)

socket.close()

server = StreamServer((‘127.0.0.1‘, 5000), handle)

server.serve_forever()

WSGI Servers And Websockets

Gevent为HTTP内容服务提供了两种WSGI server。从今以后就称为 wsgi和pywsgi:

  • gevent.wsgi.WSGIServer
  • gevent.pywsgi.WSGIServer

glb中使用


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48


import click

from flask import Flask

from gevent.pywsgi import WSGIServer

from geventwebsocket.handler import WebSocketHandler

import v1

from .settings import Config

from .sockethandler import handle_websocket

def create_app(config=None):

app = Flask(__name__, static_folder=‘static‘)

if config:

app.config.update(config)

else:

app.config.from_object(Config)

app.register_blueprint(

v1.bp,

url_prefix=‘/v1‘)

return app

def wsgi_app(environ, start_response):

path = environ[‘PATH_INFO‘]

if path == ‘/websocket‘:

handle_websocket(environ[‘wsgi.websocket‘])

else:

return create_app()(environ, start_response)

@click.command()

@click.option(‘-h‘, ‘--host_port‘, type=(unicode, int),

default=(‘0.0.0.0‘, 5000), help=‘Host and port of server.‘)

@click.option(‘-r‘, ‘--redis‘, type=(unicode, int, int),

default=(‘127.0.0.1‘, 6379, 0),

help=‘Redis url of server.‘)

@click.option(‘-p‘, ‘--port_range‘, type=(int, int),

default=(50000, 61000),

help=‘Port range to be assigned.‘)

def manage(host_port, redis=None, port_range=None):

Config.REDIS_URL = ‘redis://%s:%s/%s‘ % redis

Config.PORT_RANGE = port_range

http_server = WSGIServer(host_port,

wsgi_app, handler_class=WebSocketHandler)

print ‘----GLB Server run at %s:%s-----‘ % host_port

print ‘----Redis Server run at %s:%s:%s-----‘ % redis

http_server.serve_forever()

缺陷

和其他异步I/O框架一样,gevent也有一些缺陷:

  • 阻塞(真正的阻塞,在内核级别)在程序中的某个地方停止了所有的东西.这很像C代码中monkey patch没有生效
  • 保持CPU处于繁忙状态.greenlet不是抢占式的,这可能导致其他greenlet不会被调度.
  • 在greenlet之间存在死锁的可能.

一个gevent回避的缺陷是,你几乎不会碰到一个和异步无关的Python库–它将阻塞你的应用程序,因为纯Python库使用的是monkey patch的stdlib.

原文地址:https://www.cnblogs.com/timssd/p/11167877.html

时间: 2024-11-05 15:17:37

Gevent简明教程的相关文章

Lisp简明教程

此教程是我花了一点时间和功夫整理出来的,希望能够帮到喜欢Lisp(Common Lisp)的朋友们.本人排版很烂还望多多海涵! <Lisp简明教程>PDF格式下载 <Lisp简明教程>ODT格式下载 具体的内容我已经编辑好了,想下载的朋友可以用上面的链接.本人水平有限,如有疏漏还望之处(要是有谁帮我排排版就好了)还望指出!资料虽然是我整理的,但都是网友的智慧,如果有人需要转载,请至少保留其中的“鸣谢”页(如果能有我就更好了:-)). Lisp简明教程 整理人:Chaobs 邮箱:[

Linux防火墙iptables简明教程

前几天微魔部落再次遭受到个别别有用心的攻击者的攻击,顺便给自己充个电,复习了一下linux下常见的防火墙iptables的一些内容,但是无奈网上的很多教程都较为繁琐,本着简明化学习的目的,微魔为大家剔除了许多冗余的内容,提取出尽量多的精华部分成文,和大家共同学习,本文涉及的内容包括如下 Linux防火墙iptables简明教程 1.安装iptables 2.查看现有的iptables规则 3.删除某iptables规则 4.清除现有iptables规则 5.创建规则 6.设置开机启动 7.保存i

Markdown简明教程4-Markdown UML图

1. 前言 Markdown是一种轻量级的标记语言,把作者从繁杂的排版工作中解放出来,实现易读易写的文章写作,已经逐渐成为事实上的行业标准.CSDN博客支持Markdown可以让广大博友更加专注于博客内容,大赞.但是,不少博友可能对Markdown比较生疏,本博接下来用一个系列文章<Markdown简明教程>扼要介绍Markdown,希望可以对大家有所帮助. 系列教程目录 关于Markdown Markdown基本使用 Markdown表格和公式 Markdown UML图 CSDN Mark

JSP 简明教程(二):JSP基本语法

基本语法 JSP只是在html的基础上嵌入一些动态的元素.下面是HelloWorld代码: <html> <% out.println("hello world"); %> </html> 以上代码中的<% %>就是动态元素.JSP中所有的特殊语法如下: <% %>:用于表示一段Java代码.这段代码在编译之后会放在_jspService()方法中. <%! %>:多了一个叹号,也用于表示一段Java代码,但是这段

第一课 C语言简明教程

1序言: 1与Java.C#等高级语言相比,C语言却非常简单,学习简单,使用也简单,但是也非常重要,到目前为止基本上操作系统的内核代码超过百分之九十使用C语言完成,因此学好C语言是学好计算机这门课程的基础,特别是进入系统编程尤为明显. 今天是本人复习C语言课程的第一课,主要重新记录一下C语言的基础知识,这节课涉及到C语言的结构.变量以及类型.输入输出.条件判断以及循环知识. 2知识点: 2.1 C语言的结构 2.1.1 通常情况下C语言程序是由: 1.相关的代码注释,使用/* ··· */可注释

Vbs 脚本编程简明教程之一

-为什么要使用 Vbs ? 在 Windows 中,学习计算机操作也许很简单,但是很多计算机工作是重复性劳动,例如你每周也许需要对一些计算机文件进行复制.粘贴.改名.删除,也许你每天启动 计算机第一件事情就是打开 WORD ,切换到你喜爱的输入法进行文本编辑,同时还要播放优美的音乐给工作创造一个舒心的环境,当然也有可能你经常需要对文本中的某 些数据进行整理,把各式各样的数据按照某种规则排列起来--.这些事情重复.琐碎,使人容易疲劳. 第三方软件也许可以强化计算机的某些功能,但是解决这些重复劳动往

Smarty教程1.引擎定义2.主要优点3.简明教程4.使用判断5.循环数组6.常见问题8.解释程序

Smarty是一个php模板引擎.更准确的说,它分开了逻辑程序和外在的内容,提供了一种易于管理的方法.可以描述为应用程序员和美工扮演了不同的角色,因为在大多数情况下 ,他们不可能是同一个人.例如,你正在创建一个用于浏览新闻的网页,新闻标题,标签栏,作者和内容等都是内容要素,他们并不包含应该怎样去呈现.在Smarty的程序里,这些被忽略了.模板设计者们编辑模板,组合使用html标签和模板标签去格式化这些要素的输出(html表格,背景色,字体大小,样式表,等等).有一天程序员想要改变文章检索的方式(

《Python简明教程》总结

Python经典教程<Python简明教程> 目录: 为什么Python 安装Python 体验Python Python数据类型 运算符与表达式 控制流 函数 模块 数据结构 解决问题 面向对象 输入输出 异常 标准库 更多的内容

Java泛型简明教程

Java泛型简明教程 博客分类: Java综合 JavaApple数据结构CC++ Java泛型简明教程 本文是从 Java Generics Quick Tutorial 这篇文章翻译而来. 泛型是Java SE 5.0中引入的一项特征,自从这项语言特征出现多年来,我相信,几乎所有的Java程序员不仅听说过,而且使用过它.关于Java泛型的教程,免费的,不免费的,有很多.我遇到的最好的教材有: The Java Tutorial Java Generics and Collections ,