队列、生产者消费者模型

目录

  • 队列、生产者消费者模型、初识线程

    • 一、用进程锁来优化抢票小程序

      • 1.1 进程锁
      • 1.2 优化抢票小程序
    • 二、队列
      • 2.1 队列的介绍
      • 2.2 创建队列的类
      • 2.3 使用队列的案例
    • 三、生产者消费者模型
      • 3.1 用队列Queue实现生产者消费者模型
      • 3.2 用队列JoinableQueue实现生产者消费者模型

队列、生产者消费者模型、初识线程

一、用进程锁来优化抢票小程序

1.1 进程锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端是没有问题的。而共享带来的是竞争,竞争带来的结果就是错乱,那就需要加锁处理来控制。

多个进程共享同一打印终端的时候,并发运行的话虽然效率高,但是竞争同一打印终端,带来了打印错乱;而由并发变成串行,虽然牺牲了运行效率,但是避免了竞争。

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,速度是慢了,但是能够保证数据安全。

进程锁的使用需要from multiprocessing import Process ,Lock

1.2 优化抢票小程序



from multiprocessing import Process,Lock
import json,time,os

def search():
    time.sleep(1)  # 模拟网络io
    with open('db.txt',mode='rt',encoding='utf-8') as fr:
        res = json.load(fr)
        print(f"还剩{res['count']}张票")

def get():
    with open('db.txt',mode='rt',encoding='utf-8') as fr:
        res = json.load(fr)
    time.sleep(1)
    if res['count'] > 0:
        res['count'] -= 1
        with open('db.txt',mode='wt',encoding='utf-8') as fw:
            json.dump(res,fw)
            print(f"进程{os.getpid()}抢票成功")
        time.sleep(1.5) # 模拟网络io
    else:
        print('票已售罄!!!')

def task(lock):
    search()
    lock.acquire()  # 用进程锁锁住
    get()
    lock.release()  # 释放锁头

if __name__ == '__main__':
    lock = Lock() # 写在主进程是为了让子进程拿到同一把锁
    for i in range(15):  # 同时有15个人抢票
        p = Process(target=task,args=(lock,))
        p.start()

    # 进程锁和join用法的区别
    # 进程锁:是把锁住的代码变成了串行
    # join:是把所有的子进程变成了串行

# 为了保证数据的安全,只能串行,牺牲掉效率

二、队列

2.1 队列的介绍

对于上面的进程锁的例子,我们可以寻找一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。

这就是multiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道

ipc机制:进程通讯

管道:pipe 基于共享的内存空间

队列:Queue = pipe+锁

队列和管道都是将数据存放于内存中的,队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。

2.2 创建队列的类

底层就是以管道和锁定的方式实现

Queue([maxsize]):  创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。maxsize是队列中允许最大项数,省略则无大小限制。    

==要注意的是:Queue不适合传大文件,通常传一些信息==

类中方法介绍:**

  • 主要方法:

    1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
    3
    4 q.get_nowait():同q.get(False)
    5 q.put_nowait():同q.put(False)
    6
    7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
    8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
    9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
  • 其他方法(了解)
    1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
    2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
    '''

2.3 使用队列的案例

案例一

from multiprocessing import Process,Queue

q = Queue()
q.put('鸭屁屁')  # 插入数据到队列中
q.put([1,2,4])
q.put(2)
print(q.get())
print(q.get())
print(q.get())
#q.put(5)  # 往队列中再放入一个值,下面的代码就不会阻塞
print(q.get())  # 前三个已经把值拿掉了,这里就会默认一直等着拿值,就阻塞住了

案例二

from multiprocessing import Process,Queue
q = Queue(4) # 括号里的参数是指定队列里值得最大个数
q.put('鸭屁屁')
q.put([1,2,3])
q.put([2,3,4])
q.put(5)
q.put(6)  # 队列满了的情况再放值,会阻塞

案例三(以下几个案例了解即可)

from multiprocessing import Process,Queue
q = Queue(3)
q.put('zhao',block=True,timeout=2) # block=True:默认会阻塞,timeout:指定阻塞的时间
q.put('zhao',block=True,timeout=2)
q.put('zhao',block=True,timeout=2)

q.put('zhao',block=True,timeout=5) # put里的 block=True 如果满了会等待,timeout最多等待n秒,如果n秒队列还是满的就报错了

案例四

from multiprocessing import Process,Queue
q = Queue()
q.put('yyyy')
q.get()
q.get(block=True,timeout=5) #get里的 block=True 阻塞等待,timeout最多等5s, 5s后还是取不到就报错

案例五

from multiprocessing import Process,Queue
q = Queue(3)
q.put('qwe')
q.put('qwe')
q.put('qwe')

q.put('qwe',block=False) # 对于put来说 block=False 如果队列满了就直接报错

q = Queue(3)
q.put('qwe')
q.get()

q.get(block=False)  # 对于get来说 block = Flase 拿不到不阻塞,直接报错

案例六

from multiprocessing import Process,Queue
q = Queue(1)
q.put('123')
# q.get()
q.put_nowait('666')  # put_nowait方法相当于put里的block = Flase
# q.get_nowait()   # get_nowait方法相当于get里的block = Flase

三、生产者消费者模型

3.1 用队列Queue实现生产者消费者模型

生产者: 生产数据的任务

消费者: 处理数据的任务

生产者--队列(盆)-->消费者

生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消费效率.

生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率.

补充: queue不适合传大文件,通产传一些消息.

from multiprocessing import Process,Queue
import time,random

def producer(q,name,food):
    '''生产者进程'''
    for i in range(3):
        print(f"{name}生产了{food}{i}")
        time.sleep(random.randint(1,3))
        res = f"{food}{i}"
        q.put(res)

def consumer(q,name):
    '''消费者进程'''
    while True:
        res = q.get(timeout=5)
        if res is None:
            break  # 收到空信号就结束
        time.sleep(random.randint(1,3))
        print(f"{name}吃了{res}")

    if __name__ == '__main__':
        q = Queue()
        # 生产者对象
        p1 = Process(target=producer,args=(q,'rocky','生蚝'))
        p2 = Process(target=producer,args=(q,'nick','韭菜'))
        p3 = Process(target=producer,args=(q,'tank','扇贝'))
        # 消费者对象
        c1 = Process(target=consumer,args=(q,'蓬妹'))
        c2 = Process(target=consumer,args=(q,'山鸡'))
        # 生产者开始生产,消费者开始吃
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
        # 必须保证生产者生产完才能发送结束的信号,用到Process的join方法
        p1.join()  # 感知子进程的结束
        p2.join()
        p3.join()  # 到这里生产者生产完毕
        q.put(None)  # 有几个消费者就put几次
        q.put(None)

3.2 用队列JoinableQueue实现生产者消费者模型

JoinableQueue:这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止。

from multiprocessing import Process,JoinableQueue
import time
import random

def producer(name,food,q):
    '''生产者进程'''
    for i in range(2):
        time.sleep(random.randint(1,3))
        res = f"{food}{i}"
        q.put(res)
        print(f'\033[44m{name} 生产了 {res}\033[0m')
    q.join()  #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。

def consumer(name,q):
    '''消费者进程'''
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print(f"{name}吃了{res}")
        q.task_done()  #向q.join()发送一次信号,证明一个数据已经被取走了

if __name__ == '__main__':
    q = JoinableQueue()
    #生产者
    p1 = Process(target=producer,args=('yjy','酱大骨',q))
    p2 = Process(target=producer,args=('wwb','酸菜鱼',q))
    p3 = Process(target=producer,args=('hhh','卤猪蹄',q))

    #消费者
    c1 = Process(target=consumer,args = ('xx',q,))
    c2 = Process(target=consumer,args = ('yy',q,))
    c1.daemon = True  #将他设置为守护进程
    c2.daemon = True  #将他设置为守护进程

    #开始生产,开始吃
    l = [p1,p2,p3,c1,c2]
    for i in l :
        i.start()

    #必须保证生产者生产完才能发送结束的信号,运用到.join
    p1.join()  #感知子进程的结束
    p2.join()
    p3.join()
    print("主进程结束了")

'''
主进程等p1,p2,p3
P1,P2,p3等c,c2
p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
#因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。
'''

原文地址:https://www.cnblogs.com/zhuangyl23/p/11529792.html

时间: 2024-10-08 20:33:39

队列、生产者消费者模型的相关文章

#queue队列 #生产者消费者模型

1 #queue队列 #生产者消费者模型 2 3 #queue队列 #有顺序的容器 4 #程序解耦 5 #提高运行效率 6 7 #class queue.Queue(maxsize=0) #先入先出 8 #class queue.LifoQueue(maxsize=0)最后在第一 9 #class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列#VIP客户 10 11 #Queue.qsize() 12 #Queue.empty() #return

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

python2.0_s12_day9之day8遗留知识(queue队列&生产者消费者模型)

4.线程 1.语法 2.join 3.线程锁之Lock\Rlock\信号量 4.将线程变为守护进程 5.Event事件 * 6.queue队列 * 7.生产者消费者模型 4.6 queue队列 queue非常有用,当信息必须安全的在多个线程之间进行数据交换的时候就应该想到queue 所以,queue它能保证数据被安全的在多个线程之间进行交换,那他就是天生的线程安全. queue有那么几种: class queue.Queue(maxsize=0) # 先入先出 class queue.LifoQ

5 并发编程--队列&生产者消费者模型

1.队列的介绍 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的 创建队列的类(底层就是以管道和锁定的方式实现): Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. 参数介绍: maxsize是队列中允许最大项数,省略则无大小限制. 但需要明确: 1.队列内存放的是消息而非大数据 2.队列占用的是内存空间,因而maxsize即便

Python学习笔记——进阶篇【第九周】———线程、进程、协程篇(队列Queue和生产者消费者模型)

Python之路,进程.线程.协程篇 本节内容 进程.与线程区别 cpu运行原理 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 参考链接http://www.cnblogs.com/alex3714/articles/5230609.html

生产者消费者模型实现多线程异步交互

[Python之旅]第六篇(五):生产者消费者模型实现多线程异步交互 消息队列 生产者消费者模型 多线程异步交互 摘要:  虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应该还包括Python的消息队列,因为这里多线程异步交互是通过Python的消息队列来实现的,因此主要内容如下: 1 2 3 4 1.生产者消费者模型:厨师做包子与顾客吃包子 2.Python的消息队列 3.利用... 虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应

进程间通信IPC---队列、生产者消费者模型、生产者消费者模型_joinableQueue(五)

#  队列 # 队列 先进先出# IPC# from multiprocessing import Queue# q = Queue(5)# q.put(1)# q.put(2)# q.put(3)# q.put(4)# q.put(5)# print(q.full()) # 队列是否满了,已满话再次放入会阻塞# print(q.get())# print(q.get())# print(q.get())# print(q.get())# print(q.get())# print(q.empt

11.python并发入门(part8 基于线程队列实现生产者消费者模型)

一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据,相对的,如果消费者处理数据的速度大于生产者,那么消费者就必须等待生产者. 为了解决这种问题,就有了生产者消费者模型. 生产者与消费者模型,是通过一个容器,来解决生产者和消费者之间的耦合性问题,生产者和消费者之间并不会直接通信,这样生产者就无需等待消费者处理完数据,生产者可以直接把数据扔给队列,这个

进击的Python【第九章】:paramiko模块、线程与进程、各种线程锁、queue队列、生产者消费者模型

一.paramiko模块 他是什么东西? paramiko模块是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接. 先来个实例: 1 import paramiko 2 # 创建SSH对象 3 ssh = paramiko.SSHClient() 4 5 # 允许连接不在know_hosts文件中的主机 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 连接服务器 8 ss

使用阻塞队列实现生产者-消费者模型

生产者-消费者模问题 /** * 使用阻塞队列实现生产者-消费者模型 * 阻塞队列只允许元素以FIFO的方式来访问 * @author Bingyue * */ public class ProducerCustomerPattern { public static void main(String[] args) { //生产者和消费者共享的存储区域 BlockingQueue<Integer> blockQueue=new LinkedBlockingQueue(); /** * 此处外部