协程,又称微线程,纤程。什么是线程:协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
协程的好处:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销(注解:"原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 (切换到另一个线程。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。)
- 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
协程的缺点:
- 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
- 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
符合协程的标准
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 一个协程遇到IO操作自动切换到其它协程
举例:
1、使用yield实现协程操作:
import time import queue def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name, new_baozi)) # time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n += 1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" % n) if __name__ == ‘__main__‘: con = consumer("c1") con2 = consumer("c2") p = producer() 输出: --->starting eating baozi... --->starting eating baozi... [c1] is eating baozi 1 [c2] is eating baozi 1 [producer] is making baozi 1 [c1] is eating baozi 2 [c2] is eating baozi 2 [producer] is making baozi 2 [c1] is eating baozi 3 [c2] is eating baozi 3 [producer] is making baozi 3 [c1] is eating baozi 4 [c2] is eating baozi 4 [producer] is making baozi 4 [c1] is eating baozi 5 [c2] is eating baozi 5 [producer] is making baozi 5
2、使用第三方模块:greenlet (手动指定执行切换协程)
greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator。
greenlet是python的并行处理的一个库。 python 有一个非常有名的库叫做 stackless ,用来做并发处理, 主要是弄了个叫做tasklet的微线程的东西, 而greenlet 跟stackless的最大区别是greenlet需要你自己来处理线程切换, 就是说,你需要自己指定现在执行哪个greenlet再执行哪个greenlet。相当于手动切换协程。
一个 “greenlet” 是一个小型的独立伪线程。可以把它想像成一些栈帧,栈底是初始调用的函数,而栈顶是当前greenlet的暂停位置。你使用greenlet创建一堆这样的堆栈,然后在他们之间跳转执行。跳转必须显式声明的:一个greenlet必须选择要跳转到的另一个greenlet,这会让前一个挂起,而后一个在此前挂起处恢复执行。不同greenlets之间的跳转称为切换(switching) 。
当你创建一个greenlet时,它得到一个开始时为空的栈;当你第一次切换到它时,它会执行指定的函数,这个函数可能会调用其他函数、切换跳出greenlet等等。当最终栈底的函数执行结束出栈时,这个greenlet的栈又变成空的,这个greenlet也就死掉了。greenlet也会因为一个未捕捉的异常死掉。
举例:
from greenlet import greenlet def test1(): print(12) #2、打印 gr2.switch()#3、切换协程到 test2 print(34)#6、打印 gr2.switch()#7、切换到test2 def test2(): print(56)#4、打印 gr1.switch() #5、切换协程到 test1 print(78)#8、打印,执行完成 gr1 = greenlet(test1) #启动一个协程 gr2 = greenlet(test2) #启动一个协程 gr1.switch() #1、开始调用切换协程 输出: 12 56 34 78 注意:执行的步骤顺序,从1-8。
以上例子还不能实现在协程中自动切换,greenlet 只能手动指定执行,但对于生成器来说简单很多。要实现自动监控,并且自动切换协程,如何实现?引入gevent模块。
3、使用第三方模块:gevent (自动监控并自动切换协程)
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
Gevent是第三方库,通过greenlet实现协程,其基本思想是:当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成。
包含的特性:
1.基于libev的快速事件循环
2.基于greenlet的轻量级执行单元
3.重用Python标准库且概念相似的API
4.支持SSL的协作socket
5.通过c-ares或者线程池进行DNS查询
6.使用标准库和第三方库中使用了阻塞socket的代码的能力
第三库需要另外,开源包进行安装。
举例:
A、验证gevent通过自动判断,选择最优的线路进行判断执行。注意:可gevent.sleep() 调整时间,进行验证测试。结论:每个函数里面最后一次打印,看等待的时间越短越先执行。
import gevent def foo(): print(‘running foo‘) gevent.sleep(3) print(‘Explicit context switch to foo again‘) def bar(): print(‘running bar‘) gevent.sleep(6) print(‘Implicit context switch back to bar ‘) def func3(): print("running func3") gevent.sleep(1) print ("switch back to func3") gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), gevent.spawn(func3) ]) 输出: running foo running bar running func3 switch back to func3 Explicit context switch to foo again Implicit context switch back to bar
B、同步与异步性能对区别,如下:
程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn
。 初始化的greenlet列表存放在数组threads
中,此数组被传给gevent.joinall
函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。
import gevent def task(pid): gevent.sleep(1) print(‘Task %s done‘ % pid) def synchronous(): for i in range(1, 6): #range从1到5打印 task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in range(5)] gevent.joinall(threads) # print(‘Synchronous:‘) synchronous() #正常函数,串行的调用会每个1s打印一次 print(" ") print(‘Asynchronous:‘) asynchronous() #并行打印,等待一次性打印出来 输出: 每相隔一秒打印 Synchronous: Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done 等待同一时间打印 Asynchronous: Task 0 done Task 1 done Task 2 done Task 3 done Task 4 done
C、gevent协程爬取网页,遇到IO阻塞时会自动切换业务。举例如下:
from gevent import monkey; monkey.patch_all() import gevent,time from urllib.request import urlopen def f(url): print(‘GET: %s‘ % url) resp = urlopen(url) data = resp.read() #print(data) #打印爬取到的网页内容 print(‘%d bytes received from %s.‘ % (len(data), url)) time_start = time.time() urls = [‘http://www.cnblogs.com/alex3714/articles/5248247.html‘, ‘http://www.cnblogs.com/chen170615/p/8797609.html‘, ‘http://www.cnblogs.com/chen170615/p/8761768.html‘, ] for i in urls: f(i) print("同步执行时间:",time.time() - time_start) print (" ") async_time_start = time.time() gevent.joinall([ gevent.spawn(f, ‘http://www.cnblogs.com/alex3714/articles/5248247.html‘), gevent.spawn(f, ‘http://www.cnblogs.com/chen170615/p/8797609.html‘), gevent.spawn(f,‘http://www.cnblogs.com/chen170615/p/8761768.html‘) ]) print("异步执行时间:",time.time() - async_time_start) 输出: GET: http://www.cnblogs.com/alex3714/articles/5248247.html 92147 bytes received from http://www.cnblogs.com/alex3714/articles/5248247.html. GET: http://www.cnblogs.com/chen170615/p/8797609.html 10930 bytes received from http://www.cnblogs.com/chen170615/p/8797609.html. GET: http://www.cnblogs.com/chen170615/p/8761768.html 11853 bytes received from http://www.cnblogs.com/chen170615/p/8761768.html. 同步执行时间 20.319132089614868 GET: http://www.cnblogs.com/alex3714/articles/5248247.html GET: http://www.cnblogs.com/chen170615/p/8797609.html GET: http://www.cnblogs.com/chen170615/p/8761768.html 11853 bytes received from http://www.cnblogs.com/chen170615/p/8761768.html. 10930 bytes received from http://www.cnblogs.com/chen170615/p/8797609.html. 92147 bytes received from http://www.cnblogs.com/alex3714/articles/5248247.html. 异步执行时间: 0.28768205642700195
以上例子可以看出,gevent协程异步并发执行的性能高于同步串行的执行,遇到会等待的IO同时,异步的性能就表现的优异起来。(多执行几次,就能看出对比。)
D、通过gevent实现单线程下的多socket并发
举例:
服务端:
协程gevent_socket_server.py
import sys,socket,time,gevent from gevent import socket,monkey monkey.patch_all() def server(port): gevent_server = socket.socket() gevent_server.bind((‘0.0.0.0‘,port)) gevent_server.listen() while True: cli,addr = gevent_server.accept() gevent.spawn(handle_request,cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:",data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == "__main__": server(8001)
客户端两种类型,如下:
客户端1:协程gevent_socket_client.py(普通的手工输入模式)
import socket HOST = "localhost" PORT = 8001 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect((HOST,PORT)) while True: msg = bytes(input(">>:").strip(),encoding="utf-8") s.sendall(msg) data = s.recv(1024) print(‘Received‘,repr(data)) s.close()
客户端2:协程gevent_socket_cli.py(通过起进程的方式,并发执行)
import socket,threading HOST = "localhost" PORT = 8001 def sock_conn(): s = socket.socket() s.connect((HOST,PORT)) count = 0 while True: s.sendall(("hello %s" % count).encode("utf-8")) data = s.recv(1024) print("[%s]recv from server:" % threading.get_ident(),data.decode()) count += 1 s.close() for i in range(10): #测试注意数值,不要设置太大。要不然,机器回被卡死 t = threading.Thread(target=sock_conn) t.start()
原文地址:https://www.cnblogs.com/chen170615/p/8846983.html