# 队列
# 队列 先进先出# IPC# from multiprocessing import Queue# q = Queue(5)# q.put(1)# q.put(2)# q.put(3)# q.put(4)# q.put(5)# print(q.full()) # 队列是否满了,已满话再次放入会阻塞# print(q.get())# print(q.get())# print(q.get())# print(q.get())# print(q.get())# print(q.empty()) # 队列是否空了,空了再去会阻塞# while True: #不阻塞处理# try:# q.get_nowait()# except:# print(‘队列已空‘)# time.sleep(0.5)# for i in range(6):# q.put(i) from multiprocessing import Queue,Processdef produce(q): q.put(‘hello‘) def consume(q): print(q.get()) if __name__ == ‘__main__‘: #在win下才需要这段代码 q = Queue() p = Process(target=produce,args=(q,)) p.start() c = Process(target=consume, args=(q,)) c.start()
# 生产者消费者模型
# 队列# 生产者消费者模型 # 生产者 进程# 消费者 进程import timeimport randomfrom multiprocessing import Process,Queuedef consumer(q,name): while True: food = q.get() if food is None: #用q.empty()) 不可靠,也许在上报空后,另外又有生产者放入东西 print(‘%s获取到了一个空‘%name) break print(‘\033[31m%s消费了%s\033[0m‘ % (name,food)) time.sleep(random.randint(1,3)) def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = ‘%s生产了%s%s‘%(name,food,i) print(f) q.put(f) if __name__ == ‘__main__‘: q = Queue(20) p1 = Process(target=producer,args=(‘Egon‘,‘包子‘,q)) p2 = Process(target=producer, args=(‘wusir‘,‘泔水‘, q)) c1 = Process(target=consumer, args=(q,‘alex‘)) c2 = Process(target=consumer, args=(q,‘jinboss‘)) p1.start() 异步 p2.start() c1.start() c2.start() 异步 p1.join() # 这里非异步转同步,而是判断生产者是否结束 p2.join() q.put(None) q.put(None) 当取到None时为什么有阻塞情况,未显示程序执行完,因为两个人其中一个人拿到None, 另一个人就取不到值出现等待情况,有几个人就put几个None就解决了
# 生产者消费者模型_joinableQueue(解决一个None,多人get阻塞问题)
import timeimport randomfrom multiprocessing import Process,JoinableQueuedef consumer(q,name): while True: food = q.get() print(‘\033[31m%s消费了%s\033[0m‘ % (name,food)) time.sleep(random.randint(1,3)) q.task_done() # count - 1 def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = ‘%s生产了%s%s‘%(name,food,i) print(f) q.put(f) q.join() # 阻塞 直到一个队列中的所有数据 全部被处理完毕 if __name__ == ‘__main__‘: q = JoinableQueue(20) p1 = Process(target=producer,args=(‘Egon‘,‘包子‘,q)) p2 = Process(target=producer, args=(‘wusir‘,‘泔水‘, q)) c1 = Process(target=consumer, args=(q,‘alex‘)) c2 = Process(target=consumer, args=(q,‘jinboss‘)) p1.start() p2.start() c1.daemon = True # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束 c2.daemon = True c1.start() c2.start() p1.join() p2.join() # 感知一个进程的结束 # 在消费者这一端: # 每次获取一个数据 # 处理一个数据 # 发送一个记号 : 标志一个数据被处理成功 # 在生产者这一端: # 每一次生产一个数据, # 且每一次生产的数据都放在队列中 # 在队列中刻上一个记号 # 当生产者全部生产完毕之后, # join信号 : 已经停止生产数据了 # 且要等待之前被刻上的记号都被消费完 # 当数据都被处理完时,join阻塞结束 # consumer 中把所有的任务消耗完# producer 端 的 join感知到,停止阻塞# 所有的producer进程结束# 主进程中的p.join结束# 主进程中代码结束# 守护进程(消费者的进程)结束
原文地址:https://www.cnblogs.com/mys6/p/10848835.html
时间: 2024-08-01 11:57:48