管道实现生产者消费者模型

# 管道实现生产者消费者模型
    # # 应该特别注意管道端点的正确管理问题,如果是生产者或消费者中都没有使用管道的端点就应该将它关闭
    # 这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这个步骤
    # 程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道
    # 后才能生成EoFEroor异常,因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

# Pipe 存在数据不安全性
    # 存在管道的一端被多个消费者进程取同一个数据的问题,出现时会报错。队列是同一时间只能有一个进程取数据。
    # 解决方案,加上锁,,管道的操作加上锁后,其实就是队列了,管道比队列底层,管道走的是socket。队列是数据安全性的

# 队列相当于带锁的管道

import time
import random
from multiprocessing import Process, Pipe, Lock

def consumer(con, pro, name, lock):
    ‘‘‘
    # 消费者一方
    :param con:
    :param pro:
    :param name:
    :return:
    ‘‘‘

    pro.close() # 消费者不需要生产者一方的管道端点,关闭掉
    while True:
        try:
            lock.acquire()  # 管道操作前锁,防止多个进程争抢一个数据
            food = con.recv()   # 消费者用消费者的管道端点,从管道中获取数据,当管道的所有端点被关闭掉时(我认为是管道的一端被全部关闭时),会抛出异常错误EoFError
            lock.release()
            print(‘%s 吃了 %s ‘ % (name, food))
            time.sleep(random.randint(1, 3))
        except EOFError:
            print(‘管道抛出了异常EOFError,管道所有端点被关闭掉,说明生产者生产完了‘)
            con.close()
            lock.release()
            break

def producer(con, pro, name, food):
    ‘‘‘
    生产者一方
    :param con: # 消费者一方的管道端点
    :param pro: # 生产者一方的管道端点
    :param name:    # 谁生产
    :param food:    # 生产的东西
    :return:
    ‘‘‘
    con.close() # 生产者不需要消费者一方的管道端点,关闭掉
    for i in range(4):
        time.sleep(random.randint(1, 3))
        f = ‘%s 生产了 第%s个%s‘ % (name, i, food)
        print(f)
        pro.send(f) # 生产者操作生产管道端点向管道中传输数据
    pro.close() # 生产者生产完了后,将生产者管道端点关闭掉

if __name__ == ‘__main__‘:
    con, pro = Pipe()   # 创建一个管道,一端给消费者用,一端给生产者用
    lock = Lock()
    p = Process(target=producer, args=(con, pro, ‘why‘, ‘泔水‘))
    p.start()

    p2 = Process(target=consumer, args=(con, pro, ‘fqq‘, lock))
    p2.start()

    p3 = Process(target=consumer, args=(con, pro, ‘fqq2‘, lock))
    p3.start()

    con.close()
    pro.close()

原文地址:https://www.cnblogs.com/whylinux/p/9825165.html

时间: 2024-11-14 11:19:02

管道实现生产者消费者模型的相关文章

进程 >> 互斥锁、队列与管道、生产者消费者模型

目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重新实例化一把锁,我们的目的是想让所有的子进程使用同一把锁,所以需要把锁传递给子进程在使用 锁名.acquire():开锁->所有子进程开始抢位置 锁名.release():关锁->位置排好了,开始执锁起来执行. join与互斥锁的区别:join是把所有的子进程代码变为串行的,而互斥锁则可以规定那几

joinablequeue模块 生产者消费者模型 Manager模块 进程池 管道

一.生产者消费者 主要是为解耦(借助队列来实现生产者消费者模型) import queue  # 不能进行多进程之间的数据传输 (1)from multiprocessing import Queue    借助Queue解决生产者消费者模型,队列是安全的. q = Queue(num) num :为队列的最大长度 q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待 q.put() # 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待 q.get_now

生产者消费者模型,管道,进程之间共享内存,进程池

课程回顾: 并行:在同一时间点上多个任务同时执行 并发:在同一时间段上多个任务同时执行 进程的三大基本状态: 就绪状态:所有进程需要的资源都获取到了,除了CPU 执行状态:获取到了所有资源包括CPU,进程处于运行状态 阻塞状态:程序停滞不在运行,放弃CPU,进程此时处于内存里 什么叫进程? 正在运行的程序 有代码段,数据段,PCB(进程控制块) 进程是资源分配的基本单位. 进程之间能不能直接通信? 正常情况下,多进程之间是无法进行通信的.因为每个进程都有自己独立的空间 锁: 为了多进程通信时,保

python2.0_s12_day9之day8遗留知识(queue队列&生产者消费者模型)

4.线程 1.语法 2.join 3.线程锁之Lock\Rlock\信号量 4.将线程变为守护进程 5.Event事件 * 6.queue队列 * 7.生产者消费者模型 4.6 queue队列 queue非常有用,当信息必须安全的在多个线程之间进行数据交换的时候就应该想到queue 所以,queue它能保证数据被安全的在多个线程之间进行交换,那他就是天生的线程安全. queue有那么几种: class queue.Queue(maxsize=0) # 先入先出 class queue.LifoQ

python并发编程之多进程(二):互斥锁(同步锁)&进程其他属性&进程间通信(queue)&生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

生产者消费者模型 线程池

1.生产者消费者模型 主要是为解耦 借助队列来实现生产者消费这模型 栈:先进后出(First In Last Out 简称:FILO) 队列:先进先出(FIFO) import queue from multiprocessing import Queue 借助Queue解决生产者消费这模型队列是安全的 q=Queue(m) q = Queue(num) num : 队列的最大长度 q.get()# 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待 q.put()# 阻塞,如果可以继

5 并发编程--队列&生产者消费者模型

1.队列的介绍 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的 创建队列的类(底层就是以管道和锁定的方式实现): Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. 参数介绍: maxsize是队列中允许最大项数,省略则无大小限制. 但需要明确: 1.队列内存放的是消息而非大数据 2.队列占用的是内存空间,因而maxsize即便

Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型

一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例子 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型

守护进程 进程:一个正在运行的程序. 主进程创建守护进程: 1.守护进程会在主进程代码执行结束后就终止, 2.守护进程内无法再开启子进程,否则抛出异常. 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止. 例子:from multiprocessing import Processimport time def task(): print('老了....') time.sleep(2) print('睡了一会..') if __name__ == '__main__': prin