python类库32[多进程通信Queue+Pipe+Value+Array]

多进程通信

queue和pipe的区别: pipe用来在两个进程间通信。queue用来在多个进程间实现通信。 此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法。

1)Queue & JoinableQueue

queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue。

multiprocessing.JoinableQueue 是 Queue的子类,增加了task_done()和join()方法。

task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成。

join() 阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。

代码:

import multiprocessing
import time

class Consumer(multiprocessing.Process):
    
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print (‘%s: Exiting‘ % proc_name)
                self.task_queue.task_done()
                break
            print (‘%s: %s‘ % (proc_name, next_task))
            answer = next_task() # __call__()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return ‘%s * %s = %s‘ % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return ‘%s * %s‘ % (self.a, self.b)

if __name__ == ‘__main__‘:
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    # Start consumers
    num_consumers = multiprocessing.cpu_count()
    print (‘Creating %d consumers‘ % num_consumers)
    consumers = [ Consumer(tasks, results)
                  for i in range(num_consumers) ]
    for w in consumers:
        w.start()
    
    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))
    
    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

# Wait for all of the tasks to finish
    tasks.join()
    
    # Start printing results
    while num_jobs:
        result = results.get()
        print (‘Result:‘, result)
        num_jobs -= 1

注意小技巧: 使用None来表示task处理完毕。

运行结果:

2) pipe

pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法。

代码:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, ‘hello‘])
    conn.close()

if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    p.join()
    print(parent_conn.recv())   # prints "[42, None, ‘hello‘]"

3)Value + Array

Value + Array 是python中共享内存 映射文件的方法,速度比较快。

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = n.value + 1
    for i in range(len(a)):
        a[i] = a[i] * 10

if __name__ == ‘__main__‘:
    num = Value(‘i‘, 1)
    arr = Array(‘i‘, range(10))

p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

print(num.value)
    print(arr[:])
    
    p2 = Process(target=f, args=(num, arr))
    p2.start()
    p2.join()

print(num.value)
    print(arr[:])

# the output is :
# 2
# [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
# 3
# [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]

参考: 
The Python Standard Library By Example

http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

完!

python类库32[多进程通信Queue+Pipe+Value+Array]

时间: 2024-12-21 20:07:12

python类库32[多进程通信Queue+Pipe+Value+Array]的相关文章

python类库32[多进程同步Lock+Semaphore+Event]

python类库32[多进程同步Lock+Semaphore+Event] 同步的方法基本与多线程相同. 1) Lock 当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突. import multiprocessingimport sys def worker_with(lock, f):    with lock:        fs = open(f,"a+")        fs.write('Lock acquired via with\n')        f

python类库32[序列化和反序列化之pickle]

一 pickle pickle模块用来实现python对象的序列化和反序列化.通常地pickle将python对象序列化为二进制流或文件. python对象与文件之间的序列化和反序列化: pickle.dump() pickle.load() 如果要实现python对象和字符串间的序列化和反序列化,则使用: pickle.dumps() pickle.loads() 可以被序列化的类型有: * None,True 和 False; * 整数,浮点数,复数; * 字符串,字节流,字节数组; * 包

第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法

内容大纲: 进程之间的通讯 进程队列 管道 进程之间的数据共享 进程池 使用进程池 开启进程 提交任务 获得返回值 回调函数1.进程队列 先进先出 from multiprocessing import Queue import queue q = Queue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) 1 2 3 from multiprocessing import Queue impor

python学习笔记——多进程中共享内存Value & Array

1 共享内存 基本特点: (1)共享内存是一种最为高效的进程间通信方式,进程可以直接读写内存,而不需要任何数据的拷贝. (2)为了在多个进程间交换信息,内核专门留出了一块内存区,可以由需要访问的进程将其映射到自己的私有地址空间.进程就可以直接读写这一块内存而不需要进行数据的拷贝,从而大大提高效率.(文件映射) (3)由于多个进程共享一段内存,因此也需要依靠某种同步机制. 优缺点: 优点:快速在进程间传递数据 缺点: 数据安全上存在风险,内存中的内容会被其他进程覆盖或 者篡改 注: 经常和同步互斥

关于python multiprocessing进程通信的pipe和queue方式

这两天温故了python 的multiprocessing多进程模块,看到的pipe和queue这两种ipc方式,啥事ipc? ipc就是进程间的通信模式,常用的一半是socke,rpc,pipe和消息队列等. 今个就再把pipe和queue搞搞. #coding:utf-8 import multiprocessing import time def proc1(pipe):     while True:         for i in xrange(10000):            

python 多线程和多进程

多线程与多进程 知识预览 一 进程与线程的概念 二 threading模块 三 multiprocessing模块 四 协程 五 IO模型 回到顶部 一 进程与线程的概念 1.1 进程 考虑一个场景:浏览器,网易云音乐以及notepad++ 三个软件只能顺序执行是怎样一种场景呢?另外,假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I/O操作),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源.你是不是已经想到在程序A读取数据的过程中,

搞定python多线程和多进程

1.1 线程 1.1.1 什么是线程 线程是操作系统能够进行运算调度的最小单位.它被包含在进程之中,是进程中的实际运作单位.一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务.一个线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令. 1.1.2 线程的工作方式 假设你正在读一本书,没有读完,你想休息一下,但是你想在回来时恢复到当时读的具体进度.有一个方法就是记下页数.行数与字数这三个数值,这些数值就是exe

Python 多线程与多进程

原文地址:http://www.cnblogs.com/whatisfantasy/p/6440585.html 1 概念梳理: 1.1 线程 1.1.1 什么是线程 线程是操作系统能够进行运算调度的最小单位.它被包含在进程之中,是进程中的实际运作单位.一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务.一个线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令. 1.1.2 线程的工作方式 假设你正在读一本书

python学习_多进程

1.基础知识: 计算机的硬件组成: 主板 固化(寄存器,是直接和cpu进行交互的硬件) cpu 中央处理器:计算(数字计算和逻辑计算) 和控制(控制所有的硬件协调工作) 存储 硬盘 内存 输入设备 键盘 鼠标 话筒 输出设备 显示器 音响 打印机 早期的计算机是以计算为核心的,现在的计算机是以存储为核心的 操作系统是一个软件 是一个能直接操作硬件的一个软件 操作系统的目标:让用户使用更加轻松,高可用,低耦合,封装了所有硬件的接口,使用户更方便的使用,对于计算机内的所有资源,进行一个合理的调度和分