41. Python Queue 多进程的消息队列 PIPE

消息队列:

消息队列是在消息传输过程中保存消息的容器。

消息队列最经典的用法就是消费者和生产者之间通过消息管道来传递消息,消费者和生产生是不通的进程。生产者往管道中写消息,消费者从管道中读消息。

相当于水管,有一个入口和出口,水从入口流入出口流出,这就是一个消息队列

线程或进程往队列里面添加数据,出口从队列里面读数据

左侧多线程往入口处添加完数据,任务就结束了;右侧只要依次从水管里取数据就行了。

异步完成的任务

比如京东下单,下单后付完钱,相当于把消息堆在了水管里,后台会有线程去接收这个单的消息,然后去库房,发货,走物流,直到接收货物并签收完,点击完成,整个流程才走完。

客户交完钱后,丢了个消息在这个队列中,会给客户返回一个结果,告知你已经买了这个商品;而后面接收订单消息,发货,物流都是后面的"进程"或"线程"干的事情。

所以,一般在异步处理问题时候,都会用到消息队列处理的这种思想。

使用multiprocessing里面的Queue来实现消息队列。

语法:

from mutliprocessing import Queue
q = Queue
q.put(data)
data = q.get(data)

举例:

from multiprocessing import Queue, Process

def write(q):
    for i in ['a','b','c','d']:
        q.put(i)
        print ('put {0} to queue'.format(i))

def read(q):
    while 1:
        result = q.get()
        print ("get {0} from queue".format(result))

def main():
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()  #停止
    # 相当于join,等pr完成以后,当whlie没有任何执行后,结束。

if __name__ == '__main__':
    main()

返回结果:

put a to queue
get a from queue
put b to queue
get b from queue
put c to queue
get c from queue
put d to queue
get d from queue

PIPE:

多进程里面有个pipe的方法来实现消息队列:

1. Pipe 方法返回(conn1, conn2)代表一个管道的两端。PIPE方法有个deplex参数,如果deplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接收消息,conn2负责发送消息。

2.send 和recv方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。

import time
from multiprocessing import Pipe, Process

def proc1(pipe):
    for i in xrange(1, 10):
        pipe.send(i)
        print ("send {0} to pipe".format(i))
        time.sleep(1)

def proc2(pipe):
    n = 9
    while n > 0:
        result = pipe.recv()
        print ("recv {0} from pipe".format(result))
        n -= 1

def main():
    pipe = Pipe(duplex=False)
    print (type(pipe))
    p1 = Process(target=proc1, args=(pipe[1],))
    p2 = Process(target=proc2, args=(pipe[0],)) #接收写0
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    pipe[0].close()
    pipe[1].close()

if __name__ == '__main__':
    main()

返回结果(逐行打印):

<type 'tuple'>
send 1 to pipe
recv 1 from pipe
recv 2 from pipe
send 2 to pipe
send 3 to pipe
recv 3 from pipe
recv 4 from pipe
send 4 to pipe
send 5 to pipe
recv 5 from pipe
recv 6 from pipe
send 6 to pipe
recv 7 from pipe
send 7 to pipe
recv 8 from pipe
send 8 to pipe
send 9 to pipe
recv 9 from pipe
时间: 2024-10-13 18:35:40

41. Python Queue 多进程的消息队列 PIPE的相关文章

day41——多进程的消息队列、消息队列pipe

多进程的消息队列 消息队列指的是消息在传输过程中保存消息的容器 消息队列最经典的用法是消费者和生产者之间通过消息管道来传递消息.消费者和和生产者是不同的进程,生产者往管道中写消息,消费者从管道中读消息 multiprocessing模块提供了Queue类 和 Pipe函数 实现消息队列 1. Queue 用法: In [1]: import multiprocessing In [2]: help(multiprocessing.Queue) Help on function Queue in

python—多进程的消息队列

消息队列 消息队列是在消息的传输过程中保存消息的容器 消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程.生产者往管道写消息,消费者从管道中读消息 操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现 一.使用multiprocessing里面的Queue来实现消息队列 q = Queue q.put(data) data = q.get() 例子: from multiprocessing

基于System V Message queue的PHP消息队列封装

原创文章,转载请注明出处:http://www.huyanping.cn/?p=235 作者:Jenner System V Message queue 是一种进程通信(IPC)的方式,方便实现生产者-消费者模型,单个或多个生产者向队列中写入消息,多个生产者再从队列中获取消息进行处理. 项目地址:https://github.com/huyanping/Zebra-PHP-Framework 该Wrapper支持: 进程通信 设置最大队列容量(字节单位) 获取当前队列数量 修改队列部分属性 注意

Python并发编程-RabbitMq消息队列

消息中间件 --->就是消息队列 异步方式:不需要立马得到结果,需要排队 同步方式:需要实时获得数据,坚决不能排队 subprocess 的Q也提供不同进程之间的沟通 应用场景: 买票,抢购 堡垒机批量发送文件 Centos6.x系统编译安装RabbitMQ 一.系统环境 [[email protected] ~]# cat /etc/redhat-release CentOS release 6.6 (Final) [[email protected] ~]# uname -r 2.6.32-

[PHP] 多进程通信-消息队列使用

向消息队列发送数据和获取数据的测试 <?php $key=ftok(__FILE__,'a'); //获取消息队列 $queue=msg_get_queue($key,0666); //发送消息 //msg_send($queue, 1, "Hello, 1"); //接收消息,如果接收不到会阻塞 msg_receive($queue, 1, $message_type, 1024, $message1); //移除消息 //msg_remove_queue($queue); /

基于Python语言使用RabbitMQ消息队列(一)

介绍 RabbitMQ 是一个消息中间人(broker): 它接收并且发送消息. 你可以把它想象成一个邮局: 当你把想要寄出的信放到邮筒里时, 你可以确定邮递员会把信件送到收信人那里. 在这个比喻中, RabbitMQ 就是一个邮筒, 同时也是邮局和邮递员 . 和邮局的主要不同点在于RabbitMQ不处理纸质信件, 而是 接收(accepts), 存储(stores) 和转发(forwards)二进制数据块 —— 消息(messages). 在RabbitMQ中有一些自己的行业术语要了解 . 生

基于Python语言使用RabbitMQ消息队列(二)

工作队列 在第一节我们写了程序来向命名队列发送和接收消息 .在本节我们会创建一个工作队列(Work Queue)用来在多个工人(worker)中分发时间消耗型任务(time-consuming tasks). 工作队列(又叫做: Task Queues)背后的主体思想是 避免立刻去执行耗时任务并且等待它们完成. 相反我们可以安排这样的任务稍后执行. 我们可以把任务封装成一个消息并发送到队列中. 一个在后台运行的工人进程会接收任务并最终执行工作.当你使很多工人(workers)程序运行时,多个任务

python 学习总结3 消息队列RabbitMQ

我们以前学过的队列,在线程中针对同一程序下的多个线程直接 可以实现消息的发送接收,对于进程来说只能在父进程与子进程中或者 父进程 下的子进程之间 进行,都无法实现连个进程的交互 RabbitMQ 实现了这一功能 需要先下载RabbitMQ 之后还需要下载erlang语言,因为RabbitMQ就是由erlang编辑而成的! 生产者: import pika #我们先生成一个链接的实例 connection=pika.BlockingConnection(pika.ConnectionParamet

Python消息队列

消息中间件 --->就是消息队列 异步方式:不需要立马得到结果,需要排队 同步方式:需要实时获得数据,坚决不能排队 例子: #多进程模块multiprocessing from multiprocessing import Process from multiprocessing import Queue def write(q): for i in ["a", "b", "c", "d"]: q.put(i) prin