进程间通信IPC---队列、生产者消费者模型、生产者消费者模型_joinableQueue(五)

#  队列

# 队列 先进先出# IPC# from multiprocessing import Queue# q = Queue(5)# q.put(1)# q.put(2)# q.put(3)# q.put(4)# q.put(5)# print(q.full())   # 队列是否满了,已满话再次放入会阻塞# print(q.get())# print(q.get())# print(q.get())# print(q.get())# print(q.get())# print(q.empty())  # 队列是否空了,空了再去会阻塞# while True:  #不阻塞处理#     try:#         q.get_nowait()#     except:#         print(‘队列已空‘)#         time.sleep(0.5)# for i in range(6):#     q.put(i)

from multiprocessing import Queue,Processdef produce(q):    q.put(‘hello‘)

def consume(q):    print(q.get())

if __name__ == ‘__main__‘: #在win下才需要这段代码    q = Queue()    p = Process(target=produce,args=(q,))    p.start()    c = Process(target=consume, args=(q,))    c.start()

#  生产者消费者模型

# 队列# 生产者消费者模型

# 生产者 进程# 消费者 进程import timeimport randomfrom multiprocessing import Process,Queuedef consumer(q,name):    while True:        food = q.get()        if food is None:  #用q.empty()) 不可靠,也许在上报空后,另外又有生产者放入东西            print(‘%s获取到了一个空‘%name)            break        print(‘\033[31m%s消费了%s\033[0m‘ % (name,food))        time.sleep(random.randint(1,3))

def producer(name,food,q):    for i in range(4):        time.sleep(random.randint(1,3))        f = ‘%s生产了%s%s‘%(name,food,i)        print(f)        q.put(f)

if __name__  == ‘__main__‘:    q = Queue(20)    p1 = Process(target=producer,args=(‘Egon‘,‘包子‘,q))    p2 = Process(target=producer, args=(‘wusir‘,‘泔水‘, q))    c1 = Process(target=consumer, args=(q,‘alex‘))    c2 = Process(target=consumer, args=(q,‘jinboss‘))    p1.start()  异步    p2.start()    c1.start()    c2.start()  异步    p1.join()  # 这里非异步转同步,而是判断生产者是否结束    p2.join()    q.put(None)    q.put(None)

   当取到None时为什么有阻塞情况,未显示程序执行完,因为两个人其中一个人拿到None,   另一个人就取不到值出现等待情况,有几个人就put几个None就解决了

#  生产者消费者模型_joinableQueue(解决一个None,多人get阻塞问题)

import timeimport randomfrom multiprocessing import Process,JoinableQueuedef consumer(q,name):    while True:        food = q.get()        print(‘\033[31m%s消费了%s\033[0m‘ % (name,food))        time.sleep(random.randint(1,3))        q.task_done()     # count - 1

def producer(name,food,q):    for i in range(4):        time.sleep(random.randint(1,3))        f = ‘%s生产了%s%s‘%(name,food,i)        print(f)        q.put(f)    q.join()   # 阻塞  直到一个队列中的所有数据 全部被处理完毕

if __name__  == ‘__main__‘:    q = JoinableQueue(20)    p1 = Process(target=producer,args=(‘Egon‘,‘包子‘,q))    p2 = Process(target=producer, args=(‘wusir‘,‘泔水‘, q))    c1 = Process(target=consumer, args=(q,‘alex‘))    c2 = Process(target=consumer, args=(q,‘jinboss‘))    p1.start()    p2.start()    c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
    c2.daemon = True
    c1.start()    c2.start()    p1.join()    p2.join()      # 感知一个进程的结束

#  在消费者这一端:    # 每次获取一个数据    # 处理一个数据    # 发送一个记号 : 标志一个数据被处理成功

# 在生产者这一端:    # 每一次生产一个数据,    # 且每一次生产的数据都放在队列中    # 在队列中刻上一个记号    # 当生产者全部生产完毕之后,    # join信号 : 已经停止生产数据了                # 且要等待之前被刻上的记号都被消费完                # 当数据都被处理完时,join阻塞结束

# consumer 中把所有的任务消耗完# producer 端 的 join感知到,停止阻塞# 所有的producer进程结束# 主进程中的p.join结束# 主进程中代码结束# 守护进程(消费者的进程)结束

原文地址:https://www.cnblogs.com/mys6/p/10848835.html

时间: 2024-08-01 11:57:48

进程间通信IPC---队列、生产者消费者模型、生产者消费者模型_joinableQueue(五)的相关文章

守护进程,互斥锁,IPC,队列,生产者与消费者模型

小知识点:在子进程中不能使用input输入! 一.守护进程 守护进程表示一个进程b 守护另一个进程a 当被守护的进程结束后,那么守护进程b也跟着结束了 应用场景:之所以开子进程,是为了帮助主进程完成某个任务,然而,如果主进程认为自己的事情一旦做完了就没有必要使用子进程了,就可以将子进程设置为守护进程 例如:在运行qq的过程,开启一个进程,用于下载文件,然而文件还没有下载完毕,qq就退出了,下载任务也应该跟随qq的退出而结束. from multiprocessing import Process

35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型

守护进程 进程:一个正在运行的程序. 主进程创建守护进程: 1.守护进程会在主进程代码执行结束后就终止, 2.守护进程内无法再开启子进程,否则抛出异常. 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止. 例子:from multiprocessing import Processimport time def task(): print('老了....') time.sleep(2) print('睡了一会..') if __name__ == '__main__': prin

进程 >> 互斥锁、队列与管道、生产者消费者模型

目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重新实例化一把锁,我们的目的是想让所有的子进程使用同一把锁,所以需要把锁传递给子进程在使用 锁名.acquire():开锁->所有子进程开始抢位置 锁名.release():关锁->位置排好了,开始执锁起来执行. join与互斥锁的区别:join是把所有的子进程代码变为串行的,而互斥锁则可以规定那几

消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?

形象说明: 比喻: RabbitMQ提供的消息投递服务类似于现实生活中的快递公司,双11我们可能会买很多东西,自然会陆续收到很多寄自淘宝店主由快递公司发来的快件,但是可能很多时候买回来的东西并不合心意,自然会陆续通过快递公司退回快件,所以回归到架构,这里的快件就相当于消息,我们相当于应用程序,淘宝店主相当于服务器,而快递公司相当于路由器,应用程序可以发送和接收消息,服务器也可以发送和接收消息,所以当应用程序连接到RabbitMQ时,就必须做一个决定:我是发送还是接收哪? 现实: 生产者(Prod

mutex&condition variable 黄金搭档之 多消费者多生产者

Condition Variable都会搭配一个Mutex来用.我们知道Mutex的普通意义上是维持一个互斥变量,从而保证一个或一组操作的原子性.同样,简单的说Mutex加在Condition Variable上也是为了保证它的原子性了.Condition Variable,有条件的唤醒机制.最经典不过的就是生产--消息者模型了.但有一个细节,消费者需要有"产品"才能继续它的消费行为,因此当消费者发现"产品"被消费完了?它该怎么办?没错,普通情况下它就会进入等待挂起

Java程序设计之消费者和生产者

新建一个Break类,表示食物数量. public class Break { public static final int MAX = 10; //最多一次性煮十个面包 Stack<Integer> stack = new Stack<Integer>(); public static int food_count = 0; //统计食物的个数 //做面包 public synchronized void make_food(int number){ stack.push(nu

微观经济学(七):市场和福利 - 消费者、生产者与市场效率

前言 配置资源的一种方式是交给市场控制,但是这种配置方式是否合意呢?在本章中,我们要讨论福利经济学(welfare economics)这个主题,即研究资源配置如何影响经济福利的一门学问.这种分析将得出一个影响深远的的结论:市场上的供求均衡可以使买者和卖者得到的总利益最大化. 消费者剩余 支付意愿 假设你有一张崭新的猫王首张专辑想要卖出,卖出的一种方法是举行一场拍卖会.四个猫王迷出现在你的拍卖会上,他们每个人都想拥有这张专辑,但每个人愿意为此支付的价格都有限.下面表格列出了这四个人可能的买者中每

Spring整合kafka消费者和生产者&amp;redis的步骤

==================================================================================一.整合kafka(生产者)步骤1.导入依赖(pom.xml)2.编写配置文件,修改配置文件的ip和端口号,修改主题(producer.xml)3.如果再ssm项目中可以让spring.xml来加载这个配置文件 <import resource="classpath:XXX.xml" /> 如果是再测试类中如何加

进程间通信---IPC对象 之 消息队列

IPC对象,既我们所说的进程间通信,下面就来总结一下都有哪些方式以及怎么使用. 一 消息队列 1 消息队列的创建: int msgget(key_t key, int msgflg); 功能:获取指定的key值的消息队列ID 参数: @key <1>IPC_PRIVATE:每次都会创建一个新的消息队列 [用在亲缘关系的进程间痛惜] <2>ftok函数获的key [用在非亲缘关系进程间通信] key_t ftok(const char *pathname, int proj_id);

互斥锁,IPC队列

进程同步(锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,part1:共享同一打印终端,发现会有多行内容打印到一行的现象(多个进程共享并抢占同一个打印终端,乱了) #多进程共享一个打印终端(用python2测试看两个进程同时往一个终端打印,出现打印到一行的错误) from multiprocessing import Process import time class Logger(Process): def __init__(self):