python多进程(三)

消息队列

消息队列”是在消息的传输过程中保存消息的容器。

消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。

相当于水管,有一个入口和出口,水从入口流入,从出口流出,这就是一个消息队列。左侧线程或者进程往队列里面添加数据,它的任务就结束了,右侧线程或者进程只要依次从出口处读取数据就可以了。

消息队列的思想

比如在京东下单,并付完钱,相当于把消息堆在了水管里面,会返回一个结果给客户,告知客户已经购买此商品,后台会有线程去接收并处理这个订单消息,然后去库房发货、走物流,知道最后接收货物并签收,这个流程就算结束了。所以,在异步处理问题的时候,都会用到消息队列的这种思想。操作系统提供了很多机制来实现进程间的通信 ,multiprocessing模块就提供了Queue和Pipe两种方法来实现。

使用multiprocessing里面的Queue来实现消息队列。

语法格式:

from multiprocessing import Queue
q = Queue
q.put(data)
data = q.get(data)

例子:

from multiprocessing import Queue, Process

# 写数据的进程
def write(q):
    for i in [‘a‘,‘b‘,‘c‘,‘d‘]:
        q.put(i)  # 把消息放入队列
        print (‘put {0} to queue‘.format(i))

# 读取数据的进程
def read(q):
    while 1:
        result = q.get()  # 从队列中读取消息
        print ("get {0} from queue".format(result))

def main():
    # 父进程创建Queue,并传给各个子进程
    q = Queue()
    pw = Process(target=write,args=(q,))  # 使用多进程,传入的参数是消息队列
    pr = Process(target=read,args=(q,))
    pw.start()  # 启动子进程,写入数据
    pr.start()  # 启动子进程,读取数据
    pw.join()   # 等待pw进程结束
    pr.terminate()  #停止
    # 相当于join,等pr完成以后,while是一个死循环,这里强制结束,因为读取数据的进程应该是一直监听是否有数据产生,有就会去读取。

if __name__ == ‘__main__‘:
    main()

结果:
put a to queue
get a from queue
put b to queue
get b from queue
put c to queue
get c from queue
put d to queue
get d from queue

使用multiprocessing里面的PIPE来实现消息队列。

1、Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接收消息,conn2只负责发送消息。

2、send和recv方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。

例子:

import time
from multiprocessing import Pipe, Process

# 发送消息的进程
def proc1(pipe):
    for i in xrange(1, 10):
        pipe.send(i)
        print ("send {0} to pipe".format(i))
        time.sleep(1)

# 接收消息的进程
def proc2(pipe):
    n = 9
    while n > 0:
        result = pipe.recv()
        print ("recv {0} from pipe".format(result))
        n -= 1

def main():
    pipe = Pipe(duplex=False)  # 设置半双工模式,p1只负责发送消息,p2只负责接收消息,pipe是一个tuple类型
    p1 = Process(target=proc1, args=(pipe[1],))
    p2 = Process(target=proc2, args=(pipe[0],)) #接收写0
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    pipe[0].close()
    pipe[1].close()

if __name__ == ‘__main__‘:
    main()

结果:
send 1 to pipe
recv 1 from pipe
recv 2 from pipe
send 2 to pipe
send 3 to pipe
recv 3 from pipe
recv 4 from pipe
send 4 to pipe
send 5 to pipe
recv 5 from pipe
recv 6 from pipe
send 6 to pipe
recv 7 from pipe
send 7 to pipe
recv 8 from pipe
send 8 to pipe
send 9 to pipe
recv 9 from pipe

Python提供了Queue模块来专门实现消息队列

Queue对象

Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

Queue.qsize():返回消息队列的当前空间。返回的值不一定可靠。

Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。

Queue.full():类似上边,判断消息队列是否满

Queue.put(item, block=True, timeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。

Queue.put_nowait(item):相当于put(item, False).

Queue.get(block=True, timeout=None):获取一个消息,其他同put。

以下两个函数用来判断消息对应的任务是否完成。

Queue.task_done():接受消息的线程通过调用这个函数来说明消息对应的任务已完成。

Queue.join(): 实际上意味着等到队列为空,再执行别的操作

例子:

from multiprocessing import Queue
from threading import Thread
import time

"""
一个生产者和两个消费者,
采用多线程继承的方式,
一个消费偶数,一个消费奇数。
"""

class Proceducer(Thread):
    def __init__(self, queue):
        super(Proceducer, self).__init__()
        self.queue = queue

    def run(self):
        try:
            for i in xrange(1, 10):
                print ("put {0} to queue".format(i))
                self.queue.put(i)
        except Exception as e:
            print ("put data error")
            raise e

class Consumer_even(Thread):
    def __init__(self, queue):
        super(Consumer_even, self).__init__()
        self.queue = queue

    def run(self):
        try:
            while not self.queue.empty():   # 判断队列是否为空
                number = self.queue.get(block=True, timeout=3)  # 从队列中获取消息,block=True表示阻塞,设置超时未3s
                if number % 2 == 0:   # 如果获取的消息是偶数
                    print("get {0} from queue EVEN, thread name is {1}".format(number, self.getName()))
                else:
                    self.queue.put(number)  # 如果获取的消息不是偶数,就接着把它放回队列中
                time.sleep(1)
        except Exception as e:
            raise e

class Consumer_odd(Thread):
    def __init__(self, queue):
        super(Consumer_odd, self).__init__()
        self.queue = queue

    def run(self):
        try:
            while not self.queue.empty():
                number = self.queue.get(block=True, timeout=3)
                if number % 2 != 0:  # 如果获取的消息是奇数
                    print("get {0} from queue ODD, thread name is {1}".format(number, self.getName()))
                else:
                    self.queue.put(number)
                time.sleep(1)
        except Exception as e:
            raise e

def main():
    queue = Queue()
    p = Proceducer(queue=queue)
    # 开始产生消息
    print("开始产生消息")
    p.start()
    p.join()   # 等待生产消息的进程结束
    time.sleep(1) # 消息生产完成之后暂停1s
    c1 = Consumer_even(queue=queue)
    c2 = Consumer_odd(queue=queue)

    # 开始消费消息
    print("开始消费消息")
    c1.start()
    c2.start()
    c1.join()
    c2.join()
    print ("消息消费完成")

if __name__ == ‘__main__‘:
    main()

结果:
开始产生消息
put 1 to queue
put 2 to queue
put 3 to queue
put 4 to queue
put 5 to queue
put 6 to queue
put 7 to queue
put 8 to queue
put 9 to queue
开始消费消息
get 1 from queue ODD, thread name is Thread-3
get 2 from queue EVEN, thread name is Thread-2
get 3 from queue ODD, thread name is Thread-3
get 4 from queue EVEN, thread name is Thread-2
get 5 from queue ODD, thread name is Thread-3
get 6 from queue EVEN, thread name is Thread-2
get 7 from queue ODD, thread name is Thread-3
get 8 from queue EVEN, thread name is Thread-2
get 9 from queue ODD, thread name is Thread-3
消息消费完成

Celery异步分布式

什么是celery

Celery是一个python开发的异步分布式任务调度模块。

几个概念

broker:

brokers 中文意思为中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列) ,常见的 brokers 有 rabbitmq、redis、Zookeeper 等。

backend:

顾名思义就是结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了 ,常见的 backend 有 redis、Memcached 甚至常用的数据都可以。

worker:

就是 Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行。

task:

就是我们想在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由workers进行处理。

Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbimq,redis, 数据库等。

这里我们用redis当做celery的broker和backend。

连接url的格式为

redis://:[email protected]:port/db_number
例如:
BROKER_URL = ‘redis://localhost:6379/0‘

安装celery

pip install celery
pip install redis
pip install redis-py-with-geo  # 没有安装这个会报错

File "/usr/lib/python2.7/site-packages/kombu/transport/redis.py", line 671, in _receive
    while c.connection.can_read(timeout=0):
TypeError: can_read() got an unexpected keyword argument ‘timeout‘

在服务器上安装redis并启动redis,我安装的redis指定端口为5000。

例子:

vi tasks.py
#/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery
broker="redis://110.106.106.220:5000/5"
backend="redis://110.106.106.220:5000/6"
app = Celery("tasks", broker=broker, backend=backend)

@app.task
def add(x, y):
return x+y

现在broker、backend、task都有了,接下来我们就运行worker进行工作,在tasks.py目录运行:

celery -A tasks worker -l info

启动后可以看到如下信息:

[[email protected] scripts]# celery -A celery_test worker -l info
/usr/lib/python2.7/site-packages/celery/platforms.py:796: RuntimeWarning: You‘re running the worker with superuser privileges: this is absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- [email protected] v4.1.1 (latentcall)
---- **** -----
--- * ***  * -- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2018-05-25 14:28:38
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         celery_test:0x25a6450
- ** ---------- .> transport:   redis://110.106.106.220:5000/5
- ** ---------- .> results:     redis://110.106.106.220:5000/6
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2018-05-25 14:28:38,431: INFO/MainProcess] Connected to redis://110.106.106.220:5000/5
[2018-05-25 14:28:38,443: INFO/MainProcess] mingle: searching for neighbors
[2018-05-25 14:28:39,475: INFO/MainProcess] mingle: all alone
[2018-05-25 14:28:39,528: INFO/MainProcess] [email protected] ready.

意思就是运行 tasks 这个任务集合的 worker 进行工作(当然此时broker中还没有任务,worker此时相当于待命状态),最后一步,就是触发任务,最简单方式就是再写一个脚本然后调用那个被装饰成 task 的函数。

vi trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while not result.ready():  # 是否处理
    time.sleep(1)
print ‘task done: {0}‘.format(result.get())  # 获取结果
print(result.task_id)

delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready() 为 true,然后用 result.get() 取结果即可。

运行trigger.py之后可以看到如下信息:

[[email protected] scripts]# python trigger.py
task done: 8
celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2

celery的任务状态

在之前启动tasks.py的窗口可以看到如下信息:

[2018-05-25 14:28:38,431: INFO/MainProcess] Connected to redis://110.106.106.220:5000/5
[2018-05-25 14:28:38,443: INFO/MainProcess] mingle: searching for neighbors
[2018-05-25 14:28:39,475: INFO/MainProcess] mingle: all alone
[2018-05-25 14:28:39,528: INFO/MainProcess] [email protected] ready.
[2018-05-25 14:33:30,340: INFO/MainProcess] Received task: tasks.add[d64def11-6b77-443f-84c2-0cbd850972f2]
[2018-05-25 14:33:30,373: INFO/ForkPoolWorker-1] Task tasks.add[d64def11-6b77-443f-84c2-0cbd850972f2] succeeded in 0.0313169739966s: 8
[2018-05-25 14:33:47,082: INFO/MainProcess] Received task: tasks.add[5ae26e89-5d91-496e-8e1c-e0504fbbd39a]
[2018-05-25 14:33:47,086: INFO/ForkPoolWorker-1] Task tasks.add[5ae26e89-5d91-496e-8e1c-e0504fbbd39a] succeeded in 0.00259069999447s: 8

在redis中查看:

110.106.106.220:5000[5]> select 5
OK
110.106.106.220:5000[5]> keys *
1) "_kombu.binding.celeryev"
2) "_kombu.binding.celery.pidbox"
3) "_kombu.binding.celery"
110.106.106.220:5000[5]> select 6
OK
110.106.106.220:5000[6]> keys *
1) "celery-task-meta-5ae26e89-5d91-496e-8e1c-e0504fbbd39a"
2) "celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2"
110.106.106.220:5000[6]> get celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 8, \"task_id\": \"d64def11-6b77-443f-84c2-0cbd850972f2\", \"children\": []}"
110.106.106.220:5000[6]> get celery-task-meta-5ae26e89-5d91-496e-8e1c-e0504fbbd39a
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 8, \"task_id\": \"5ae26e89-5d91-496e-8e1c-e0504fbbd39a\", \"children\": []}"

原文地址:https://www.cnblogs.com/yangjian319/p/9089167.html

时间: 2024-10-07 06:27:28

python多进程(三)的相关文章

Python多进程(1)——subprocess与Popen()

Python多进程方面涉及的模块主要包括: subprocess:可以在当前程序中执行其他程序或命令: mmap:提供一种基于内存的进程间通信机制: multiprocessing:提供支持多处理器技术的多进程编程接口,并且接口的设计最大程度地保持了和threading模块的一致,便于理解和使用. 本文主要介绍 subprocess 模块及其提供的 Popen 类,以及如何使用该构造器在一个进程中创建新的子进程.此外,还会简要介绍 subprocess 模块提供的其他方法与属性,这些功能上虽然没

python多进程multiprocessing Pool相关问题

python多进程想必大部分人都用到过,可以充分利用多核CPU让代码效率更高效. 我们看看multiprocessing.pool.Pool.map的官方用法 map(func, iterable[, chunksize]) A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready

Python 多进程多线编程模板

一.Python 多进程多线程原理介绍 1. Python 全局解释器锁GIL a) Python的全局解释器锁GIL是互斥锁,能够防止本机多个线程一次执行Python字节码:由于CPython的内存管理在线程级别是不安全的(内存泄露),所以这个全局解释器锁是必须的.每个Python进程只能申请使用一个GIL锁,因此Python的多线程虽然是并发的但不能并行处理.Python的解释器每次只能执行一个线程,待GIL锁释放后再执行下一个线程,这样线程轮流被执行. b) Python2.x里,GIL的

Python 多进程实战 & 回调函数理解与实战

这篇博文主要讲下笔者在工作中Python多进程的实战运用和回调函数的理解和运用. 多进程实战 实战一.批量文件下载 从一个文件中按行读取 url ,根据 url 下载文件到指定位置,用多进程实现. #!/usr/local/python27/bin/python2.7 from multiprocessing import Process,Pool import os,time,random,sys import urllib # 文件下载函数 def filedown(url,file):  

Python多进程使用

[Python之旅]第六篇(六):Python多进程使用 香飘叶子 2016-05-10 10:57:50 浏览190 评论0 python 多进程 多进程通信 摘要:   关于进程与线程的对比,下面的解释非常好的说明了这两者的区别:     这里主要说明关于Python多进程的下面几点: 1 2 3 4 5 6 7 1.多进程的使用方法 2.进程间的通信之multiprocessing.Manager()使用 3.Python进程池 ... 关于进程与线程的对比,下面的解释非常好的说明了这两者

Python进阶(三十五)-Fiddler命令行和HTTP断点调试

Python进阶(三十五)-Fiddler命令行和HTTP断点调试 一. Fiddler内置命令 ??上一节(使用Fiddler进行抓包分析)中,介绍到,在web session(与我们通常所说的session不是同一个概念,这里的每条HTTP请求都称为一个session).界面中能够看到Fiddler抓取的全部HTTP请求.而为了更加方便的管理全部的session, Fiddler提供了一系列内置的函数用于筛选和操作这些session(习惯命令行操作Linux的童鞋应该能够感受到这会有多么方便

Python进阶(三十四)-Python3多线程解读

Python进阶(三十四)-Python3多线程解读 线程讲解 ??多线程类似于同时执行多个不同程序,多线程运行有如下优点: 使用线程可以把占据长时间的程序中的任务放到后台去处理. 用户界面可以更加吸引人,这样比如用户点击了一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度. 程序的运行速度可能加快. 在一些等待的任务实现上如用户输入.文件读写和网络收发数据等,线程就比较有用了.在这种情况下我们可以释放一些珍贵的资源如内存占用等等. ??线程在执行过程中与进程还是有区别的.每个独立

Python多进程并发(multiprocessing)用法实例详解

http://www.jb51.net/article/67116.htm 本文实例讲述了Python多进程并发(multiprocessing)用法.分享给大家供大家参考.具体分析如下: 由于Python设计的限制(我说的是咱们常用的CPython).最多只能用满1个CPU核心.Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换. 1.新建单一进程 如果我们新建少量进程,

Python多进程相关的坑

Python的multiprocessing模块实现了多进程功能,但官方文档上只有一些比较简单的用法,主要是使用函数作为process的target,而如何在class中使用多进程并没有多讲解.google出两篇比较详细的文章,建议从它们入门: https://pymotw.com/2/multiprocessing/basics.html https://pymotw.com/2/multiprocessing/communication.html 下面记录一下自己这周在python多进程上碰

【Python之旅】第六篇(六):Python多进程使用

关于进程与线程的对比,下面的解释非常好的说明了这两者的区别: 这里主要说明关于Python多进程的下面几点: 1.多进程的使用方法 2.进程间的通信 3.Python进程池 (1)比较简单的例子 (2)多个进程多次并发的情况 (3)验证apply.async方法是非阻塞的 (4)验证apply.async中的get()方法是阻塞的 1.多进程的使用方法 直接给出下面程序代码及注释: from multiprocessing import Process    #从多进程模块中导入Process