python—多进程的消息队列

消息队列

消息队列是在消息的传输过程中保存消息的容器

消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息

操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现

一、使用multiprocessing里面的Queue来实现消息队列

q = Queue

q.put(data)

data = q.get()

例子:

from multiprocessing import Queue, Process
def write(q):
    for i in ["a","b","c","d"]:
        q.put(i)
        print("put {0} to queue".format(i))
def read(q):
    while 1:
        result = q.get()
        print("get {0} from queue".format(result))
def main():
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()
if __name__ == "__main__":
    main()

运行结果:

put a to queue

put b to queueget a from queue

get b from queue

put c to queue

put d to queue

get c from queue

get d from queue

二、通过Multiprocessing里面的Pipe来实现消息队列

1)Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplux参数为True(默认值),那么这个管道是全双工模式,即conn1和conn2均可收发。duplex为False,conn1负责接收消息,conn2负责发行消息

2)send和recv方法分别是发送和接收消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。

例子:

from multiprocessing import Process,Pipe
import time
def proc1(pipe):
    for i in xrange(1,10):
        pipe.send(i)
        time.sleep(3)
        print("send {0} to pipe".format(i))
def proc2(pipe):
    n = 9
    while n>0:
        result = pipe.recv()
        time.sleep(3)
        print("recv {0} from pipe".format(result))
        n -= 1
if __name__ == "__main__":
    pipe = Pipe(duplex=False)
    print(type(pipe))
    p1 = Process(target=proc1,args=(pipe[1],))
    p2 = Process(target=proc2,args=(pipe[0],))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    pipe[0].close()
    pipe[1].close()

运行结果:

<type 'tuple'>

send 1 to pipe

recv 1 from pipe

send 2 to pipe

recv 2 from pipe

recv 3 from pipe

send 3 to pipe

send 4 to piperecv 4 from pipe

send 5 to pipe

recv 5 from pipe

recv 6 from pipe

send 6 to pipe

send 7 to pipe

recv 7 from pipe

send 8 to pipe

recv 8 from pipe

recv 9 from pipesend 9 to pipe

三、Queue模块

python提供了Queue模块来专门实现消息队列:

Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有gsize一个构造函数,用来指定队列容量,指定为0的时候代表容量无限。只要有以下成员函数:

Queue.gsize():返回消息队列的当前空间。返回的值不一定可靠。

Queue.empty():判断消息队列是否为空,返回True或者False。同样不可靠

Queue.full():判断消息是否满

Queue.put(item,block=True,timeout=None):往消息队列中存放数据。block可以控制是否阻塞,timeout控制阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。

Queue.put_nowait(item):相当于put(item,False)

Queue.get(block=True,timeout=None):获取一个消息,其他等同put

以两个函数用来判断消息对应的任务是否完成:

Queue.task_done():接收消息的线程通过调用这个函来说明消息对应的任务已完成

Queue.join():实际上意味着等到队列为空,再执行别的操作

时间: 2024-10-17 13:50:49

python—多进程的消息队列的相关文章

day41——多进程的消息队列、消息队列pipe

多进程的消息队列 消息队列指的是消息在传输过程中保存消息的容器 消息队列最经典的用法是消费者和生产者之间通过消息管道来传递消息.消费者和和生产者是不同的进程,生产者往管道中写消息,消费者从管道中读消息 multiprocessing模块提供了Queue类 和 Pipe函数 实现消息队列 1. Queue 用法: In [1]: import multiprocessing In [2]: help(multiprocessing.Queue) Help on function Queue in

41. Python Queue 多进程的消息队列 PIPE

消息队列: 消息队列是在消息传输过程中保存消息的容器. 消息队列最经典的用法就是消费者和生产者之间通过消息管道来传递消息,消费者和生产生是不通的进程.生产者往管道中写消息,消费者从管道中读消息. 相当于水管,有一个入口和出口,水从入口流入出口流出,这就是一个消息队列 线程或进程往队列里面添加数据,出口从队列里面读数据 左侧多线程往入口处添加完数据,任务就结束了:右侧只要依次从水管里取数据就行了. 异步完成的任务 比如京东下单,下单后付完钱,相当于把消息堆在了水管里,后台会有线程去接收这个单的消息

自动化运维Python系列之消息队列RabbitMQ

RabbitMQ RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco .Redhat.iMatix 等联合制定了 AMQP 的

Python 番外 消息队列设计精要

消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手段之一.当今市面上有很多主流的消息中间件,如老牌的ActiveMQ.RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify.MetaQ.RocketMQ等.本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息队列时,你需要思考和设计的重要方面.过程中我们会参考这些成熟消息队列的很多重要思想.本文首先会阐述什么时候你需要一个消

[PHP] 多进程通信-消息队列使用

向消息队列发送数据和获取数据的测试 <?php $key=ftok(__FILE__,'a'); //获取消息队列 $queue=msg_get_queue($key,0666); //发送消息 //msg_send($queue, 1, "Hello, 1"); //接收消息,如果接收不到会阻塞 msg_receive($queue, 1, $message_type, 1024, $message1); //移除消息 //msg_remove_queue($queue); /

python 操作 redis + 消息队列使用例子

操作 redis import redis redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8) redis= redis.Redis(connection_pool=redisPool) redis.set('key','values') redis.get('com') redis.append('keys','values') redis.delete('keys') print(redis.get

python多进程中的队列数据共享问题

0x00 起 今天在写一个小东西的时候,需要控制并发量,但又不能直接调用python multiprocessing(问题会在文后提到).于是尝试用Queue来实现. 最一开始的思路是这样的: from multiprocessing import Process from Queue import Queue q = Queue(maxsize = 10) # 通过web应用往队列中添加数据 def put(num): q.put(num) def read(): while True: pr

Python中queue消息队列模块

from queue import Queue from queue import PriorityQueue print("Queue类实现了一个基本的先进先出(FIFO)容器,使用put()将元素添加到序列尾端,get()从队列尾部移除元素.\n") q = Queue() for i in range(3): q.put(i) while not q.empty(): print(q.get()) print("与标准FIFO实现Queue不同的是,LifoQueue使

进程间通信之消息队列通信

概念 消息队列 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法 每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值 消息队列也有管道一样的不足,就是每条消息的最大长度是有上限的(MSGMAX),每个消息队列的总字节数(内核缓冲上限)是有上限的(MSGMNB),系统上消息队列的总数(消息条目数)也有一个上限(MSGMNI) 对比: 管道 消息 流管道 有边界 先进先出 可以后进入.先出来 消息大小三大限制 cat /proc/sys/kernel/msgmax最