使用Python实现生产者消费者问题

之前用C++写过一篇生产者消费者的实现。

生产者和消费者主要是处理互斥和同步的问题:

队列作为缓冲区,需要互斥操作

队列中没有产品,消费者需要等待,直到生产者放入产品并通知它。队列慢的情况类似。

这里我使用list模拟Python标准库的Queue,这里我设置一个大小限制为5:

SyncQueue.py

from threading import Lock
from threading import Condition
class Queue():
    def __init__(self):
        self.mutex = Lock()
        self.full = Condition(self.mutex)
        self.empty = Condition(self.mutex)
        self.data = []

    def push(self, element):
        self.mutex.acquire()
        while len(self.data) >= 5:
            self.empty.wait()

        self.data.append(element)
        self.full.notify()
        self.mutex.release()

    def pop(self):
        self.mutex.acquire()
        while len(self.data) == 0:
            self.full.wait()
        data = self.data[0]
        self.data.pop(0)
        self.empty.notify()
        self.mutex.release()

        return data

if __name__ == ‘__main__‘:
    q = Queue()
    q.push(10)
    q.push(2)
    q.push(13)

    print q.pop()
    print q.pop()
    print q.pop()

这是最核心的代码,注意里面判断条件要使用while循环。

接下来是生产者进程,producer.py

from threading import Thread
from random import randrange
from time import sleep
from SyncQueue import Queue

class ProducerThread(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
    def run(self):
        while True:
            data = randrange(0, 100)
            self.queue.push(data)
            print ‘push %d‘ % (data)
            sleep(1)

if __name__ == ‘__main__‘:
    q = Queue()
    t = ProducerThread(q)
    t.start()
    t.join()

消费者,Condumer.py

from threading import Thread
from time import sleep
from SyncQueue import Queue

class ConsumerThread(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
    def run(self):
        while True:
            data = self.queue.pop()
            print ‘pop %d‘ % (data)
            sleep(1)

if __name__ == ‘__main__‘:
    q = Queue()
    t = ConsumerThread(q)
    t.start()
    t.join()

最后我们写一个车间类,可以指定线程数量:

from SyncQueue import Queue
from Producer import ProducerThread
from Consumer import ConsumerThread

class WorkShop():
    def __init__(self, producerNums, consumerNums):
        self.producers = []
        self.consumers = []
        self.queue = Queue()
        self.producerNums = producerNums
        self.consumerNums = consumerNums
    def start(self):
        for i in range(self.producerNums):
            self.producers.append(ProducerThread(self.queue))
        for i in range(self.consumerNums):
            self.consumers.append(ConsumerThread(self.queue))
        for i in range(len(self.producers)):
            self.producers[i].start()
        for i in range(len(self.consumers)):
            self.consumers[i].start()
        for i in range(len(self.producers)):
            self.producers[i].join()
        for i in range(len(self.consumers)):
            self.consumers[i].join()

if __name__ == ‘__main__‘:
    w = WorkShop(3, 4)
    w.start()

最后写一个main模块:

from WorkShop import WorkShop

if __name__ == ‘__main__‘:
    w = WorkShop(2, 3)
    w.start()
时间: 2024-08-29 06:04:52

使用Python实现生产者消费者问题的相关文章

python中生产者消费者

1.用函数来实现生产者消费者模型 (1).源代码如下: #!/usr/bin/python #_*_coding:utf-8_*_ import threading import time import Queue import random def Producer(name, que):     while True:         if que.qsize() < 3:             que.put('baozi')             print '%s : Made a

python 生产者消费者线程模型

python 多线程生产者消费者模型: 一个生产者多个消费者 The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module imple

python 多线程笔记(5)-- 生产者/消费者模式

我们已经知道,对公共资源进行互斥访问,可以使用Lock上锁,或者使用RLock去重入锁. 但是这些都只是方便于处理简单的同步现象,我们甚至还不能很合理的去解决使用Lock锁带来的死锁问题. 要解决更复杂的同步问题,就必须考虑别的办法了. threading提供的Condition对象提供了对复杂线程同步问题的支持. Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法. 使用Condition的主要方式为: 线程首先a

Python学习笔记——进阶篇【第九周】———线程、进程、协程篇(队列Queue和生产者消费者模型)

Python之路,进程.线程.协程篇 本节内容 进程.与线程区别 cpu运行原理 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 参考链接http://www.cnblogs.com/alex3714/articles/5230609.html

生产者消费者 协同程序 python

因为相对于子例程协程可以有多个入口和出口点,可以用协程来实现任何的子例程.正如Knuth所说:"子例程是协程的特例."因为子例程只返回一次,要返回多个值就要通过集合的形式.这在有些语言,如Forth里很方便:而其他语言,如C,只允许单一的返回值,所以就需要引用一个集合.相反地,因为协程可以返回多次,返回多个值只需要在后继的协程调用中返回附加的值即可.在后继调用中返回附加值的协程常被称为产生器.有些情况下,使用协程的实现策略显得很自然,但是此环境下却不能使用协程.典型的解决方法是创建一个

进击的Python【第九章】:paramiko模块、线程与进程、各种线程锁、queue队列、生产者消费者模型

一.paramiko模块 他是什么东西? paramiko模块是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接. 先来个实例: 1 import paramiko 2 # 创建SSH对象 3 ssh = paramiko.SSHClient() 4 5 # 允许连接不在know_hosts文件中的主机 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 连接服务器 8 ss

python并发编程之多进程(二):互斥锁(同步锁)&amp;进程其他属性&amp;进程间通信(queue)&amp;生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

python多线程编程-queue模块和生产者-消费者问题

摘录python核心编程 本例中演示生产者-消费者模型:商品或服务的生产者生产商品,然后将其放到类似队列的数据结构中.生产商品中的时间是不确定的,同样消费者消费商品的时间也是不确定的. 使用queue模块(python2.x版本中,叫Queue)来提供线程间通信的机制,从而让线程之间可以分享数据.具体而言,就是创建一个队列,让生产者(线程)在其中放入新的商品,而消费者(线程)消费这些商品. 下表是queue模块的部分属性: 属性 描述 queue模块的类 Queue(maxsize=0) 创建一

用Python多线程实现生产者消费者模式爬取斗图网的表情图片

什么是生产者消费者模式 某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数.线程.进程等).产生数据的模块称为生产者,而处理数据的模块称为消费者.在生产者与消费者之间的缓冲区称之为仓库.生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式. 生产者消费者模式的优点 解耦假设生产者和消费者分别是两个线程.如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合).如果未来消费者的代码发生变化,可能会影响到生产者的代码.