并发编程 之 生产者消费者模型

1 什么是生产者消费者模型    生产者:比喻的是程序中负责产生数据的任务    消费者:比喻的是程序中负责处理数据的任务

生产者->共享的介质(队列)<-消费者

2 为何用    实现了生产者与消费者的解耦和,生产者可以不停地生产,消费者也可以不停地消费    从而平衡了生产者的生产能力与消费者消费能力,提升了程序整体运行的效率

什么时候用?        当我们的程序中存在明显的两类任务,一类负责产生数据,另外一类负责处理数据        此时就应该考虑使用生产者消费者模型来提升程序的效率
from multiprocessing import JoinableQueue,Processimport timeimport osimport random

def producer(name,food,q):    for i in range(3):        res=‘%s%s‘ %(food,i)        time.sleep(random.randint(1,3))        # 往队列里丢        q.put(res)        print(‘\033[45m%s 生产了 %s\033[0m‘ %(name,res))    # q.put(None)

def consumer(name,q):    while True:        #从队列里取走        res=q.get()        if res is None:break        time.sleep(random.randint(1,3))        print(‘\033[46m%s 吃了 %s\033[0m‘ %(name,res))        q.task_done()

if __name__ == ‘__main__‘:    q=JoinableQueue()    # 生产者们    p1=Process(target=producer,args=(‘egon‘,‘包子‘,q,))    p2=Process(target=producer,args=(‘杨军‘,‘泔水‘,q,))    p3=Process(target=producer,args=(‘猴老师‘,‘翔‘,q,))    # 消费者们    c1=Process(target=consumer,args=(‘Alex‘,q,))    c2=Process(target=consumer,args=(‘wupeiqidsb‘,q,))    c1.daemon=True    c2.daemon=True

p1.start()    p2.start()    p3.start()    c1.start()    c2.start()

p1.join()    p2.join()    p3.join()

q.join() #等待队列被取干净    # q.join() 结束意味着    # 主进程的代码运行完毕--->(生产者运行完毕)+队列中的数据也被取干净了->消费者没有存在的意义

# print(‘主‘)

原文地址:https://www.cnblogs.com/fxc-520520/p/9301782.html

时间: 2024-10-03 19:14:00

并发编程 之 生产者消费者模型的相关文章

5 并发编程--队列&amp;生产者消费者模型

1.队列的介绍 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的 创建队列的类(底层就是以管道和锁定的方式实现): Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. 参数介绍: maxsize是队列中允许最大项数,省略则无大小限制. 但需要明确: 1.队列内存放的是消息而非大数据 2.队列占用的是内存空间,因而maxsize即便

并发通信、生产者消费者模型

多进程之间通信的限制 看一个例子: import multiprocessing as mp data=666 def func(): global data data=222 p=mp.Process(target=func) p.start() p.join() print(data) >>>666 可以看到,声明为global的data也没有发生变化,输出结果仍然是666,这正是多进程之间通信的限制,各个进程之间是相互独立的,互不干扰的内存空间.因此如果想要空想数据就必须开辟一段共

python并发编程之多进程(二):互斥锁(同步锁)&amp;进程其他属性&amp;进程间通信(queue)&amp;生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)

见贤思齐焉,见不贤而内自省也.-<论语> PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我最大的动力!谢谢啦~ Java5.0 增加了两种新的容器类型,它们是指:Queue 和 BlockingQueue.Queue 用来临时保存一组等待处理的元素.BlockingQueue 扩张了 Queue 接口,增加了可阻塞的插入和获取等操作. BlockingQueue 通常运用于一个线程生产对象放入队列,另一个线程从队列获取对象并消费,这是典型的生产者消费者模型. 这里写图

Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型

一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例子 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

Java线程:并发协作-生产者消费者模型

对于多线程程序来说,不管任何编程语言,生产者消费者模型都是最经典的. 实际上,准确的说应该是"生产者-消费者-仓储"模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确以下几点: 生产者仅仅在仓储未满时候生产,仓满则停止生产. 消费者仅仅在仓储有产品时候才能消费,仓空则等待. 当消费者发现仓储没有产品的时候会通知生产者生产. 生产者在生产出可消费产品时候,应该通知消费者去消费. 此模型将要结合java.lang.Object的wait与notify,notify

11.python并发入门(part8 基于线程队列实现生产者消费者模型)

一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据,相对的,如果消费者处理数据的速度大于生产者,那么消费者就必须等待生产者. 为了解决这种问题,就有了生产者消费者模型. 生产者与消费者模型,是通过一个容器,来解决生产者和消费者之间的耦合性问题,生产者和消费者之间并不会直接通信,这样生产者就无需等待消费者处理完数据,生产者可以直接把数据扔给队列,这个

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

进击的Python【第九章】:paramiko模块、线程与进程、各种线程锁、queue队列、生产者消费者模型

一.paramiko模块 他是什么东西? paramiko模块是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接. 先来个实例: 1 import paramiko 2 # 创建SSH对象 3 ssh = paramiko.SSHClient() 4 5 # 允许连接不在know_hosts文件中的主机 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 连接服务器 8 ss