Python学习笔记18:标准库之多进程(multiprocessing包)

我们能够使用subprocess包来创建子进程。但这个包有两个非常大的局限性:

1) 我们总是让subprocess执行外部的程序,而不是执行一个Python脚本内部编写的函数。

2) 进程间仅仅通过管道进行文本交流。

以上限制了我们将subprocess包应用到更广泛的多进程任务。

这种比較实际是不公平的,由于subprocessing本身就是设计成为一个shell,而不是一个多进程管理包。

一 threading和multiprocessing

multiprocessing包是Python中的多进程管理包。

与threading.Thread类似。它能够利用multiprocessing.Process对象来创建一个进程。

该进程能够执行在Python程序内部编写的函数。

该Process对象与Thread对象的使用方法同样,也有start(), run(), join()的方法。

此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,通过參数传递给各个进程),用以同步进程。其使用方法与threading包中的同名类一致。

所以。multiprocessing的非常大一部份与threading使用同一套API,仅仅只是换到了多进程的情境。

但在使用这些共享API的时候,我们要注意下面几点:

1)在UNIX平台上,当某个进程终结之后。该进程须要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。

所以。有必要对每一个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,因为仅仅有一个进程。所以不存在此必要性。

2)multiprocessing提供了threading包中没有的IPC(比方Pipe和Queue),效率上更高。

应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (由于它们占领的不是用户进程的资源)。

3)多进程应该避免共享资源。

在多线程中,我们能够比較easy地共享资源,比方使用全局变量或者传递參数。

在多进程情况下。因为每一个进程有自己独立的内存空间。以上方法并不合适。此时我们能够通过共享内存和Manager的方法来共享资源。

但这样做提高了程序的复杂度,并由于同步的须要而减少了程序的效率。Process.PID中保存有PID,假设进程还没有start()。则PID为None。

我们能够从以下的程序中看到Thread对象和Process对象在使用上的相似性与结果上的不同。各个线程和进程都做一件事:打印PID。

但问题是,全部的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。

使用Lock同步,在一个任务输出完毕之后,再同意还有一个任务输出,能够避免多个任务同一时候向终端输出。

# Similarity and difference of multi thread vs. multi process

import os
import threading
import multiprocessing

# worker function
def worker(sign, lock):
    lock.acquire()
    print(sign, os.getpid())
    lock.release()

# Main
print(‘Main:‘,os.getpid())

# Multi-thread
record = []
lock  = threading.Lock()
for i in range(5):
    thread = threading.Thread(target=worker,args=(‘thread‘,lock))
    thread.start()
    record.append(thread)

for thread in record:
    thread.join()

# Multi-process
record = []
lock = multiprocessing.Lock()
for i in range(5):
    process = multiprocessing.Process(target=worker,args=(‘process‘,lock))
    process.start()
    record.append(process)

for process in record:
    process.join()

全部Thread的PID都与主程序同样,而每一个Process都有一个不同的PID。

二 Pipe和Queue

管道PIPE和消息队列message queue,multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue能够用来传送常见的对象。

1) Pipe能够是单向(half-duplex)。也能够是双向(duplex)。

我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默觉得双向)。

一个进程从PIPE一端输入对象,然后被PIPE还有一端的进程接收,单向管道仅仅同意管道一端的进程输入,而双向管道则同意从两端输入。

以下的程序展示了Pipe的使用:

# Multiprocessing with Pipe

import multiprocessing as mul

def proc1(pipe):
    pipe.send(‘hello‘)
    print(‘proc1 rec:‘,pipe.recv())

def proc2(pipe):
    print(‘proc2 rec:‘,pipe.recv())
    pipe.send(‘hello, too‘)

# Build a pipe
pipe = mul.Pipe()

# Pass an end of the pipe to process 1
p1   = mul.Process(target=proc1, args=(pipe[0],))

# Pass the other end of the pipe to process 2
p2   = mul.Process(target=proc2, args=(pipe[1],))

p1.start()
p2.start()
p1.join()
p2.join()

这里的Pipe是双向的。

Pipe对象建立的时候,返回一个含有两个元素的表。每一个元素代表Pipe的一端(Connection对象)。

我们对Pipe的某一端调用send()方法来传送对象,在还有一端使用recv()来接收。

2) Queue与Pipe相类似,都是先进先出的结构。但Queue同意多个进程放入,多个进程从队列取出对象。

Queue使用mutiprocessing.Queue(maxsize)创建。maxsize表示队列中能够存放对象的最大数量。

以下的程序展示了Queue的使用:

import os
import multiprocessing
import time
#==================
# input worker
def inputQ(queue):
    info = str(os.getpid()) + ‘(put):‘ + str(time.time())
    queue.put(info)

# output worker
def outputQ(queue,lock):
    info = queue.get()
    lock.acquire()
    print (str(os.getpid()) + ‘(get):‘ + info)
    lock.release()
#===================
# Main
record1 = []   # store input processes
record2 = []   # store output processes
lock  = multiprocessing.Lock()    # To prevent messy print
queue = multiprocessing.Queue(3)

# input processes
for i in range(10):
    process = multiprocessing.Process(target=inputQ,args=(queue,))
    process.start()
    record1.append(process)

# output processes
for i in range(10):
    process = multiprocessing.Process(target=outputQ,args=(queue,lock))
    process.start()
    record2.append(process)

for p in record1:
    p.join()

queue.close()  # No more object will come, close the queue

for p in record2:
    p.join()

一些进程使用put()在Queue中放入字符串,这个字符串中包括PID和时间。

还有一些进程从Queue中取出,并打印自己的PID以及get()的字符串。

三 进程池

进程池 (Process Pool)能够创建多个进程。这些进程就像是随时待命的士兵,准备运行任务(程序)。

一个进程池中能够容纳多个待命的士兵。

比方以下的程序:

import multiprocessing as mul

def f(x):
    return x**2

pool = mul.Pool(5)
rel  = pool.map(f,[1,2,3,4,5,6,7,8,9,10])
print(rel)

我们创建了一个容许5个进程的进程池 (Process Pool) 。Pool执行的每一个进程都执行f()函数。

我们利用map()方法,将f()函数作用到表的每一个元素上。这与built-in的map()函数类似。仅仅是这里用5个进程并行处理。

假设进程执行结束后,还有须要处理的元素。那么的进程会被用于又一次执行f()函数。除了map()方法外。Pool还有以下的经常用法。

1)apply_async(func,args)从进程池中取出一个进程运行func。args为func的參数。

它将返回一个AsyncResult的对象。你能够对该对象调用get()方法以获得结果。

2)close()进程池不再创建新的进程

3)join()wait进程池中的所有进程。

必须对Pool先调用close()方法才干join。

四 共享资源

多进程共享资源必定会带来进程间相互竞争。而这样的竞争又会造成race condition,我们的结果有可能被竞争的不确定性所影响。

但假设须要,我们依旧能够通过共享内存和Manager对象这么做。

1 共享内存

依据共享内存(shared memory)的原理,这里给出用Python实现的样例:

import multiprocessing

def f(n, a):
    n.value   = 3.14
    a[0]      = 5

num   = multiprocessing.Value(‘d‘, 0.0)
arr   = multiprocessing.Array(‘i‘, range(10))

p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()

print num.value
print arr[:]

这里我们实际上仅仅有主进程和Process对象代表的进程。

我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为0.0。

而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们改动了Value和Array对象。

回到主程序。打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。

2 Manager

Manager对象类似于服务器与客户之间的通信 (server-client),与我们在Internet上的活动非常类似。

我们用一个进程作为server,建立Manager来真正存放资源。其他的进程能够通过參数传递或者依据地址来訪问Manager,建立连接后。操作server上的资源。

在防火墙同意的情况下,我们全然能够将Manager运用于多计算机。从而模仿了一个真实的网络情境。

以下的样例中,我们对Manager的使用类似于shared memory。但能够共享更丰富的对象类型。

import multiprocessing

def f(x, arr, l):
    x.value = 3.14
    arr[0] = 5
    l.append(‘Hello‘)

server = multiprocessing.Manager()
x    = server.Value(‘d‘, 0.0)
arr  = server.Array(‘i‘, range(10))
l    = server.list()

proc = multiprocessing.Process(target=f, args=(x, arr, l))
proc.start()
proc.join()

print(x.value)
print(arr)
print(l)

Manager利用list()方法提供了表的共享方式。

实际上你能够利用dict()来共享词典,Lock()来共享threading.Lock(注意,我们共享的是threading.Lock。而不是进程的mutiprocessing.Lock。

后者本身已经实现了进程共享)等。

这样Manager就同意我们共享很多其它样的对象。

时间: 2024-12-18 14:01:41

Python学习笔记18:标准库之多进程(multiprocessing包)的相关文章

C++ Primer 学习笔记_7_标准库类型(续1) -- vector类型

 标准库类型(二) --vector类型 引子: vector是同一种类型的对象的集合,每个对象都有一个对应的整数索引值.和string对象一样,标准库将负责管理与存储元素相关的内存. 我们将vector称之为容器,一个容器中的所有对象都必须是同一类型的! [cpp] view plaincopyprint? #include <vector> using std::vector; #include <vector> using std::vector; [模板] vector

C++ Primer 学习笔记_8_标准库类型(续2) -- iterator

 标准库类型(三) --iterator 序言: 迭代器是一种检查容器内元素并遍历容器元素的数据类型. 所有的标准库容器都定义了相应的迭代器类型,而只有少数的容器支持下标操作:因此,现代C++更倾向于使用迭代器而不是下标操作访问容器元素. 正文: 1.容器的iterator类型 每个标准库容器类型都定义了一个名为iterator的成员: [cpp] view plaincopyprint? vector<int>::iterator iter; vector<int>::ite

C++ Primer 学习笔记_9_标准库类型(续3) -- biteset

 标准库类型(四) --biteset 序言: 位是用来保存一组项或条件的yes/no信息[标识]的简洁方法. [cpp] view plaincopyprint? #include <bitset> using std::bitset; #include <bitset> using std::bitset; 正文: 1.bitset对象的定义和初始化 和vector对象不同的是:bitset类型对象的区别在于其长度而不是类型.在定义bitest时,要在尖括号中说明给出他的长

C++ Primer 学习笔记_6_标准库类型 -- 命名空间using与string类型

 标准库类型(一) --命名空间using与string类型 引: 标准库类型是语言组成部分中更基本的哪些数据类型(如:数组.指针)的抽象! C++标准库定义的是高级的抽象数据类型: 1.高级:因为其中反映了更复杂的概念: 2.抽象:因为我们在使用时不需要关心他们是如何表示的,我们只需要知道这些抽象数据类型支持哪些操作就可以了. 正文: 一.命名空间的using声明 1. using std::cin; ::运算符的作用含义是右操作数的名字可以在左操作数的作用域中找到. 格式: [cpp]

Python学习笔记(二十七)多进程 (进程和线程开始)

摘抄:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431927781401bb47ccf187b24c3b955157bb12c5882d000 要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识. Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊.普通的函数调用,调用一次,返回一次,但是fork()调

python学习笔记18(UliPad 初体验)

在windows下安装配置Ulipad 由于UliPad 是由wxPython 开发的,所以,需要先安装wxPython . wxPython下载地址: http://www.wxpython.org/download.php#stable Ulipad下载地址: http://files.cnblogs.com/dolphin0520/ulipad.4.1.py27.rar 安装完成之后,打开Ulipad就是如下界面了: (注意:安装成功后可能启动不了Ulipad,关闭有道词典类的软件就OK了

Python学习笔记-模块介绍(三)-模块包和搜索路径

一个python文件就是一个模块,使用独立的命名空间,但实际使用过程中单单用模块来定义python功能显然还不够.因为一个大型的系统几千上万个模块是很正常的事情,如果都聚集在一起显然不好管理并且有命名冲突的可能,因此python中也出现了一个包的概念. 一.python中的包介绍 包是通过使用"点模块名称"创建Python模块命名空间的一种方法.列如,模块名称 A.B 表示一个在名为 A的包下的名为B的子模块.就像使用模块让不同模块的作者无需担心彼此全局变量名称(冲突)一样,点模块名称

流畅python学习笔记第十八章:使用asyncio包处理并发(一)

首先是线程与协程的对比.在文中作者通过一个实例分别采用线程实现和asynchio包实现来比较两者的差别.在多线程的样例中,会用到join的方法,下面来介绍下join方法的使用. 知识点一:当一个进程启动之后,会默认产生一个主线程,因为线程是程序执行流的最小单元,当设置多线程时,主线程会创建多个子线程,在python中,默认情况下(其实就是setDaemon(False)),主线程执行完自己的任务以后,就退出了,此时子线程会继续执行自己的任务,直到自己的任务结束,例子如下:. def run():

python基础教程_学习笔记18:标准库:一些最爱——shelve

标准库:一些最爱 shelve Shelve唯一有趣的函数是open.在调用它的时候(使用文件名作为参数),它会返回一个Shelf对象,可以用它来存储内容.只需要把它当作普通的字典(但是键一定要作为字符串)来操作即可,在完成工作之后,调用它的close方法. 意识到shelve.open函数返回的对象并不是普通的映射是很重要的. >>> import shelve >>> s=shelve.open('a.txt') >>> s['x']=['a','