5 并发编程--队列&生产者消费者模型

1、队列的介绍

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

创建队列的类(底层就是以管道和锁定的方式实现):

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。
但需要明确:
    1、队列内存放的是消息而非大数据
    2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小

主要方法:

q.put方法用以插入数据到队列中。
q.get方法可以从队列读取并且删除一个元素。
from multiprocessing import Process,Queue

q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) #满了
# q.put(4) #再放就阻塞住了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了

True
1
2
3
True

二、生产者消费者模型

1、生产者消费者模型介绍

为什么要使用生产者消费者模型

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,

如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,

才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。

为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者和消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的

2、生产者消费者模型实现

2.1 引入模型(生产一个消费一个)

import time
def producer():
    for i in range(3):
        res = f"包子 {i}"
        time.sleep(0.5)
        print(f"生产者生产了{res}")
        consumer(res)
def consumer(res):
    time.sleep(1)
    print(f"消费者吃了{res}")
if __name__ == ‘__main__‘:
    producer()

生产者生产了包子 0
消费者吃了包子 0
生产者生产了包子 1
消费者吃了包子 1
生产者生产了包子 2
消费者吃了包子 2

2.2 实现生产者消费者模型,但有小问题主进程永远不会结束

消费者不知道生产者已经完毕,一直处于等待状态,死循环

from multiprocessing import Process,Queue
import time
def producer(q):
    for i in range(3):
        res = f"包子 {i}"
        time.sleep(0.5)
        print(f"生产者生产了{res}")
        # 把生产的给队列保存
        q.put(res)

def consumer(q):
    while True:# 消费者一直接收
        res = q.get()
        time.sleep(1)
        print(f"消费者吃了{res}")
if __name__ == ‘__main__‘:
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
    print(‘主‘)

主
生产者生产了包子 0
生产者生产了包子 1
生产者生产了包子 2
消费者吃了包子 0
消费者吃了包子 1
消费者吃了包子 2

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

2.3、解决办法--其实我们的思路无非是发送结束信号而已

队列先进先出

from multiprocessing import Process,Queue
import time
def producer(q):
    for i in range(3):
        res = f"包子 {i}"
        time.sleep(0.5)
        print(f"生产者生产了{res}")
        # 把生产的给队列保存
        q.put(res)

def consumer(q):
    while True:# 消费者一直接收
        res = q.get()
        if res == None:
            break
        time.sleep(1)
        print(f"消费者吃了{res}")
if __name__ == ‘__main__‘:
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
    p1.join()# 主进程等待p1子进程执行完毕--即生产者生产完毕
    q.put(None)
    print(‘主‘)

生产者生产了包子 0
生产者生产了包子 1
生产者生产了包子 2
消费者吃了包子 0
主
消费者吃了包子 1
消费者吃了包子 2

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决,有几个消费者就需要发送几次结束信号:相当low,例如

if __name__ == ‘__main__‘:
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,‘egon1‘,‘包子‘))
    p2=Process(target=producer,args=(q,‘egon2‘,‘骨头‘))
    p3=Process(target=producer,args=(q,‘egon3‘,‘泔水‘))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,‘alex1‘))
    c2=Process(target=consumer,args=(q,‘alex2‘))

    #开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    q.put(None)
    print(‘主‘)

2.4 JoinableQueue([maxsize])



原文地址:https://www.cnblogs.com/foremostxl/p/9729652.html

时间: 2024-10-14 12:28:12

5 并发编程--队列&生产者消费者模型的相关文章

并发编程 之 生产者消费者模型

1 什么是生产者消费者模型 生产者:比喻的是程序中负责产生数据的任务 消费者:比喻的是程序中负责处理数据的任务 生产者->共享的介质(队列)<-消费者 2 为何用 实现了生产者与消费者的解耦和,生产者可以不停地生产,消费者也可以不停地消费 从而平衡了生产者的生产能力与消费者消费能力,提升了程序整体运行的效率 什么时候用? 当我们的程序中存在明显的两类任务,一类负责产生数据,另外一类负责处理数据 此时就应该考虑使用生产者消费者模型来提升程序的效率 from multiprocessing imp

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

#queue队列 #生产者消费者模型

1 #queue队列 #生产者消费者模型 2 3 #queue队列 #有顺序的容器 4 #程序解耦 5 #提高运行效率 6 7 #class queue.Queue(maxsize=0) #先入先出 8 #class queue.LifoQueue(maxsize=0)最后在第一 9 #class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列#VIP客户 10 11 #Queue.qsize() 12 #Queue.empty() #return

python2.0_s12_day9之day8遗留知识(queue队列&amp;生产者消费者模型)

4.线程 1.语法 2.join 3.线程锁之Lock\Rlock\信号量 4.将线程变为守护进程 5.Event事件 * 6.queue队列 * 7.生产者消费者模型 4.6 queue队列 queue非常有用,当信息必须安全的在多个线程之间进行数据交换的时候就应该想到queue 所以,queue它能保证数据被安全的在多个线程之间进行交换,那他就是天生的线程安全. queue有那么几种: class queue.Queue(maxsize=0) # 先入先出 class queue.LifoQ

并发通信、生产者消费者模型

多进程之间通信的限制 看一个例子: import multiprocessing as mp data=666 def func(): global data data=222 p=mp.Process(target=func) p.start() p.join() print(data) >>>666 可以看到,声明为global的data也没有发生变化,输出结果仍然是666,这正是多进程之间通信的限制,各个进程之间是相互独立的,互不干扰的内存空间.因此如果想要空想数据就必须开辟一段共

Python学习笔记——进阶篇【第九周】———线程、进程、协程篇(队列Queue和生产者消费者模型)

Python之路,进程.线程.协程篇 本节内容 进程.与线程区别 cpu运行原理 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 参考链接http://www.cnblogs.com/alex3714/articles/5230609.html

生产者消费者模型实现多线程异步交互

[Python之旅]第六篇(五):生产者消费者模型实现多线程异步交互 消息队列 生产者消费者模型 多线程异步交互 摘要:  虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应该还包括Python的消息队列,因为这里多线程异步交互是通过Python的消息队列来实现的,因此主要内容如下: 1 2 3 4 1.生产者消费者模型:厨师做包子与顾客吃包子 2.Python的消息队列 3.利用... 虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应

进程间通信IPC---队列、生产者消费者模型、生产者消费者模型_joinableQueue(五)

#  队列 # 队列 先进先出# IPC# from multiprocessing import Queue# q = Queue(5)# q.put(1)# q.put(2)# q.put(3)# q.put(4)# q.put(5)# print(q.full()) # 队列是否满了,已满话再次放入会阻塞# print(q.get())# print(q.get())# print(q.get())# print(q.get())# print(q.get())# print(q.empt

[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)

见贤思齐焉,见不贤而内自省也.-<论语> PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我最大的动力!谢谢啦~ Java5.0 增加了两种新的容器类型,它们是指:Queue 和 BlockingQueue.Queue 用来临时保存一组等待处理的元素.BlockingQueue 扩张了 Queue 接口,增加了可阻塞的插入和获取等操作. BlockingQueue 通常运用于一个线程生产对象放入队列,另一个线程从队列获取对象并消费,这是典型的生产者消费者模型. 这里写图