Python使用进程池管理进程和进程间通信

与线程池类似的是,如果程序需要启动多个进程,也可以使用进程池来管理进程。程序可以通过 multiprocessing 模块的 Pool() 函数创建进程池,进程池实际上是 multiprocessing.pool.Pool 类。

进程池具有如下常用方法:

1.apply(func[, args[, kwds]]):将 func 函数提交给进程池处理。其中 args 代表传给 func 的位置参数,kwds 代表传给 func 的关键字参数。该方法会被阻塞直到 func 函数执行完成。

2.apply_async(func[, args[, kwds[, callback[, error_callback]]]]):这是 apply() 方法的异步版本,该方法不会被阻塞。其中 callback 指定 func 函数完成后的回调函数,error_callback 指定 func 函数出错后的回调函数。

3.map(func, iterable[, chunksize]):类似于 Python 的 map() 全局函数,只不过此处使用新进程对 iterable 的每一个元素执行 func 函数。

4.map_async(func, iterable[, chunksize[, callback[, error_callback]]]):这是 map() 方法的异步版本,该方法不会被阻塞。其中 callback 指定 func 函数完成后的回调函数,error_callback 指定 func 函数出错后的回调函数。

5.imap(func, iterable[, chunksize]):这是 map() 方法的延迟版本。

6.imap_unordered(func, iterable[, chunksize]):功能类似于 imap() 方法,但该方法不能保证所生成的结果(包含多个元素)与原 iterable 中的元素顺序一致。

7.starmap(func, iterable[,chunksize]):功能类似于 map() 方法,但该方法要求 iterable 的元素也是 iterable 对象,程序会将每一个元素解包之后作为 func 函数的参数。

8.close():关闭进程池。在调用该方法之后,该进程池不能再接收新任务,它会把当前进程池中的所有任务执行完成后再关闭自己。

9.terminate():立即中止进程池。

10.join():等待所有进程完成。

如果程序只是想将任务提交给进程池执行,则可调用 apply() 或 apply_async() 方法;

如果程序需要使用指定函数将 iterable 转换成其他 iterable,则可使用 map() 或 imap() 方法

使用 apply_async() 方法启动进程

import multiprocessing
import time
import os
def action(name=‘default‘):
    print(‘(%s)进程正在执行,参数为: %s‘ % (os.getpid(), name))
    time.sleep(3)
if __name__ == ‘__main__‘:
    # 创建包含4条进程的进程池
    pool = multiprocessing.Pool(processes=4)
    # 将action分3次提交给进程池
    pool.apply_async(action)
    pool.apply_async(action, args=(‘位置参数‘, ))
    pool.apply_async(action, kwds={‘name‘: ‘关键字参数‘})
    pool.close()
    pool.join()

从上面程序可以看出,进程池同样实现了上下文管理协议,因此程序可以使用 with 子句来管理进程池,这样就可以避免程序主动关闭进程池。

使用 map() 方法来启动进程

import multiprocessing
import time
import os
# 定义一个准备作为进程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(‘(%s)进程正在执行: %d‘ % (os.getpid(), i))
        my_sum += i
    return my_sum
if __name__ == ‘__main__‘:
    # 创建一个包含4条进程的进程池
    with multiprocessing.Pool(processes=4) as pool:
        # 使用进程执行map计算
        # 后面元组有3个元素,因此程序启动3条进程来执行action函数
        results = pool.map(action, (50, 100, 150))
        print(‘--------------‘)
        for r in results:
            print(r)

进程通信

Python为进程通信提供了两种机制:

1.Queue:一个进程向 Queue 中放入数据,另一个进程从 Queue 中读取数据。

2.Pipe:Pipe 代表连接两个进程的管道。程序在调用 Pipe() 函数时会产生两个连接端,分别交给通信的两个进程,接下来进程既可从该连接端读取数据,也可向该连接端写入数据。

使用Queue实现进程间通信

multiprocessing 模块下的 Queue 和 queue 模块下的 Queue 基本类似,它们都提供了 qsize()、empty()、full()、put()、put_nowait()、get()、get_nowait() 等方法。区别只是 multiprocessing 模块下的 Queue 为进程提供服务,而 queue 模块下的 Queue 为线程提供服务。

下面程序使用 Queue 来实现进程之间的通信:

 1 import multiprocessing
 2
 3 def f(q):
 4     print(‘(%s) 进程开始放入数据...‘ % multiprocessing.current_process().pid)
 5     q.put(‘Python‘)
 6 if __name__ == ‘__main__‘:
 7     # 创建进程通信的Queue
 8     q = multiprocessing.Queue()
 9     # 创建子进程
10     p = multiprocessing.Process(target=f, args=(q,))
11     # 启动子进程
12     p.start()
13     print(‘(%s) 进程开始取出数据...‘ % multiprocessing.current_process().pid)
14     # 取出数据
15     print(q.get())  # Python
16     p.join()

上面程序中,第 5 行代码(子进程)负责向 Queue 中放入一个数据,第 15 行代码(父进程)负责从 Queue 中读取一个数据,

这样就实现了父、子两个进程之间的通信。

使用Pipe实现进程间通信

使用 Pipe 实现进程通信,程序会调用 multiprocessing.Pipe() 函数来创建一个管道,该函数会返回两个 PipeConnection 对象,代表管道的两个连接端(一个管道有两个连接端,分别用于连接通信的两个进程)。

PipeConnection 对象包含如下常用方法:

1.send(obj):发送一个 obj 给管道的另一端,另一端使用 recv() 方法接收。需要说明的是,该 obj 必须是可 picklable 的(Python 的序列化机制),

如果该对象序列化之后超过 32MB,则很可能会引发 ValueError 异常。

2.recv():接收另一端通过 send() 方法发送过来的数据。

3.fileno():关于连接所使用的文件描述器。

4.close():关闭连接。

5.poll([timeout]):返回连接中是否还有数据可以读取。

6.send_bytes(buffer[, offset[, size]]):发送字节数据。如果没有指定 offset、size 参数,则默认发送 buffer 字节串的全部数据;如果指定了 offset 和 size 参数,则只发送 buffer 字节串中从 offset 开始、长度为 size 的字节数据。通过该方法发送的数据,应该使用 recv_bytes() 或 recv_bytes_into 方法接收。

7.recv_bytes([maxlength]):接收通过 send_bytes() 方法发迭的数据,maxlength 指定最多接收的字节数。该方法返回接收到的字节数据。

8.recv_bytes_into(buffer[, offset]):功能与 recv_bytes() 方法类似,只是该方法将接收到的数据放在 buffer 中。

使用 Pipe 来实现两个进程之间的通信:

 1 import multiprocessing
 2 def f(conn):
 3     print(‘(%s) 进程开始发送数据...‘ % multiprocessing.current_process().pid)
 4     # 使用conn发送数据
 5     conn.send(‘Python‘)
 6 if __name__ == ‘__main__‘:
 7     # 创建Pipe,该函数返回两个PipeConnection对象
 8     parent_conn, child_conn = multiprocessing.Pipe()
 9     # 创建子进程
10     p = multiprocessing.Process(target=f, args=(child_conn, ))
11     # 启动子进程
12     p.start()
13     print(‘(%s) 进程开始接收数据...‘ % multiprocessing.current_process().pid)
14     # 通过conn读取数据
15     print(parent_conn.recv())  # Python
16     p.join()

上面程序中第 6 行代码(子进程)通过 PipeConnection 向管道发送数据,数据将会被发送给管道另一端的父进程。

第 16 行代码(父进程)通过 PipeConnection 从管道读取数据,程序就可以读取到另一端子进程写入的数据,这样就实现了父、子两个进程之间的通信。

原文地址:https://www.cnblogs.com/jzxs/p/11428892.html

时间: 2024-10-10 20:37:27

Python使用进程池管理进程和进程间通信的相关文章

Python 多进程进程池Queue进程通信

from multiprocessing import Pool,Manager import time def hanshu(queue,a): n = 1 while n<50: # print('\r正在工作%d'%a,end='') n+=1 # [步骤3]往队列中发送一条消息 queue.put(a) time.sleep(2) def main(): print('执行main函数') for i in range(0,10): po.apply_async(hanshu,args=

Python开发基础--- 进程间通信、进程池、协程

进程间通信 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的. 进程队列queue 不同于线程queue,进程queue的生成是用multiprocessing模块生成的. 在生成子进程的时候,会将代码拷贝到子进程中执行一遍,及子进程拥有和主进程内容一样的不同的名称空间. 示例1: 1 import multiprocessing 2 def foo(): 3 q.put([11,'hello',True]

python基础之进程间通信、进程池、协程

进程间通信 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的. 进程队列queue 不同于线程queue,进程queue的生成是用multiprocessing模块生成的. 在生成子进程的时候,会将代码拷贝到子进程中执行一遍,及子进程拥有和主进程内容一样的不同的名称空间. 示例1: 1 import multiprocessing 2 def foo(): 3 q.put([11,'hello',True]

Python、进程间通信、进程池、协程

进程间通信 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的. 进程队列queue 不同于线程queue,进程queue的生成是用multiprocessing模块生成的. 在生成子进程的时候,会将代码拷贝到子进程中执行一遍,及子进程拥有和主进程内容一样的不同的名称空间. 示例1: 1 import multiprocessing 2 def foo(): 3 q.put([11,'hello',True]

python进程池剖析(二)

之前文章中介绍了python中multiprocessing模块中自带的进程池Pool,并对进程池中的数据结构和各个线程之间的合作关系进行了简单分析,这节来看下客户端如何对向进程池分配任务,并获取结果的. 我们知道,当进程池中任务队列非空时,才会触发worker进程去工作,那么如何向进程池中的任务队列中添加任务呢,进程池类有两组关键方法来创建任务,分别是apply/apply_async和map/map_async,实际上进程池类的apply和map方法与python内建的两个同名方法类似,ap

multiprocessing在python中的高级应用-进程池

下面的类可以创建进程池,可以吧各种数据处理任务都提交给进程池.进程池提供的功能有点类似于列表解析和功能性编程操作(如映射-规约)提供的功能. Pool( [ numprocess [, initializer [, initargs] ] ] ) 创建工作进程池. numprocess是要创建的进程数.如果省略此参数,将使用cpu_count()的值.[这里简单介绍一下: from multiprocessing import cpu_count print(cpu_count()) #获得电脑

python:多进程,多进程队列,多进程管道,Manager,进程锁,进程池

#!usr/bin/env python# -*- coding:utf-8 -*- __author__ = "Samson" import multiprocessingimport time def run(name): time.sleep(2) print("process start...%s" % name)if __name__ == "__main__": for i in range(10): p = multiprocess

Python程序中的进程操作-进程池(multiprocess.Pool)

Python程序中的进程操作-进程池(multiprocess.Pool) 一.进程池 为什么要有进程池?进程池的概念. 在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务.那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间.第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率.因此我们不能无限制的根据任务开启或者结束进程.那么我们要怎么做呢? 在这里,要给大家介

Python并发编程—进程池

进程池实现 1.必要性[1] 进程的创建和销毁过程消耗的资源较多[2] 当任务量众多,每个任务在很短时间内完成时,需要频繁的创建和销毁进程.此时对计算机压力较大[3] 进程池技术很好的解决了以上问题. 2.原理 创建一定数量的进程来处理事件,事件处理完进 程不退出而是继续处理其他事件,直到所有事件全都处理完毕统一销毁.增加进程的重复利用,降低资源消耗. 3.进程池实现 [1] 创建进程池对象,放入适当的进程 from multiprocessing import Pool Pool(proces