目录:
- 并发多线程
- 协程
- I/O多路复用(未完成,待续)
一、并发多线程
1、线程简述:
一条流水线的执行过程是一个线程,一条流水线必须属于一个车间,一个车间的运行过程就是一个进程(一个进程内至少一个线程)
进程是资源单位
而线程才是cpu上的执行单位
2、线程的优点:共享资源、创建开销小
3、线程的模块开启方式之threading模块
multiprocess模块的完全模仿了threading模块的接口。执行如下图:
方式一: 函数式调用
1 from threading import Thread 2 import time 3 def sayhi(name): # 定义函数 4 time.sleep(2) 5 print(‘%s say hello‘ %name) 6 7 if __name__ == ‘__main__‘: 8 t=Thread(target=sayhi,args=(‘wangshuyang‘,)) #定义线程类并传参 9 t.start() # 调用 10 print(‘主线程‘)
方式二:类式调用
1 #方式二 2 from threading import Thread 3 import time 4 class Sayhi(Thread): # 引用类 5 def __init__(self,name): 6 super().__init__() 7 self.name=name 8 def run(self): 9 time.sleep(2) 10 print(‘%s say hello‘ % self.name) 11 12 13 if __name__ == ‘__main__‘: 14 t = Sayhi(‘wangshuyang‘) #定义类 15 t.start() #调用 16 print(‘主线程‘)
4、多进程与多线程对比:
多线程比多进程更快,开销更小
1 from threading import Thread 2 from multiprocessing import Process 3 import os 4 5 def work(): 6 print(‘hello‘) 7 8 if __name__ == ‘__main__‘: 9 #在主进程下开启线程 10 t=Thread(target=work) 11 t.start() 12 print(‘主线程/主进程‘) 13 ‘‘‘ 14 打印结果: 15 hello 16 主线程/主进程 17 ‘‘‘ 18 19 #在主进程下开启子进程 20 t=Process(target=work) 21 t.start() 22 print(‘主线程/主进程‘) 23 ‘‘‘ 24 打印结果: 25 主线程/主进程 26 hello 27 ‘‘‘
pid进程号对比
1 from threading import Thread 2 from multiprocessing import Process 3 import os 4 5 def work(): 6 print(‘hello‘,os.getpid()) 7 8 if __name__ == ‘__main__‘: 9 #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样 10 t1=Thread(target=work) 11 t2=Thread(target=work) 12 t1.start() 13 t2.start() 14 print(‘主线程/主进程pid‘,os.getpid()) 15 16 #part2:开多个进程,每个进程都有不同的pid 17 p1=Process(target=work) 18 p2=Process(target=work) 19 p1.start() 20 p2.start() 21 print(‘主线程/主进程pid‘,os.getpid())
5、多线程并发socket示例
1 #_*_coding:utf-8_*_ 2 #!/usr/bin/env python 3 import multiprocessing 4 import threading 5 6 import socket 7 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 8 s.bind((‘127.0.0.1‘,8080)) 9 s.listen(5) 10 11 def action(conn): 12 while True: 13 data=conn.recv(1024) 14 print(data) 15 conn.send(data.upper()) 16 17 if __name__ == ‘__main__‘: 18 19 while True: 20 conn,addr=s.accept() 21 22 23 p=threading.Thread(target=action,args=(conn,)) 24 p.start() 25 26 服务端
服务端
1 #_*_coding:utf-8_*_ 2 #!/usr/bin/env python 3 4 5 import socket 6 7 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 8 s.connect((‘127.0.0.1‘,8080)) 9 10 while True: 11 msg=input(‘>>: ‘).strip() 12 if not msg:continue 13 14 s.send(msg.encode(‘utf-8‘)) 15 data=s.recv(1024) 16 print(data) 17 18 客户端
客户端
6、多线程模拟文件操作示例
1 from threading import Thread 2 msg_l=[] 3 format_l=[] 4 def talk(): 5 while True: 6 msg=input(‘>>: ‘).strip() 7 if not msg:continue 8 msg_l.append(msg) 9 10 def format_msg(): 11 while True: 12 if msg_l: 13 res=msg_l.pop() 14 format_l.append(res.upper()) 15 16 def save(): 17 while True: 18 if format_l: 19 with open(‘db.txt‘,‘a‘,encoding=‘utf-8‘) as f: 20 res=format_l.pop() 21 f.write(‘%s\n‘ %res) 22 23 if __name__ == ‘__main__‘: 24 t1=Thread(target=talk) 25 t2=Thread(target=format_msg) 26 t3=Thread(target=save) 27 t1.start() 28 t2.start() 29 t3.start()
7、threading模块之调用方法
1)join与setdaemon
与进程的方法都是类似的,其实是multiprocessing模仿threading的接口
join 等待线程执行完成,执行主进程
setdaemon 守护线程,主进程关闭,线程关闭
1 from threading import Thread 2 import time 3 def sayhi(name): 4 time.sleep(2) 5 print(‘%s say hello‘ %name) 6 7 if __name__ == ‘__main__‘: 8 t=Thread(target=sayhi,args=(‘egon‘,)) 9 t.setDaemon(True) 10 t.start() 11 t.join() 12 print(‘主线程‘) 13 print(t.is_alive())
2)Thread实例对象的方法
isAlive(): 返回线程是否活动的。
getName(): 返回线程名。
setName(): 设置线程名。
3)其他方法
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
1 from threading import Thread 2 import threading 3 from multiprocessing import Process 4 import os 5 6 def work(): 7 import time 8 time.sleep(3) 9 print(threading.current_thread().getName()) 10 11 12 if __name__ == ‘__main__‘: 13 #在主进程下开启线程 14 t=Thread(target=work) 15 t.start() 16 17 print(threading.current_thread().getName()) 18 print(threading.current_thread()) #主线程 19 print(threading.enumerate()) #连同主线程在内有两个运行的线程 20 print(threading.active_count()) 21 print(‘主线程/主进程‘) 22 23 ‘‘‘ 24 打印结果: 25 MainThread 26 <_MainThread(MainThread, started 140735268892672)> 27 [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 28 2 29 主线程/主进程 30 Thread-1 31 ‘‘‘
8、Python GIL(Global Interpreter Lock)全局解释器锁
在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势。只有cpython中一定会用GIL,Python完全可以不依赖于GIL。GIL锁原理
ps.额外应该知道的知识
对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用,因为对应的硬盘,在操作系统来看,只有一块。
现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
9、python 多线程和多进程的应用场景
计算密集型
1 #计算密集型 2 from threading import Thread 3 from multiprocessing import Process 4 import os 5 import time 6 def work(): 7 res=0 8 for i in range(1000000): 9 res+=i 10 11 if __name__ == ‘__main__‘: 12 t_l=[] 13 start_time=time.time() 14 # for i in range(300): #串行 15 # work() 16 17 for i in range(300): 18 t=Thread(target=work) #在我的机器上,4核cpu,多线程大概15秒 19 # t=Process(target=work) #在我的机器上,4核cpu,多进程大概10秒 20 t_l.append(t) 21 t.start() 22 23 for i in t_l: 24 i.join() 25 26 stop_time=time.time() 27 print(‘run time is %s‘ %(stop_time-start_time)) 28 29 print(‘主线程‘)
I/O密集型
1 #I/O密集型 2 from threading import Thread 3 from multiprocessing import Process 4 import time 5 import os 6 def work(): 7 time.sleep(2) #模拟I/O操作,可以打开一个文件来测试I/O,与sleep是一个效果 8 print(os.getpid()) 9 10 if __name__ == ‘__main__‘: 11 t_l=[] 12 start_time=time.time() 13 for i in range(1000): 14 t=Thread(target=work) #耗时大概为2秒 15 # t=Process(target=work) #耗时大概为25秒,创建进程的开销远高于线程,而且对于I/O密集型,多cpu根本不管用 16 t_l.append(t) 17 t.start() 18 19 for t in t_l: 20 t.join() 21 stop_time=time.time() 22 print(‘run time is %s‘ %(stop_time-start_time))
多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分析
10、同步锁
多线程或多进程程序访问同一份资源操作时,需要加锁。如下:
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每个线程中都获取这个全局变量 6 #num-=1 7 8 temp=num 9 time.sleep(0.1) 10 num =temp-1 # 对此公共变量进行-1操作 11 12 num = 100 #设定一个共享变量 13 14 thread_list = [] 15 16 for i in range(100): 17 t = threading.Thread(target=addNum) 18 t.start() 19 thread_list.append(t) 20 21 for t in thread_list: #等待所有线程执行完毕 22 t.join() 23 24 print(‘Result: ‘, num)
锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:
1 import threading 2 3 R=threading.Lock() 4 5 R.acquire() 6 ‘‘‘ 7 对公共数据的操作 8 ‘‘‘ 9 R.release()
GIL VS Lock
机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?
首先我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据,然后,我们可以得出结论:保护不同的数据就应该加不同的锁。
最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock
详细的:
因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake
up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和
py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,
这可以说是Python早期版本的遗留问题。
11、死锁与递归锁
进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
1 from threading import Thread,Lock 2 import time 3 mutexA=Lock() 4 mutexB=Lock() 5 6 class MyThread(Thread): 7 def run(self): 8 self.func1() 9 self.func2() 10 def func1(self): 11 mutexA.acquire() 12 print(‘\033[41m%s 拿到A锁\033[0m‘ %self.name) 13 14 mutexB.acquire() 15 print(‘\033[42m%s 拿到B锁\033[0m‘ %self.name) 16 mutexB.release() 17 18 mutexA.release() 19 20 def func2(self): 21 mutexB.acquire() 22 print(‘\033[43m%s 拿到B锁\033[0m‘ %self.name) 23 time.sleep(2) 24 25 mutexA.acquire() 26 print(‘\033[44m%s 拿到A锁\033[0m‘ %self.name) 27 mutexA.release() 28 29 mutexB.release() 30 31 if __name__ == ‘__main__‘: 32 for i in range(10): 33 t=MyThread() 34 t.start() 35 36 ‘‘‘ 37 Thread-1 拿到A锁 38 Thread-1 拿到B锁 39 Thread-1 拿到B锁 40 Thread-2 拿到A锁 41 然后就卡住,死锁了 42 ‘‘‘
解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
12、信号量Semahpore
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
示例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
1 import threading 2 import time 3 4 semaphore = threading.Semaphore(5) 5 6 def func(): 7 if semaphore.acquire(): 8 print (threading.currentThread().getName() + ‘ get semaphore‘) 9 time.sleep(2) 10 semaphore.release() 11 12 for i in range(20): 13 t1 = threading.Thread(target=func) 14 t1.start()
与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程
13、事件Event
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
可以考虑一种应用场景(仅仅作为说明),例如,我们有多个线程从Redis队列中读取数据来处理,这些线程都要尝试去连接Redis的服务,一般情况下,如果Redis连接不成功,在各个线程的代码中,都会去尝试重新连接。如果我们想要在启动时确保Redis服务正常,才让那些工作线程去连接Redis服务器,那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作:主线程中会去尝试连接Redis服务,如果正常的话,触发事件,各工作线程会尝试连接Redis服务。
1 import threading 2 import time 3 import logging 4 5 logging.basicConfig(level=logging.DEBUG, format=‘(%(threadName)-10s) %(message)s‘,) 6 7 def worker(event): 8 logging.debug(‘Waiting for redis ready...‘) 9 event.wait() 10 logging.debug(‘redis ready, and connect to redis server and do some work [%s]‘, time.ctime()) 11 time.sleep(1) 12 13 def main(): 14 readis_ready = threading.Event() 15 t1 = threading.Thread(target=worker, args=(readis_ready,), name=‘t1‘) 16 t1.start() 17 18 t2 = threading.Thread(target=worker, args=(readis_ready,), name=‘t2‘) 19 t2.start() 20 21 logging.debug(‘first of all, check redis server, make sure it is OK, and then trigger the redis ready event‘) 22 time.sleep(3) # simulate the check progress 23 readis_ready.set() 24 25 if __name__=="__main__": 26 main() 27 28 redis
redis
1 from threading import Thread,Event 2 import threading 3 import time,random 4 def conn_mysql(): 5 print(‘\033[42m%s 等待连接mysql。。。\033[0m‘ %threading.current_thread().getName()) 6 event.wait() 7 print(‘\033[42mMysql初始化成功,%s开始连接。。。\033[0m‘ %threading.current_thread().getName()) 8 9 10 def check_mysql(): 11 print(‘\033[41m正在检查mysql。。。\033[0m‘) 12 time.sleep(random.randint(1,3)) 13 event.set() 14 time.sleep(random.randint(1,3)) 15 16 if __name__ == ‘__main__‘: 17 event=Event() 18 t1=Thread(target=conn_mysql) #等待连接mysql 19 t2=Thread(target=conn_mysql) #等待连接myqsl 20 t3=Thread(target=check_mysql) #检查mysql 21 22 t1.start() 23 t2.start() 24 t3.start() 25 26 mysql
mysql
threading.Event的wait方法还接受一个超时参数,默认情况下如果事件一致没有发生,wait方法会一直阻塞下去,而加入这个超时参数之后,如果阻塞时间超过这个参数设定的值之后,wait方法会返回。对应于上面的应用场景,如果Redis服务器一致没有启动,我们希望子线程能够打印一些日志来不断地提醒我们当前没有一个可以连接的Redis服务,我们就可以通过设置这个超时参数来达成这样的目的:
1 def conn_mysql(): 2 count=0 3 while not e.is_set(): 4 print(‘%s 第 <%s> 次尝试‘ %(threading.current_thread().getName(),count)) 5 count+=1 6 e.wait(0.5) 7 print(‘%s ready to conn mysql‘ %threading.current_thread().getName()) 8 time.sleep(1) 9 10 mysql
mysql
1 from threading import Thread,Event 2 import threading 3 import time,random 4 def conn_mysql(): 5 while not event.is_set(): 6 print(‘\033[42m%s 等待连接mysql。。。\033[0m‘ %threading.current_thread().getName()) 7 event.wait(0.1) 8 print(‘\033[42mMysql初始化成功,%s开始连接。。。\033[0m‘ %threading.current_thread().getName()) 9 10 11 def check_mysql(): 12 print(‘\033[41m正在检查mysql。。。\033[0m‘) 13 time.sleep(random.randint(1,3)) 14 event.set() 15 time.sleep(random.randint(1,3)) 16 17 if __name__ == ‘__main__‘: 18 event=Event() 19 t1=Thread(target=conn_mysql) 20 t2=Thread(target=conn_mysql) 21 t3=Thread(target=check_mysql) 22 23 t1.start() 24 t2.start() 25 t3.start()
这样,我们就可以在等待Redis服务启动的同时,看到工作线程里正在等待的情况。
应用:连接池
14、条件Condition
使得线程等待,只有满足某条件时,才释放n个线程
1 import threading 2 3 def run(n): 4 con.acquire() 5 con.wait() 6 print("run the thread: %s" %n) 7 con.release() 8 9 if __name__ == ‘__main__‘: 10 11 con = threading.Condition() 12 for i in range(10): 13 t = threading.Thread(target=run, args=(i,)) 14 t.start() 15 16 while True: 17 inp = input(‘>>>‘) 18 if inp == ‘q‘: 19 break 20 con.acquire() 21 con.notify(int(inp)) 22 con.release()
15、定时器
定时器,指定n秒后执行某操作
1 from threading import Timer 2 3 4 def hello(): 5 print("hello, world") 6 7 t = Timer(1, hello) 8 t.start() # after 1 seconds, "hello, world" will be printed
16、线程队列queue
queue队列 :使用import queue,用法与进程Queue一样
1 import queue 2 3 q=queue.Queue() 4 q.put(‘first‘) 5 q.put(‘second‘) 6 q.put(‘third‘) 7 8 print(q.get()) 9 print(q.get()) 10 print(q.get()) 11 ‘‘‘ 12 结果(先进先出): 13 first 14 second 15 third 16 ‘‘‘
class queue.
LifoQueue
(maxsize=0) #last in fisrt out
1 import queue 2 3 q=queue.LifoQueue() 4 q.put(‘first‘) 5 q.put(‘second‘) 6 q.put(‘third‘) 7 8 print(q.get()) 9 print(q.get()) 10 print(q.get()) 11 ‘‘‘ 12 结果(后进先出): 13 third 14 second 15 first 16 ‘‘‘
class queue.
PriorityQueue
(maxsize=0) #存储数据时可设置优先级的队列
1 import queue 2 3 q=queue.PriorityQueue() 4 #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高 5 q.put((20,‘a‘)) 6 q.put((10,‘b‘)) 7 q.put((30,‘c‘)) 8 9 print(q.get()) 10 print(q.get()) 11 print(q.get()) 12 ‘‘‘ 13 结果(数字越小优先级越高,优先级高的优先出队): 14 (10, ‘b‘) 15 (20, ‘a‘) 16 (30, ‘c‘) 17 ‘‘‘
生产者、消费者模型
1 #生产者消费者模型 2 import queue 3 import threading 4 import time 5 6 #创建队列 7 q = queue.Queue(50) 8 9 #定义消费者 10 def productor(arg): 11 ‘‘‘ 12 买票 13 :param arg: 14 :return: 15 ‘‘‘ 16 while True: 17 q.put(str(arg) + ‘号产生订单‘)#提交到队列 18 19 #创建300个线程发送请求 20 for i in range(300):#300个线程同时提交订单相当于300个人同时提交订单 21 t = threading.Thread(target= productor,args= (i,)) 22 t.start() 23 24 #定义生产者 25 def consumer(arg): 26 ‘‘‘ 27 服务器后台 28 :param arg: 29 :return: 30 ‘‘‘ 31 while True: 32 print(str(arg) + ‘处理了‘+q.get())#进程从队列中取订单进行处理 33 34 #3个线程同时工作 35 for j in range(3): 36 t = threading.Thread(target=consumer,args=(j,)) 37 t.start()
二、协程
1、协程:
是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
协程定义:
1.必须在只有一个单线程里实现并发
2.修改共享数据不需加锁
3.用户程序里自己保存多个控制流的上下文栈
4.附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
yield切换在没有io的情况下或者没有重复开辟内存空间的操作,对效率没有什么提升,甚至更慢,为此,可以用greenlet来为大家演示这种切换
2、python协程的优点:
python的线程属于内核级别的,即由操作系统控制调度(如单线程一旦遇到io就被迫交出cpu执行权限,切换其他线程运行)
单线程内开启协程,一旦遇到io,从应用程序级别(而非操作系统)控制切换
优点:
协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
单线程内就可以实现并发的效果,最大限度地利用cpu
要实现协程,关键在于用户程序自己控制程序切换,切换之前必须由用户程序自己保存协程上一次调用时的状态,如此,每次重新调用时,能够从上次的位置继续执行。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈
ps.yiled复习:
1 import time 2 def consumer(item): 3 # print(‘拿到包子%s‘ %item) 4 x=11111111111 5 x1=12111111111 6 x3=13111111111 7 x4=14111111111 8 y=22222222222 9 z=33333333333 10 11 pass 12 def producer(target,seq): 13 for item in seq: 14 target(item) #每次调用函数,会临时产生名称空间,调用结束则释放,循环100000000次,则重复这么多次的创建和释放,开销非常大 15 16 start_time=time.time() 17 producer(consumer,range(100000000)) 18 stop_time=time.time() 19 print(‘run time is:%s‘ %(stop_time-start_time)) #30.132838010787964 20 21 22 #使用yield:无需重复开辟内存空间,即重复创建名称空间,因而开销小 23 import time 24 def init(func): 25 def wrapper(*args,**kwargs): 26 g=func(*args,**kwargs) 27 next(g) 28 return g 29 return wrapper 30 31 @init 32 def consumer(): 33 x=11111111111 34 x1=12111111111 35 x3=13111111111 36 x4=14111111111 37 y=22222222222 38 z=33333333333 39 while True: 40 item=yield 41 # print(‘拿到包子%s‘ %item) 42 pass 43 def producer(target,seq): 44 for item in seq: 45 target.send(item) #无需重新创建名称空间,从上一次暂停的位置继续,相比上例,开销小 46 47 start_time=time.time() 48 producer(consumer(),range(100000000)) 49 stop_time=time.time() 50 print(‘run time is:%s‘ %(stop_time-start_time)) #21.882073879241943
缺点:
协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
3、协程模块之Greenlet
greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator
1 from greenlet import greenlet 2 3 def test1(): 4 print(‘test1,first‘) 5 gr2.switch() 6 print(‘test1,sencod‘) 7 gr2.switch() 8 def test2(): 9 print(‘test2,first‘) 10 gr1.switch() 11 print(‘test2,sencod‘) 12 13 14 gr1=greenlet(test1) 15 gr2=greenlet(test2) 16 gr1.switch()
可以在第一次switch时传入参数
1 #顺序执行 2 import time 3 def f1(): 4 res=0 5 for i in range(10000000): 6 res+=i 7 8 def f2(): 9 res=0 10 for i in range(10000000): 11 res*=i 12 13 14 start_time=time.time() 15 f1() 16 f2() 17 stop_time=time.time() 18 print(‘run time is: %s‘ %(stop_time-start_time)) #1.7395639419555664 19 20 21 #切换 22 from greenlet import greenlet 23 import time 24 def f1(): 25 res=0 26 for i in range(10000000): 27 res+=i 28 gr2.switch() 29 30 31 def f2(): 32 res=0 33 for i in range(10000000): 34 res*=i 35 gr1.switch() 36 37 gr1=greenlet(f1) 38 gr2=greenlet(f2) 39 40 start_time=time.time() 41 gr1.switch() 42 stop_time=time.time() 43 print(‘run time is: %s‘ %(stop_time-start_time)) #7.789067983627319
greenlet只是提供了一种比generator更加便捷的切换方式,仍然是没有解决遇到IO自动切换的问题
4、协程模块之第三方Gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
g1=gevent.spawn()创建一个协程对象g1,
spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的。
a.遇到IO阻塞时会自动切换任务
1 import gevent 2 import time 3 4 5 def eat(): 6 print(‘eat food 1‘) 7 gevent.sleep(2) #等饭来 8 print(‘eat food 2‘) 9 10 def play_phone(): 11 print(‘play phone 1‘) 12 gevent.sleep(1) #网卡了 13 print(‘play phone 2‘) 14 15 16 17 # gevent.spawn(eat) 18 # gevent.spawn(play_phone) 19 # print(‘主‘) # 直接结束 20 21 22 #因而也需要join方法,进程或现场的jion方法只能join一个,而gevent的join方法可以join多个 23 24 g1=gevent.spawn(eat) 25 g2=gevent.spawn(play_phone) 26 gevent.joinall([g1,g2]) 27 print(‘主‘)
b.同步与异步
1 import gevent 2 3 def task(pid): 4 """ 5 Some non-deterministic task 6 """ 7 gevent.sleep(0.5) 8 print(‘Task %s done‘ % pid) 9 10 def synchronous(): 11 for i in range(1,10): 12 task(i) 13 14 def asynchronous(): 15 threads = [gevent.spawn(task, i) for i in range(10)] 16 gevent.joinall(threads) 17 18 print(‘Synchronous:‘) 19 synchronous() 20 21 print(‘Asynchronous:‘) 22 asynchronous()
上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,
而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前
或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
c.gevent线程的一些用法:
g1=gevent.spawn(func,1,,2,3,x=4,y=5)
g2=gevent.spawn(func2)
g1.join() #等待g1结束
g2.join() #等待g2结束
#或者上述两步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
5、协程应用:
a. 爬虫
1 from gevent import monkey;monkey.patch_all() 2 import gevent 3 import requests 4 import time 5 6 def get_page(url): 7 print(‘GET: %s‘ %url) 8 response=requests.get(url) 9 if response.status_code == 200: 10 print(‘%d bytes received from %s‘ %(len(response.text),url)) 11 12 13 start_time=time.time() 14 gevent.joinall([ 15 gevent.spawn(get_page,‘https://www.python.org/‘), 16 gevent.spawn(get_page,‘https://www.yahoo.com/‘), 17 gevent.spawn(get_page,‘https://github.com/‘), 18 ]) 19 stop_time=time.time() 20 print(‘run time is %s‘ %(stop_time-start_time))
b.socket并发
1 from gevent import monkey;monkey.patch_all() 2 from socket import * 3 import gevent 4 5 #如果不想用money.patch_all()打补丁,可以用gevent自带的socket 6 # from gevent import socket 7 # s=socket.socket() 8 9 def server(server_ip,port): 10 s=socket(AF_INET,SOCK_STREAM) 11 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 12 s.bind((server_ip,port)) 13 s.listen(5) 14 while True: 15 conn,addr=s.accept() 16 gevent.spawn(talk,conn,addr) 17 18 def talk(conn,addr): 19 try: 20 while True: 21 res=conn.recv(1024) 22 print(‘client %s:%s msg: %s‘ %(addr[0],addr[1],res)) 23 conn.send(res.upper()) 24 except Exception as e: 25 print(e) 26 finally: 27 conn.close() 28 29 if __name__ == ‘__main__‘: 30 server(‘127.0.0.1‘,8080) 31 32 服务端
服务端
1 from socket import * 2 3 client=socket(AF_INET,SOCK_STREAM) 4 client.connect((‘127.0.0.1‘,8080)) 5 6 7 while True: 8 msg=input(‘>>: ‘).strip() 9 if not msg:continue 10 11 client.send(msg.encode(‘utf-8‘)) 12 msg=client.recv(1024) 13 print(msg.decode(‘utf-8‘)) 14 15 客户端
客户端
c.socket客户端并发
1 from threading import Thread 2 from socket import * 3 import threading 4 5 def client(server_ip,port): 6 c=socket(AF_INET,SOCK_STREAM) 7 c.connect((server_ip,port)) 8 9 count=0 10 while True: 11 c.send((‘%s say hello %s‘ %(threading.current_thread().getName(),count)).encode(‘utf-8‘)) 12 msg=c.recv(1024) 13 print(msg.decode(‘utf-8‘)) 14 count+=1 15 if __name__ == ‘__main__‘: 16 for i in range(500): 17 t=Thread(target=client,args=(‘127.0.0.1‘,8080)) 18 t.start()