Python Queue实现生产与消费

Python Queue模块详解

from:https://blog.linuxeye.com/334.html

Python中,队列是线程间最常用的交换数据的形式。Queue模块是提供队列操作的模块,虽然简单易用,但是不小心的话,还是会出现一些意外。

创建一个“队列”对象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

Python Queue模块有三种队列及构造函数:
1、Python Queue模块的FIFO队列先进先出。     class Queue.Queue(maxsize)
2、LIFO类似于堆,即先进后出。                         class Queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。    class Queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

范例:
实现一个线程不断生成一个随机数到一个队列中(考虑使用Queue这个模块)
实现一个线程从上面的队列里面不断的取出奇数
实现另外一个线程从上面的队列里面不断取出偶数

#!/usr/bin/env python
#coding:utf8
import random,threading,time
from Queue import Queue
#Producer thread
class Producer(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self,name=t_name)
        self.data=queue
    def run(self):
        for i in range(10):
            randomnum=random.randint(1,99)
            print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
            self.data.put(randomnum)
            time.sleep(1)
        print "%s: %s finished!" %(time.ctime(), self.getName())

#Consumer thread
class Consumer_even(threading.Thread):
    def __init__(self,t_name,queue):
        threading.Thread.__init__(self,name=t_name)
        self.data=queue
    def run(self):
        while 1:
            try:
                val_even = self.data.get(1,5)  #get(self, block=True, timeout=None)
                if val_even%2==0:
                    print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)
                    time.sleep(2)
                else:
                    self.data.put(val_even)
                    time.sleep(2)
            except:
                print "%s: %s finished!" %(time.ctime(),self.getName())
                break
class Consumer_odd(threading.Thread):
    def __init__(self,t_name,queue):
        threading.Thread.__init__(self, name=t_name)
        self.data=queue
    def run(self):
        while 1:
            try:
                val_odd = self.data.get(1,5)
                if val_odd%2!=0:
                    print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)
                    time.sleep(2)
                else:
                    self.data.put(val_odd)
                    time.sleep(2)
            except:
                print "%s: %s finished!" % (time.ctime(), self.getName())
                break
#Main thread
def main():
    queue = Queue()
    producer = Producer(‘Pro.‘, queue)
    consumer_even = Consumer_even(‘Con_even.‘, queue)
    consumer_odd = Consumer_odd(‘Con_odd.‘,queue)
    producer.start()
    consumer_even.start()
    consumer_odd.start()
    producer.join()
    consumer_even.join()
    consumer_odd.join()
    print ‘All threads terminate!‘

if __name__ == ‘__main__‘:
    main()
时间: 2024-10-09 20:16:09

Python Queue实现生产与消费的相关文章

Python 基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控

基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控   By: 授客 QQ:1033553122   1.测试环境 python 3.4 zookeeper-3.4.13.tar.gz 下载地址1: http://zookeeper.apache.org/releases.html#download https://www.apache.org/dyn/closer.cgi/zookeeper/ https://mirrors.tuna.tsinghua.edu

Python Queue模块

创建一个"队列"对象 import Queuemyqueue = Queue.Queue(maxsize = 10) Queue.Queue类即是一个队列的同步实现.队列长度可为无限或者有限.可通过Queue的构造函数的可选参数maxsize来设定队列长度.如果maxsize小于1就表示队列长度无限. 将一个值放入队列中 myqueue.put(10) 调用队列对象的put()方法在队尾插入一个项目.put()有两个参数,第一个item为必需的,为插入项目的值:第二个block为可选参

python Queue模块使用

Python中,队列是线程间最常用的交换数据的形式.Queue模块是提供队列操作的模块,虽然简单易用,但是不小心的话,还是会出现一些意外. 创建一个"队列"对象import Queueq = Queue.Queue(maxsize = 10)Queue.Queue类即是一个队列的同步实现.队列长度可为无限或者有限.可通过Queue的构造函数的可选参数maxsize来设定队列长度.如果maxsize小于1就表示队列长度无限. 将一个值放入队列中q.put(10)    put(item[

JAVA代码之RocketMQ生产和消费数据

一.启动RocketMQ [[email protected] ~]# cat /etc/hosts # Do not remove the following line, or various programs # that require network functionality will fail. 127.0.0.1               localhost.localdomain localhost ::1             localhost6.localdomain6

Kafka 使用Java实现数据的生产和消费demo

前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. Kafka 有如下特性: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能. 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输. 支持Kafka Serv

Kafka 通过python简单的生产消费实现

使用CentOS6.5.python3.6.kafkaScala 2.10  - kafka_2.10-0.8.2.2.tgz (asc, md5) 一.下载kafka 下载地址 https://kafka.apache.org/downloads 里面包含zookeeper 二.安装Kafka 1.安装zookeeper mkdir /root/kafka/ tar -vzxf kafka_2.10-0.8.2.2 cd /root/kafka/kafka_2.10-0.8.2.2 cat 

python并行任务之生产消费模式

一. 生产者/消费者模式 概念:生产者产生一块数据,放到buffer中,与此同时,消费者在从buffer中取出并消耗这些数据 理解:像生活中厂家生产出产品,顾客购买消耗这些产品,buffer就是存放商品的仓库. 二. 生产者/消费者模式在python中的实现 相关模块:Queue模块 简单介绍:Python中,队列是线程间最常用的交换数据的形式之一.Queue模块是python中提供队列操作的模块. 原理:它创建一个"队列"对象(即用于存放数据的buffer), 然后不断产生数据并存入

使用C#的泛型队列Queue实现生产消费模式

本篇体验使用C#的泛型队列Queue<T>实现生产消费模式. 如果把生产消费想像成自动流水生产线的话,生产就是流水线的物料,消费就是某种设备对物料进行加工的行为,流水线就是队列. 现在,要写一个体现生产消费模式的泛型帮助类,比如叫ProducerConsumer<T>. 该类肯定会维护一个有关生产.物料的Queue<T>类型的字段,还存在一个有关消费.Action<T>类型的字段. 在ProducerConsumer类的构造函数中,为Action<T&

多线程,生产者消费者模型(生产馒头,消费馒头)

先建立一个容器 /** * 容器 * 共享资源 * @author Administrator * */ public class SynStack { int index = 0; //容器 SteamBread[] stb = new SteamBread[6]; /** * 往容器中放产品 */ public synchronized void push(SteamBread st){ while(index == stb.length){ try { this.wait(); } cat