下面的类可以创建进程池,可以吧各种数据处理任务都提交给进程池。进程池提供的功能有点类似于列表解析和功能性编程操作(如映射-规约)提供的功能。
Pool( [ numprocess [, initializer [, initargs] ] ] )
创建工作进程池。
numprocess是要创建的进程数。如果省略此参数,将使用cpu_count()的值。【这里简单介绍一下:
from multiprocessing import cpu_count
print(cpu_count()) #获得电脑的CPU的个数
】。
initializer是每个工作进程启动时要执行的可调用对象。initargs是要传递给initializer的参数元组。initializer默认为None。
Pool类的实例p支持一下操作:
p.apply(func [, args[, kwargs] ] )
在一个池工作进程中执行函数(*args,**kwargs),然后返回结果。这里要强调一点:此操作并不会在所有池工作进程中并行执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()函数。
p.apply_async( func [, args [, kwargs [, callback] ] ] )
在一个池工作进程中异步地执行函数(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,稍后可用于获得最终结果。callback禁止只习惯任何阻塞操作,否则将阻塞接收其他异步操作中的结果。
p.close()
关闭进程池,防止进行进一步操作。如果所有操作持续挂起,他们将在工作进程终止之前完成。
p.join()
等待所有工作进程退出。此方法只能在close()或terminate()方法之后调用。
p.imap(func, iterable [, chunksize] )
map()函数的版本之一,返回迭代器而非结果列表。
p.imap_unordered(func, iterable [,chunksize] )
同imap()函数,但从工作进程接收结果时,返回结果的次序时任意的。
p.map(func, iterable [, chunksize] )
将可调用对象func应用给iterable中的所有项目,然后以列表的形式返回结果。通过将iterable划分为多块并将工作分派给工作进程,可以并行地执行这项操作。chunksize制定每块中的项目数。
如果数据量较大,可以增大chunksize的值来提升性能。
p.map_async( func , iterable [, chunksize [, callback] ] )
同map()函数,但结果的返回时异步地。如果提供callable参数,当结果变为可用时,它将与结果一起被调用。
p.terminate()
立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾收集,将自动调用此函数。
方法apply_async()和map_async()的返回值是AsyncResult实例。AsyncResult实例具有以下方法。
a.get( [timeout] )
返回结果,如果有必要则等待结果到达。timeout是可选的超时。如果结果在制定时间内没有到达,将引发multuprocessing.TimeoutError异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
a.ready()
如果调用完成,返回True
a.sucessful()
如果调用完成且没有引发异常,返回True。如果在结果就绪之前调用此方法,将引发AssertionError异常。
a.wait( [ timeout] )
等待结果变为可用。timeout是可选的超时。
下面的例子说明如何使用进程池构建字典,将整个目录中文件的文件名映射为SHA512摘要值:
import multiprocessing
import os
import hashlib
#Some parameters you can tweek
BUFSIZE=8192 #读取缓冲区大小
POOLSIZE=4
def compute_digest(filename):
try:
f=open(filename,"rb")
except IOError:
return None
digest=hashlib.sha512()
while True:
chunk=f.read(BUFSIZE)
if not chunk:break
digest.update(chunk)
f.close()
return filename,digest.digest()
def build_digest_map(topdir):
digest_pool=multiprocessing.Pool(4)
allfiles=(os.path.join(path,name)
for path,dirs,files in os.walk(topdir)
for name in files)
digest_map=dict(digest_pool.imap_unordered(compute_digest,allfiles,20))
digest_pool.close()
return digest_map
if __name__=="__main__":
digest_map=build_digest_map("F:\WaterFlow")
print len(digest_map)
在这个例子中,使用生成器表达式指定一个目录树中所有文件的路径名称序列。然后使用imap_unordered()函数将这个序列分割并传递给进程池。每个池工作进程使用compute_digest()函数为它的文件计算SHA512摘要值。将结果返回给生成器,然后收集到python字典中。
要记住,只有充分利用了池工作进程才能够使额外的通信开销变得有价值,使用进程池才有意义。一般而言,对于简单的计算(如两个数相加),使用进程池是没有意义的。
版权声明:本文为博主原创文章,未经博主允许不得转载。