一、生产者消费模型补充
总结:
---生产者消费者模型程序中两种角色:①负责生产数据(生产者);②负责处理数据(消费者)
---生产者消费者模型的作用:平衡生产者与消费者之间的速度差。
---实现方式:生产者——>队列——>消费者
如上篇博客内容关于生产消费模型内容,在生产者生产数据的过程结束后,即使消费者已将数据完全获取,消费者程序也不能结束,需由主进程或者生产者在结束生产程序后发送给消费者结束口令,消费者程序才会结束。但是如果出现多个消费者和多个生产者,这种情况又该如何解决?方法如下两种:
1、根据消费者数量传送结束信号(low)
from multiprocessing import Process,Queue import time,random,os def procducer(q): for i in range(10): res=‘包子%s‘ %i time.sleep(0.5) q.put(res) print(‘%s 生产了 %s‘ %(os.getpid(),res)) def consumer(q): while True: res=q.get() if res is None: break print(‘%s 吃 %s‘ %(os.getpid(),res)) time.sleep(random.randint(2,3)) if __name__ == ‘__main__‘: q=Queue() p=Process(target=procducer,args=(q,)) c=Process(target=consumer,args=(q,)) p.start() c.start() p.join() q.put(None) print(‘主‘)
from multiprocessing import Process,Queue import time import random import os def producer(name,q): for i in range(10): res=‘%s%s‘ %(name,i) time.sleep(random.randint(1, 3)) q.put(res) print(‘%s生产了%s‘ %(os.getpid(),res)) def consumer(name,q): while True: res=q.get() if not res:break print(‘%s吃了%s‘ %(name,res)) if __name__==‘__main__‘: q=Queue() p1=Process(target=producer,args=(‘巧克力‘,q)) p2=Process(target=producer,args=(‘甜甜圈‘,q)) p3=Process(target=producer, args=(‘奶油蛋糕‘,q)) c1=Process(target=consumer,args=(‘alex‘,q)) c2=Process(target=consumer,args=(‘egon‘,q)) _p=[p1,p2,p3,c1,c2] for p in _p: p.start() p1.join() p2.join() p3.join() ‘‘‘保证生产程序结束后,再发送结束信号,发送数量和消费者数量一致‘‘‘ q.put(None) q.put(None)
天啊噜
2、JoinableQueue队列机制
JoinableQueue与Queue队列基本相似,但前者队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。Queue实例的对象具有的方法JoinableQueue同样具有,除此JoinableQueue还具有如下方法:
①q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
②q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
from multiprocessing import Process,JoinableQueue import time import random def producer(name,food,q): for i in range(10): res=‘%s%s‘ %(food,i) time.sleep(random.randint(1, 3)) q.put(res) print(‘%s生产了%s‘ %(name,res)) q.join() #阻塞生产者进程,保证此进程结束时消费者进程已处理完其产生的数据 def consumer(name,q): while True: res=q.get() if not res:break print(‘%s吃了%s‘ %(name,res)) q.task_done() if __name__==‘__main__‘: q=JoinableQueue() p1=Process(target=producer,args=(1,‘巧克力‘,q)) p2=Process(target=producer,args=(2,‘奶油蛋糕‘,q)) p3 = Process(target=producer, args=(3,‘冰糖葫芦‘, q)) c1=Process(target=consumer,args=(‘lishi‘,q)) c2=Process(target=consumer,args=(‘jassin‘,q)) ‘‘‘守护进程保证主进程结束时,守护进程也立即结束‘‘‘ c1.daemon=True c2.daemon=True _p=[p1,p2,p3,c1,c2] for p in _p: p.start() p1.join() p2.join() p3.join()
二、回调函数
进程池执行完一个获得数据的进程,即刻要求通知主进程拿去解析数据。主进程调用一个函数去处理,这个函数便被称为回调函数,要求进程池进程的结果为回调函数的参数。
爬虫实例:
线程池
import requests from concurrent.futures import ThreadPoolExecutor(线程池),ProcessPoolExecutor(进程池) from threading import current_thread import time import os def get(url): # 下载 print(‘%s GET %s‘ %(current_thread().getName(),url)) response=requests.get(url) time.sleep(3) if response.status_code == 200: # 固定,=200表示下载完成 return {‘url‘:url,‘text‘:response.text} def parse(obj): # 解析 res=obj.result() print(‘[%s] <%s> (%s)‘ % (current_thread().getName(), res[‘url‘],len(res[‘text‘]))) if __name__ == ‘__main__‘: urls = [ ‘https://www.python.org‘, ‘https://www.baidu.com‘, ‘https://www.jd.com‘, ‘https://www.tmall.com‘, ] t=ThreadPoolExecutor(2) for url in urls: t.submit(get,url).add_done_callback(parse) t.shutdown(wait=True) print(‘主‘,os.getpid())
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数。
进程池
import requests from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time import os def get(url): print(‘%s GET %s‘ %(os.getpid(),url)) response=requests.get(url) time.sleep(3) if response.status_code == 200: return {‘url‘:url,‘text‘:response.text} def parse(obj): res=obj.result() print(‘[%s] <%s> (%s)‘ % (os.getpid(), res[‘url‘],len(res[‘text‘]))) if __name__ == ‘__main__‘: urls = [ ‘https://www.python.org‘, ‘https://www.baidu.com‘, ‘https://www.jd.com‘, ‘https://www.tmall.com‘, ] t=ProcessPoolExecutor(2) for url in urls: t.submit(get,url).add_done_callback(parse) t.shutdown(wait=True) print(‘主‘,os.getpid())