和开源软件MQ 很相似
先进先出的逻辑,一个,N个噻, 1个,N个听Q
from multiprocessing import Process,Queue#队列,先进先出q=Queue(3) q.put({‘a‘:1})q.put(‘b‘)q.put(‘c‘)print(q.full()) -->是否 噻满q.put(‘d‘,False) #等同于q.put_nowait(‘d‘) 塞满后 再 噻,不 等待,直接报错q.put(‘d‘,timeout=2) #塞满后 再噻 等待2秒, 不能噻就报错print(q.qsize()) #查看q中 值的个数 print(q.get())print(q.get())print(q.get())print(q.empty()) -->是否取空print(q.get(block=False)) #等同于q.get_nowait(‘d‘) 听满后 再听,不 等待,直接报错print(q.get_nowait())print(q.get(timeout=2)) #听满后 再听 等待2秒, 不能听就报错
==========================模拟真实案例======================
吃包子, 厨师做,客户吃,相互 不干扰def GET(q,name,li): print(‘客户‘, name) for line in li: time.sleep(1) print(‘客户吃了‘,q.get()) def PUT(q,name,li): print(‘厨师‘,name) for line in li: time.sleep(1) print(‘%s生产出%s‘ %(name,line)) q.put(line) if __name__ == ‘__main__‘: q=Queue() li = [‘包子%s‘ % i for i in range(10)] p=Process(target=PUT,args=(q,‘la‘,li)) p.start() p1=Process(target=GET,args=(q,‘onda‘,li)) p1.start()
但是问题是,客户吃完后 不释放进程,还在卡住
===============================通过 put 提交一个 固定的值,让 get去 判断======================
def GET(q,name): while True: time.sleep(2) res=q.get() if res is None :break print(‘客户%s, 吃了%s‘ %(name,res)) def PUT(q,name,li): for line in li: time.sleep(1) q.put(line) print(‘厨师 %s 只做了%s‘ %(name,line)) q.put(None) if __name__ == ‘__main__‘: q=Queue() p1=Process(target=GET,args=(q,‘la‘)) p1.start() li=[‘包子%s‘ %i for i in range(10)] p2=Process(target=PUT,args=(q,‘onda‘,li)) p2.start() p1.join() p2.join() print(‘主进程‘)
==============================或者使用 JoinableQueue===========
from multiprocessing import Process,JoinableQueueimport timeimport random def consumer(q,name): while True: # time.sleep(random.randint(1,3)) res=q.get() q.task_done() print(‘\033[41m消费者%s拿到了%s\033[0m‘ %(name,res)) def producer(seq,q,name): for item in seq: # time.sleep(random.randint(1,3)) q.put(item) print(‘\033[42m生产者%s生产了%s\033[0m‘ %(name,item)) q.join() print(‘============>>‘) if __name__ == ‘__main__‘: q=JoinableQueue() c=Process(target=consumer,args=(q,‘egon‘),) c.daemon=True #设置守护进程,主进程结束c就结束 c.start() seq=[‘包子%s‘ %i for i in range(10)] p=Process(target=producer,args=(seq,q,‘厨师1‘)) p.start() # master--->producer----->q--->consumer(10次task_done) p.join() #主进程等待p结束,p等待c把数据都取完,c一旦取完数据,p.join就是不再阻塞,进 # 而主进程结束,主进程结束会回收守护进程c,而且c此时也没有存在的必要了 print(‘主进程‘)
get()中
q.task_done() 每吃完一个,都会 汇报一下,直到吃完,
put()中
q.join(),等待 get()吃完, put就可以 获取到,然后向下 进行,而此时 put() 还卡在 听q
get()
所以给 get()设置 守护进程, 在主进程释放后,将 get释放
主进程:
在让主进程通过 p.join() 等待 put()请求 结束 ,而put() 请求 会等get() 吃完, 最终 主进程才结束,然后 释放 get()
时间: 2025-01-08 15:06:05