我们在生产中,常用的处理任务模型有三种:
单线程
多线程
异步(单线程内,串行,特点是遇到阻塞(或IO之类的)就切换到其他任务)
其中一般如果都符合要求,那么异步是最好的选择。
单线程:遇到阻塞整个程序都等待
多线程:以空间换取时间,且有时候伴随着数据安全问题(通常加锁来处理)
异步:在单个线程内,且是串行执行,但是一旦遇到阻塞(IO之类的),就会切换到线程内的其他任务(把IO操作交给操作系统处理)
当我们面对如下的环境时,事件驱动模型(异步模型)通常是一个好的选择(and):
1、程序中有许多任务
2、任务之间高度独立(因此它们不需要互相通信,或者等待彼此)
3、在等待事件到来时,某些任务会阻塞。
常用的异步IO模型:select、poll、Epoll (windows下只支持select)
nginx就是Epoll模型实现的。单线程,多进程(为了利用多核,单线程只能跑在单个cup核心上)
前面我们说了多线程与多进程,总结其特点就是:
单线程串行执行,遇到IO就阻塞,效率低。
多线程并发执行,遇到IO就切换(用空间换取执行时间),效率上去了,但是耗费资源,操作复杂。
针对以上问题,出现了一种新的替代品,协程。
协程
协程又名微线程,纤程。协程是一种用户态的轻量级线程(协程由用户切换,线程由cpu时间片控制切换)协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。跟yield过程差不多
协程的定义标准:
1、必须在一个单线程里面实现并发
2、修改共享数据不需要加锁(因为协程是串行的)
3、用户程序里自己保存多个控制流的上下文栈
4、一个协程遇到IO操作(阻塞也一样)就自动切换到其他协程
协程的好处:
1、无需线程上下文切换的开销
2、无需原子操作锁定及同步的开销
"原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
3、方便切换控制流,简化编程模型
4、高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
1、无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
2、进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
协程例子
1、用yield实现协程操作的例子:
1 import time 2 3 def consumer(name): 4 print("\033[32;1m ---> starting eating baozi... \033[0m") 5 while True: 6 new_baozi = yield 7 print("[%s] is eating baozi %s " %(name,new_baozi)) 8 # time.sleep(1) #在yield里面并没有实现阻塞切换 9 10 def producer(): 11 c = con1.__next__() 12 c = con2.__next__() 13 n = 0 14 while n < 5: 15 n += 1 16 print("\033[32;1m [producer]\033[0m is making baozi %s " % n) 17 con1.send(n) 18 con2.send(n) #用send可以想yield发送数据 19 20 if __name__ == ‘__main__‘: 21 con1 = consumer(‘c1‘) 22 con2 = consumer(‘c2‘) 23 p = producer()
执行结果
---> starting eating baozi... ---> starting eating baozi... [producer] is making baozi 1 [c1] is eating baozi 1 [c2] is eating baozi 1 [producer] is making baozi 2 [c1] is eating baozi 2 [c2] is eating baozi 2 [producer] is making baozi 3 [c1] is eating baozi 3 [c2] is eating baozi 3 [producer] is making baozi 4 [c1] is eating baozi 4 [c2] is eating baozi 4 [producer] is making baozi 5 [c1] is eating baozi 5 [c2] is eating baozi 5
2、使用gevent实现协程
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import gevent 5 import time 6 7 def func1(): 8 print("\033[32;1m func1函数的第1部分... \033[0m") 9 gevent.sleep(2) 10 # time.sleep(2) #用time.sleep(2)是不行的 11 print("\033[32;1m func1函数的第2部分... \033[0m") 12 13 def func2(): 14 print("\033[31;1m func2函数的第1部分... \033[0m") 15 gevent.sleep(1) 16 # time.sleep(1) 17 print("\033[31;1m func2函数的第2部分... \033[0m") 18 19 def func3(): 20 print("\033[34;1m func3函数的第1部分... \033[0m") 21 gevent.sleep(3) 22 # time.sleep(3) 23 print("\033[34;1m func3函数的第2部分... \033[0m") 24 25 26 if __name__ == ‘__main__‘: 27 gevent.joinall([ 28 gevent.spawn(func1), 29 gevent.spawn(func2), 30 gevent.spawn(func3), 31 ])
执行结果
func1函数的第1部分... func2函数的第1部分... func3函数的第1部分... func2函数的第2部分... func1函数的第2部分... func3函数的第2部分...
3、协程结合urllib模块爬网站
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 from gevent import monkey 5 monkey.patch_all() #遇到阻塞就切换全靠它(作用:把要用到的接口全部变为非阻塞模式) 6 7 import gevent 8 from urllib.request import urlopen 9 10 def f(url): 11 print("GET: %s " %url) 12 resp = urlopen(url) 13 data = resp.read() 14 print("%s bytes received from %s " %(len(data),url)) 15 16 if __name__ == ‘__main__‘: 17 gevent.joinall([ 18 gevent.spawn(f, ‘http://www.cdu.edu.cn/‘), 19 gevent.spawn(f,‘https://www.python.org/‘), 20 gevent.spawn(f,‘https://www.jd.com/‘), 21 gevent.spawn(f,‘https://www.vip.com/‘) 22 ])
执行结果
GET: http://www.cdu.edu.cn/ GET: https://www.python.org/ GET: https://www.jd.com/ GET: https://www.vip.com/ 137584 bytes received from https://www.jd.com/ 69500 bytes received from https://www.vip.com/ 47984 bytes received from http://www.cdu.edu.cn/ 47695 bytes received from https://www.python.org/ Process finished with exit code 0
4、利用协程实现高并发服务器
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import gevent 5 import socket 6 from gevent import monkey; monkey.patch_all() 7 8 def server(port): 9 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 10 s.bind((‘0.0.0.0‘,port)) 11 s.listen(5000) 12 13 while True: 14 # print("\033[32;1m server is waiting... \033[0m") 15 print(" server is waiting... ") 16 conn, addr = s.accept() 17 gevent.spawn(handle_request,conn) 18 19 def handle_request(conn): 20 try: 21 while True: 22 data = conn.recv(1024) 23 print("recvied:",data.decode(‘utf8‘)) 24 conn.send(data) 25 if not data: 26 conn.shutdown(socket.SHUT_WR) 27 28 except Exception as ex: 29 print(ex) 30 31 finally: 32 conn.close() 33 34 35 if __name__ == ‘__main__‘: 36 server(9999)
server
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import socket 5 import threading 6 7 def run(n): 8 ‘‘‘这里是启动多线程,然后每个线程死循环发包给服务器,测试协程服务器的高并发和稳定性‘‘‘ 9 while True: 10 # msg = input(">>:").strip() 11 # if len(msg) == 0:continue 12 # if msg == ‘q‘:break 13 msg = ‘hello %s‘ %n 14 sk.send(bytes(msg,‘utf8‘)) 15 data = sk.recv(1024) 16 print("Received:",data.decode(‘utf8‘)) 17 18 sk.close() 19 20 if __name__ == ‘__main__‘: 21 IP_Port = (‘127.0.0.1‘, 9999) 22 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 23 sk.connect(IP_Port) 24 25 thread_list = [] 26 for i in range(2000): 27 t = threading.Thread(target=run,args=[i,]) 28 t.start() 29 thread_list.append(t) 30 31 for thread in thread_list: 32 thread.join()
client