python并行任务之生产消费模式

一. 生产者/消费者模式

概念:生产者产生一块数据,放到buffer中,与此同时,消费者在从buffer中取出并消耗这些数据

理解:像生活中厂家生产出产品,顾客购买消耗这些产品,buffer就是存放商品的仓库。

二. 生产者/消费者模式在python中的实现

相关模块:Queue模块

简单介绍:Python中,队列是线程间最常用的交换数据的形式之一。Queue模块是python中提供队列操作的模块。

原理:它创建一个"队列"对象(即用于存放数据的buffer), 然后不断产生数据并存入该"队列",同时也在不断

   地从该队列中取出数据。

具体函数

(1)创建一个队列对象

1 >>> import Queue
2 >>> q = Queue.Queue()

注:队列长度可为无限或有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。若maxsize小于1则表示队列长度无限,例:

(2)向队列中存入数据

方法: q.put(item, block=True, timeout=None)

>>> q.put(‘a‘)

注:put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。

  如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

(3)从队列中取出数据

方法: q.get(block=True, timeout=None)

>>> q.get()

注:get方法可选参数为block,默认为True。

  如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

Queue.Queue中常用方法:

q.qsize()           返回队列的大小
q.empty()           如果队列为空,返回True,反之False
q.full()            如果队列已满,返回True,反之False。与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait()         相当q.get(False) 非阻塞
q.put(item,timeout)    写入队列,timeout等待时间
q.put_nowait(item)      相当q.put(item, False)
q.task_done()         完成一项工作之后, 函数向任务已经完成的队列发送一个信号
q.join()            表示等到队列为空,再执行别的操作

实例测试:

#!/usr/bin/env python
#coding=utf-8

import threading, time
import Queue    #导入消息队列模块
import random   #导入随机数模块,是为了模拟生产者与消费者速度不一致的情形

q = Queue.Queue()         #实例化一个队列对象,当有多个线程共享一个东西的时候就可以用它了

def Producer():           #生产者函数
    for i in range(20):
        q.put(i)          #将结果放入消息队列中
        print ‘[+] Product %s‘ %i
        time.sleep(random.randrange(3))    #生产者的生产速度,3s内
def Consumer():           #消费者函数
    count = 0
    while count < 20:
        data = q.get()    #取用消息队列中存放的结果
        print ‘[-] Consume %s‘ %data
        count += 1
        time.sleep(random.randrange(4))    #消费者的消费速度,4s内

producter = threading.Thread(target = Producer)
consumer = threading.Thread(target = Consumer)

producter.start()
consumer.start()

运行结果:

       

时间: 2024-08-09 10:44:43

python并行任务之生产消费模式的相关文章

生产消费模式

此模式是为防止并发的 因为只能有一个对目标进行写操作 ,所以由唯一的消费者进行目标的管理 生产消费模式,布布扣,bubuko.com

使用C#的泛型队列Queue实现生产消费模式

本篇体验使用C#的泛型队列Queue<T>实现生产消费模式. 如果把生产消费想像成自动流水生产线的话,生产就是流水线的物料,消费就是某种设备对物料进行加工的行为,流水线就是队列. 现在,要写一个体现生产消费模式的泛型帮助类,比如叫ProducerConsumer<T>. 该类肯定会维护一个有关生产.物料的Queue<T>类型的字段,还存在一个有关消费.Action<T>类型的字段. 在ProducerConsumer类的构造函数中,为Action<T&

异步简析之BlockingCollection实现生产消费模式

目前市面上有诸多的产品实现队列功能,比如Redis.MemCache等... 其实c#中也有一个基础的集合类专门用来实现生产/消费模式 (生产模式还是建议使用Redis等产品) 下面是官方的一些资料和介绍: BlockingCollection是一个线程安全集合类,可提供以下功能: 实现制造者-使用者模式. 通过多线程并发添加和获取项. 可选最大容量. 集合为空或已满时通过插入和移除操作进行阻塞. 插入和移除"尝试"操作不发生阻塞,或在指定时间段内发生阻塞. 封装实现 IProduce

Java的多线程实现生产/消费模式

Java的多线程实现生产/消费模式 在Java的多线程中,我们经常使用某个Java对象的wait(),notify()以及notifyAll() 方法实现多线程的通讯,今天就使用Java的多线程实现生产/消费模式,需求如下: 线程A ProductThread 继承Thread 实现生产数据 若线程共享的数据不为NULL,则生产线程进入等待状态 线程B CustomThread 继承Thread 实现消费数据(输出到控制台) 当线程共享数据为NULL的时候,进入等待状态 线程B 消费完数据之后,

Kafka 通过python简单的生产消费实现

使用CentOS6.5.python3.6.kafkaScala 2.10  - kafka_2.10-0.8.2.2.tgz (asc, md5) 一.下载kafka 下载地址 https://kafka.apache.org/downloads 里面包含zookeeper 二.安装Kafka 1.安装zookeeper mkdir /root/kafka/ tar -vzxf kafka_2.10-0.8.2.2 cd /root/kafka/kafka_2.10-0.8.2.2 cat 

gRPC Python 入门到生产环境

所有的代码在 https://github.com/xsren/learning_record/tree/master/grpc,欢迎star. 一.先了解几个概念 RPC RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议. gRPC gRPC是一个高性能.通用的开源RPC框架,其由Google主要由开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers)序列化协议

python fabric模块 生产环境dubbo代码包发布管理完整实例

#!/usr/bin/env python import os import time import fabric from fabric.api import * from fabric.context_managers import * from fabric.colors import * from fabric.contrib.console import confirm env.hosts=['XXXX.XXXX.XXXX.XXXX'] env.user='user' env.pass

多线程生产消费模式简单实例

有一道这样的题目,用多线程方式实现生产者消费者模式,生产者随便产生一个0-1000之间的数,消费者打印出生产的数据.当随机产生的数是0时,生产线程和消费线程都退出. 思路:用一个队列Queue存储产生的数据,队列作为共享数据在生产者和消费者共享. 生产者: /***数据生产者 */ import java.util.Queue;import java.util.Random; /** * 名称:类的中文名称 <br> * 功能:对类的功能进行说明 <br/> * <br/&g

Python Queue实现生产与消费

Python Queue模块详解 from:https://blog.linuxeye.com/334.html Python中,队列是线程间最常用的交换数据的形式.Queue模块是提供队列操作的模块,虽然简单易用,但是不小心的话,还是会出现一些意外. 创建一个“队列”对象import Queueq = Queue.Queue(maxsize = 10)Queue.Queue类即是一个队列的同步实现.队列长度可为无限或者有限.可通过Queue的构造函数的可选参数maxsize来设定队列长度.如果