day41——多进程的消息队列、消息队列pipe

多进程的消息队列

消息队列指的是消息在传输过程中保存消息的容器

消息队列最经典的用法是消费者和生产者之间通过消息管道来传递消息。消费者和和生产者是不同的进程,生产者往管道中写消息,消费者从管道中读消息

multiprocessing模块提供了Queue类 和 Pipe函数 实现消息队列

1. Queue

用法:

In [1]: import multiprocessing

In [2]: help(multiprocessing.Queue)
Help on function Queue in module multiprocessing:

Queue(maxsize=0)
    Returns a queue object

In [3]: q = multiprocessing.Queue()         //实例化一个对象,对象的方法的用法和Queue模块中对象的方法的用法一毛一样

In [4]: q.
q.cancel_join_thread  q.empty               q.get                 q.join_thread         q.put_nowait
q.close               q.full                q.get_nowait          q.put                 q.qsize

例子:

 1 [[email protected] thread_process]# cat queue4.py
 2 #!/usr/bin/env python
 3
 4 from multiprocessing import Process, Queue
 5 def producer(q):
 6     for i in xrange(5):
 7         q.put(i)
 8         print ‘put {0} into queue‘.format(i)
 9
10 def consumer(q):
11     while 1:
12         result = q.get()
13         print ‘get {0} from queue‘.format(result)
14         if q.empty():
15             break
16
17
18 if __name__ == ‘__main__‘:
19     q = Queue()
20     p = Process(target=producer, args=(q,))
21     c = Process(target=consumer, args=(q,))
22     p.start()
23     p.join()
24     c.start()
25
26
27 [[email protected] thread_process]# python queue4.py
28 put 0 into queue
29 put 1 into queue
30 put 2 into queue
31 put 3 into queue
32 put 4 into queue
33 get 0 from queue
34 get 1 from queue
35 get 2 from queue
36 get 3 from queue
37 get 4 from queue

2. Pipe

Pipe方法返回一个二元元组(conn1, conn2),两个元素分别是两个连接对象,代表管道的两端,Pipe(duplex=True) 函数有一个默认参数duplex,默认等于True,表示这个管道是全双工模式,也就是说conn1和conn2均可收发;如果duplex=False,那么conn2只负责发消息到消息队列,conn1只负责从消息队列中读取消息

连接对象的常用方法有三个:

  • send()   ---> 发送消息到管道
  • recv()   ---> 从管道中读取消息
  • close()   --->关闭管道

duplex=False 例子:

 1 [[email protected] thread_process]# cat pipe.py
 2 #!/usr/bin/env python
 3
 4 import time
 5 from multiprocessing import Pipe, Process
 6
 7 def producer(p):
 8     for i in xrange(5):
 9         p.send(i)
10         print ‘send {0} to pipe‘.format(i)
11         time.sleep(1)
12
13 def consumer(p):
14     n = 5
15     while n>0:
16         result = p.recv()
17         print ‘recv {0} from pipe‘.format(result)
18         n -= 1
19
20 if __name__ == ‘__main__‘:
21     p = Pipe(duplex=False)
22     print p
23     p1 = Process(target=producer, args=(p[1],))
24     p2 = Process(target=consumer, args=(p[0],))
25     p1.start()
26     p2.start()
27     p1.join()
28     p2.join()
29     p[0].close()
30     p[1].close()
31
32
33 [[email protected] thread_process]# python pipe.py
34 (<read-only Connection, handle 3>, <write-only Connection, handle 4>)
35 send 0 to pipe
36 recv 0 from pipe
37 send 1 to pipe
38 recv 1 from pipe
39 send 2 to pipe
40 recv 2 from pipe
41 send 3 to pipe
42 recv 3 from pipe
43 send 4 to pipe
44 recv 4 from pipe

duplex=True例子:

 1 [[email protected] thread_process]# cat pipe1.py
 2 #!/usr/bin/env python
 3
 4 import time
 5 from multiprocessing import Pipe, Process
 6
 7 def producer(p):
 8     for i in xrange(5):
 9         p.se
10         print ‘send {0} to pipe‘.format(i)
11         time.sleep(1)
12
13 if __name__ == ‘__main__‘:
14     p = Pipe(duplex=True)
15     print p
16     p1 = Process(target=producer, args=(p[1],))
17     p2 = Process(target=producer, args=(p[0],))
18     p1.start()
19     p2.start()
20     p[0].close()
21     p[1].close()
22
23
24 [[email protected] thread_process]# python pipe1.py
25 (<read-write Connection, handle 5>, <read-write Connection, handle 6>)
26 send 0 to pipe
27 send 0 to pipe
28 send 1 to pipe
29 send 1 to pipe
30 send 2 to pipe
31 send 2 to pipe
32 send 3 to pipe
33 send 3 to pipe
34 send 4 to pipe
35 send 4 to pipe
时间: 2024-09-30 19:06:05

day41——多进程的消息队列、消息队列pipe的相关文章

【转】windows消息和消息队列详解

转载出处:http://blog.csdn.net/bichenggui/article/details/4677494  windows消息和消息队列 与基于MS - DOS的应用程序不同,Windows的应用程序是事件(消息)驱动的.它们不会显式地调用函数(如C运行时库调用)来获取输入,而是等待windows向它们传递输入. windows系统把应用程序的输入事件传递给各个窗口,每个窗口有一个函数,称为窗口消息处理函数.窗口消息处理函数处理各种用户输入,处理完成后再将控制权交还给系统.窗口消

Flume 读取JMS 消息队列消息,并将消息写入HDFS

利用Apache Flume 读取JMS 消息队列消息.并将消息写入HDFS,flume agent配置例如以下: flume-agent.conf #name the  components on this agent agentHdfs.sources  = jms_source agentHdfs.sinks =  hdfs_sink agentHdfs.channels  = mem_channel #  Describe/configure the source agentHdfs.s

rabbitmq之消息重入队列

说起消息重入队列还得从队列注册消费者说起,客户端在向队列注册消费者之后,创建的channel也会被主队列进程monitor,当channel挂掉后,主队列进程(rabbit_amqqueue_process)收到'DOWN'通知,将未ack的消息重入队列,并根据消息的deliver tag,也就是消费入队列的顺序,将消息重入队列中 主要代码如下: 1.注册消费者 handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag =

关于windows操作系统之消息和消息队列

关于消息和消息队列 不像基于MS-DOS的应用程序,基于Windows的程序是事件驱动的.他们不做任何显示调用来获取输入.而是通过等待系统传递给他们. 系统为应用程序传递所有输入到程序中的不同窗口.每个窗口都有一个称为窗口过程的函数,用于处理所有到该窗口的输入.窗口处理过程处理输入,并将控制返回给系统. 如果一个顶层窗口停止响应消息超过两秒,系统将会认为该窗口为非响应状态.在这种情况下,系统将隐藏该窗口并用拥有同样Z顺序,位置,尺寸和可视化属性的ghost窗口替代该窗口.这种情况下,允许用户移动

RabbitMq+Spring boot 消息生产者向队列发送消息 (一)

本人学习新框架方法. 一.先学习框架基本知识,也就是看这本书的前三章,了解基本概念.比如这个Rabbitmq,我会先看一些概念,比如,交换机,路由器,队列,虚拟机. 二.然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些. 三.然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果. 四.实际项目积累经验. RabbitMq 消息生产者向队列发送消息 (一) MQ分为消息生产者和消息消费者,这次做的主要是消息的生产

ActiveMQ队列消息过期时间设置和自动清除解决方案

版本 apache-activemq-5.15.3 1.消息过期设置 参数详情 1)message过期则客户端不能接收 2)ttlCeiling:表示过期时间上限(程序写的过期时间不能超过此时间,超过则以此时间为准) 3)zeroExpirationOverride:表示过期时间(给未分配过期时间的消息分配过期时间) 配置示例 <broker> ... <plugins> <!-- 86,400,000ms = 1 day --> <timeStampingBro

使用rabbitmq手动确认消息的,定时获取队列消息实现

描述问题 最近项目中因为有些数据,需要推送到第三方系统中,因为数据会一直增加,并且需要与第三方系统做相关交互. 相关业务 本着不影响线上运行效率的思想,我们将增加的消息放入rabbitmq,使用另一个应用获取消费,因为数据只是推送,并且业务的数据有15分钟左右的更新策略,对实时性不是很高所以我们需要一个定时任务来主动链接rabbit去消费,然后将数据以网络方式传送 相关分析 网络上大致出现了相关的解决办法,但由于实现相关数据丢失及处理.性能和效率等相关基础业务的工作量,望而却步...... 还好

# 进程/线程/协程 # IO:同步/异步/阻塞/非阻塞 # greenlet gevent # 事件驱动与异步IO # Select\Poll\Epoll异步IO 以及selectors模块 # Python队列/RabbitMQ队列

1 # 进程/线程/协程 2 # IO:同步/异步/阻塞/非阻塞 3 # greenlet gevent 4 # 事件驱动与异步IO 5 # Select\Poll\Epoll异步IO 以及selectors模块 6 # Python队列/RabbitMQ队列 7 8 ############################################################################################## 9 1.什么是进程?进程和程序之间有什么

Atitit.提升软件稳定性---基于数据库实现的持久化 循环队列 环形队列

Atitit.提升软件稳定性---基于数据库实现的持久化  循环队列 环形队列 1. 前言::选型(马) 1 2. 实现java.util.queue接口 1 3. 当前指针的2个实现方式 1 1.1. 用一个游标last 来指示 (指针表字段last ),麻烦的,不推荐 1 1.2. (简单,推荐)使用循环次数来指示,每循环加1   (字段cirTimes),order by cirtimes 1 4. 表格设计id, cirTimes,createtime,handlerID,recID,d