python多进程那点事儿【multiprocessing库】

前言:项目中有个需求需要对产品的日志处理,按照产品中日志的某些字段,对日志进行再次划分。比如产品的日志中含有字段id,tag=1,现在需要把tag是基数的放到一个文件中,tag是偶数的放入一个文件中。这就涉及到多个文件的读写操作,一个文件一个文件读取写入那时间太久了,公司配备的单机,跑了半个多小时,光标还是一直在闪闪闪【你懂得】。没办法了,还是用多进程跑吧。这就得对python中的多进程从新回顾一遍了。

Q1:为什么不用多线程呢?

A1:这个就需要了解python多线程的实现原理了,通过在其解释器层面施加一个全局锁来保证同一时刻只有一个线程可以拥有锁,执行相应的python字节码。所以虽然冠名是是多线程,但是实质上还是只有一个线程在运行。有时候多线程可能会让程序得不到提高反而降低,因为线程之间需要竞争资源。所以很多人也说,如果想真正的同一时刻执行多个任务的话,就需要使用多进程。

1.使用multiprocessing.Process

multiprocessing.Process最常见的使用就是:

p = multiprocessing.Process(target = 多线程执行函数名, args = 函数参数元组形式)
p.start()
p.join()

注意使用多进程时候一定要使用join对子进程的状态进行收集,否则在程序运行过程中会出现僵尸进程,对系统性能造成影响。

当然,上面这只有一个进程,你在写的时候可能很顺手就写了

for x in range(10):
p = multiprocessing.Process(target = 多线程执行函数名, args = 函数参数元组形式)
p.start()
p.join()

然后就发现,这个进程貌似是顺序执行的。。。好像没有并发,原因就出现在join的位置上,仔细查看手册,你会发现在join函数下方有一行说明:

Block the calling thread until the process whose join() method is called terminates or until the optional timeout occurs.

意思就是主线程会在join的地方一直等子进程结束。。那么我们多个进程并发执行就要这样写了:

1 p_list = []
2 for x in range(10):
3     p = multiprocessing.Process(target = 多线程执行函数名, args = 函数参数元组形式)
4     p.start()
5     p_list.append(p)
6
7 for p in p_list:
8     p.join()

感觉这样写的代码一点都不优雅,而且以后拓展也很不方便,子进程的数目会随着任务数目的增加而增加,进程得不到重复的利用。

2.使用multiprocessing.Pool

进程池就是上述不方便的完美解决,其一般用法如下:

pool = multiprocessing.Pool(processes=进程数目)
for x in xrange(任务数目):
    pool.apply_async(函数名, 函数参数元组形式)
pool.close() # close函数表明不会再往进程池中加入新任务,一定要在join方法调用之前调用。
pool.join()

上述代码开启了含有一定数目的进程池,只需要往进程池中加入新任务即可,当进程池中已满,其他的任务就等待,直到有任务结束。

注意:除了pool.apply_async方法,还有一个pool.apply方法,只不过pool.apply方法是阻塞的。

还可以使用进程池方法来关注进程执行的结果,pool.apply_asyn函数即返回函数的执行结果,使用get()方法即可得到。

3.多进程共享数据

多个进程之间共享数据也有很多种方法:

1)共享变量

只能使用Value和Array方法:

multiprocessing.Value(typecode_or_type, *args[, lock])
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True) 

关于lock的一段说明:

Return a process-safe wrapper object for a ctypes object which uses lock to synchronize access. If lock is None (the default) then a multiprocessing.RLock object is created automatically.

共享变量只能是一个变量,或者是线性的一组变量,类型也是从typecode_or_type衍生而来的,具体的用法和说明手册上已经讲解的很清楚了。附上手册上的一段代码如下:

 1 from multiprocessing import Process, Lock
 2 from multiprocessing.sharedctypes import Value, Array
 3 from ctypes import Structure, c_double
 4
 5 class Point(Structure):
 6     _fields_ = [(‘x‘, c_double), (‘y‘, c_double)]
 7
 8 def modify(n, x, s, A):
 9     n.value **= 2
10     x.value **= 2
11     s.value = s.value.upper()
12     for a in A:
13         a.x **= 2
14         a.y **= 2
15
16 if __name__ == ‘__main__‘:
17     lock = Lock()
18
19     n = Value(‘i‘, 7)
20     x = Value(c_double, 1.0/3.0, lock=False)
21     s = Array(‘c‘, ‘hello world‘, lock=lock)
22     A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
23
24     p = Process(target=modify, args=(n, x, s, A))
25     p.start()
26     p.join()
27
28     print n.value
29     print x.value
30     print s.value
31     print [(a.x, a.y) for a in A]

结果:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

2)Manager

使用Manager方法时,共享变量的类型会多一些,例如list,dict,Event,Lock,Array,Value...使用Manager方法时需要注意,在操作共享对象时候,除了赋值操作,其他的方法都作用在共享对象的拷贝上,并不会对共享对象生效。例如:

d = Manager().dict()
d[0] = []
d[0].append(0) # append方法作用在代理对象上,并不对原始对象生效
print d

输出:{0: []}

而同样意思的一段代码:

d = Manager().dict()
l = []
l.append(0)
d[0] = l # 直接赋值操作,影响原始共享对象
print d

输出:{0: [0]}

3)Queue

队列,顾名思义,就是一组数据,使用put来往队列中存入数据,使用get方法获取数据,当队列满了继续put和队列空了继续get时候会抛出相对应的异常。可以多个进程之间传递数据。

4)Pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端,对应两个进程。还可以通过duplex来设定管道是全双工(duplex=True)还是半双工(duplex=False)工作。send和recv方法分别是发送和接受消息的方法,如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

4.多进程同步互斥

1)Lock和Semaphore

这两个就不多讲了,学过操作系统的都知道。Lock限定同一时间仅一个进程访问共享变量,Semaphore则可以限定多个进程同时访问共享变量。

2)Event

使用set和is_set判断事件是否已经发生,决定下一步要执行的动作。

时间: 2024-10-01 02:59:51

python多进程那点事儿【multiprocessing库】的相关文章

Python多进程并发(multiprocessing)用法实例详解

http://www.jb51.net/article/67116.htm 本文实例讲述了Python多进程并发(multiprocessing)用法.分享给大家供大家参考.具体分析如下: 由于Python设计的限制(我说的是咱们常用的CPython).最多只能用满1个CPU核心.Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换. 1.新建单一进程 如果我们新建少量进程,

python多进程的理解 multiprocessing Process join run

最近看了下多进程. 一种接近底层的实现方法是使用 os.fork()方法,fork出子进程.但是这样做事有局限性的.比如windows的os模块里面没有 fork() 方法. windows:.linux: 另外还有一个模块:subprocess.这个没整过,但从vamei的博客里看到说也同样有局限性. 所以直接说主角吧 --- multiprocessing模块. multiprocessing模块会在windows上时模拟出fork的效果,可以实现跨平台,所以大多数都使用multiproce

Python多进程(multiprocessing)学习总结

简介 multiprocessing模块使用和threading包类似的API接口来产生多进程,multiprocessing包提供本地和远程的并发,通过使用subprocesses(子进程)代替threads(线程)有效的避开了GIL(Global Interpreter Lock).由于这一点,multiprocessing模块允许程序充分的利用多处理器.可以跨平台使用,包括Unix和Windows!----https://docs.python.org/2/library/multipro

python多进程-----multiprocessing包

multiprocessing并非是python的一个模块,而是python中多进程管理的一个包,在学习的时候可以与threading这个模块作类比,正如我们在上一篇转载的文章中所提,python的多线程并不能做到真正的并行处理,只能完成相对的并发处理,那么我们需要的就是python的多进程来完成并行处理,把所有的cpu资源都利用起来.multiprocessing的很大一部分与threading使用同一套API,只不过换到了多进程的环境.这里面要注意,对于多进程来说,win32平台和unix平

Python多进程multiprocessing使用示例

mutilprocess简介 像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多. import multiprocessing def worker(num): """thread worker function""" print 'Worker:', num return if __name__ == '__main__': jobs = [] for i

Python多进程池 multiprocessing Pool

1. 背景 由于需要写python程序, 定时.大量发送htttp请求,并对结果进行处理. 参考其他代码有进程池,记录一下. 2. 多进程 vs 多线程 c++程序中,单个模块通常是单进程,会启动几十.上百个线程,充分发挥机器性能.(目前c++11有了std::thread编程多线程很方便,可以参考我之前的博客) shell脚本中,都是多进程后台执行.({ ...} &, 可以参考我之前的博客,实现shell并发处理任务) python脚本有多线程和多进程.由于python全局解锁锁的GIL的存

python多进程multiprocessing Pool相关问题

python多进程想必大部分人都用到过,可以充分利用多核CPU让代码效率更高效. 我们看看multiprocessing.pool.Pool.map的官方用法 map(func, iterable[, chunksize]) A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready

Python多进程multiprocessing(二)

紧接上文 在上文Python多进程multiprocessing(一)中我们介绍了多进程multiprocessing的部分基础操作,在本文中,我们将继续介绍关于多进程的一些知识,比如进程池Pool这个有用的东东.马上开始吧! 使用实例 实例1 import multiprocessing as mp def job(x): return x*x def multicore(): pool = mp.Pool(processes=2) res = pool.map(job,range(10))

Python 多进程实战 & 回调函数理解与实战

这篇博文主要讲下笔者在工作中Python多进程的实战运用和回调函数的理解和运用. 多进程实战 实战一.批量文件下载 从一个文件中按行读取 url ,根据 url 下载文件到指定位置,用多进程实现. #!/usr/local/python27/bin/python2.7 from multiprocessing import Process,Pool import os,time,random,sys import urllib # 文件下载函数 def filedown(url,file):