multiprocessing在python中的高级应用-进程池

下面的类可以创建进程池,可以吧各种数据处理任务都提交给进程池。进程池提供的功能有点类似于列表解析和功能性编程操作(如映射-规约)提供的功能。

Pool( [ numprocess [, initializer [, initargs] ] ] )

创建工作进程池。

numprocess是要创建的进程数。如果省略此参数,将使用cpu_count()的值。【这里简单介绍一下:

from multiprocessing import cpu_count

print(cpu_count()) #获得电脑的CPU的个数

】。

initializer是每个工作进程启动时要执行的可调用对象。initargs是要传递给initializer的参数元组。initializer默认为None。

Pool类的实例p支持一下操作:

p.apply(func [, args[, kwargs] ] )

在一个池工作进程中执行函数(*args,**kwargs),然后返回结果。这里要强调一点:此操作并不会在所有池工作进程中并行执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()函数。

p.apply_async( func [, args [, kwargs [, callback] ] ] )

在一个池工作进程中异步地执行函数(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,稍后可用于获得最终结果。callback禁止只习惯任何阻塞操作,否则将阻塞接收其他异步操作中的结果。

p.close()

关闭进程池,防止进行进一步操作。如果所有操作持续挂起,他们将在工作进程终止之前完成。

p.join()

等待所有工作进程退出。此方法只能在close()或terminate()方法之后调用。

p.imap(func, iterable [, chunksize] )

map()函数的版本之一,返回迭代器而非结果列表。

p.imap_unordered(func, iterable [,chunksize] )

同imap()函数,但从工作进程接收结果时,返回结果的次序时任意的。

p.map(func, iterable [, chunksize] )

将可调用对象func应用给iterable中的所有项目,然后以列表的形式返回结果。通过将iterable划分为多块并将工作分派给工作进程,可以并行地执行这项操作。chunksize制定每块中的项目数。

如果数据量较大,可以增大chunksize的值来提升性能。

p.map_async( func , iterable [, chunksize [, callback] ] )

同map()函数,但结果的返回时异步地。如果提供callable参数,当结果变为可用时,它将与结果一起被调用。

p.terminate()

立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾收集,将自动调用此函数。

方法apply_async()和map_async()的返回值是AsyncResult实例。AsyncResult实例具有以下方法。

a.get( [timeout] )

返回结果,如果有必要则等待结果到达。timeout是可选的超时。如果结果在制定时间内没有到达,将引发multuprocessing.TimeoutError异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发。

a.ready()

如果调用完成,返回True

a.sucessful()

如果调用完成且没有引发异常,返回True。如果在结果就绪之前调用此方法,将引发AssertionError异常。

a.wait( [ timeout] )

等待结果变为可用。timeout是可选的超时。

下面的例子说明如何使用进程池构建字典,将整个目录中文件的文件名映射为SHA512摘要值:

import multiprocessing
import os
import hashlib
#Some parameters you can tweek
BUFSIZE=8192  #读取缓冲区大小
POOLSIZE=4
def compute_digest(filename):
    try:
        f=open(filename,"rb")
    except IOError:
        return None
    digest=hashlib.sha512()
    while True:
        chunk=f.read(BUFSIZE)
        if not chunk:break
        digest.update(chunk)
    f.close()
    return filename,digest.digest()
def build_digest_map(topdir):
    digest_pool=multiprocessing.Pool(4)
    allfiles=(os.path.join(path,name)
              for path,dirs,files in os.walk(topdir)
              for name in files)

    digest_map=dict(digest_pool.imap_unordered(compute_digest,allfiles,20))
    digest_pool.close()
    return digest_map

if __name__=="__main__":
    digest_map=build_digest_map("F:\WaterFlow")
    print len(digest_map)

在这个例子中,使用生成器表达式指定一个目录树中所有文件的路径名称序列。然后使用imap_unordered()函数将这个序列分割并传递给进程池。每个池工作进程使用compute_digest()函数为它的文件计算SHA512摘要值。将结果返回给生成器,然后收集到python字典中。

要记住,只有充分利用了池工作进程才能够使额外的通信开销变得有价值,使用进程池才有意义。一般而言,对于简单的计算(如两个数相加),使用进程池是没有意义的。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-06 07:37:59

multiprocessing在python中的高级应用-进程池的相关文章

multiprocessing在python中的高级应用-进程

本篇主要讲解multiprocessing中的重要模块-进程. Process([group [,target [,name [,args [,kwargs]]]]]) 这个类表示运行在一个子进程中的任务,应该使用关键字参数来指定构造函数中的参数.target是当前进程启动时执行的可调用对象,args是传递给target的位置参数的元组,而kwargs是传递给target的关键字参数的字典.如果省略args和kwargs参数,将不带参数调用target.name是为进程指定描述性名称额字符串.g

multiprocessing在python中的高级应用-IPC 之 Pipe

作为使用队列的另一种形式,还可以使用管道在进程回见执行消息传递. Pipe( [ duplex]) 在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1和conn2是表示管道两端的Connection对象.默认情况下,管道是双向的.如果将duplex置为False,conn1只能用于接收,而conn2只能用于发送.必须在创建和启动使用管道的Process对象之前调用Pipe()方法. Pipe()方法返回的Connection对象的实例c具有以下方法和属性. c.clos

multiprocessing在python中的高级应用-IPC 之 Queue

multiprocessing模块支持进程间通信的两种主要形式:管道和队列.这两种方法都使用了消息传递实现的,但队列接口有意模仿线程程序中常见的队列用法. 有关Queue编程实例可以查看微博内容. Queue([maxsize]) 创建共享的进程队列.maxsize是队列中允许的最大项数.如果省略此参数,则无大小限制.底层队列使用管道和锁定实现.另外,还需要运行支持线程以便队列中的数据传输到底层管道中. Queue的实例q具有以下方法: q.cancel_join_thread() 不会再进程退

multiprocessing在python中的高级应用-共享数据与同步

通常,进程之间彼此是完全孤立的,唯一的通信方式是队列或管道.但可以使用两个对象来表示共享数据.其实,这些对象使用了共享内存(通过mmap模块)使访问多个进程成为可能. Value( typecode, arg1, - argN, lock ) 在共享内容中常见ctypes对象.typecode要么是包含array模块使用的相同类型代码(如'i','d'等)的字符串,要么是来自ctypes模块的类型对象(如ctypes.c_int.ctypes.c_double等).所有额外的位置参数arg1,

Python中的高级数据结构(转)

add by zhj: Python中的高级数据结构 数据结构 数据结构的概念很好理解,就是用来将数据组织在一起的结构.换句话说,数据结构是用来存储一系列关联数据的东西.在Python中有四种内建的数据 结构,分别是List.Tuple.Dictionary以及Set.大部分的应用程序不需要其他类型的数据结构,但若是真需要也有很多高级数据结构可供 选择,例如Collection.Array.Heapq.Bisect.Weakref.Copy以及Pprint.本文将介绍这些数据结构的用法,看 看它

Python中的高级数据结构详解

这篇文章主要介绍了Python中的高级数据结构详解,本文讲解了Collection.Array.Heapq.Bisect.Weakref.Copy以及Pprint这些数据结构的用法,需要的朋友可以参考下 数据结构 数据结构的概念很好理解,就是用来将数据组织在一起的结构.换句话说,数据结构是用来存储一系列关联数据的东西.在Python中有四种内建的数据结构,分别是List.Tuple.Dictionary以及Set.大部分的应用程序不需要其他类型的数据结构,但若是真需要也有很多高级数据结构可供选择

python中的线程和进程

进程与线程的历史 我们都知道计算机是由硬件和软件组成的.硬件中的CPU是计算机的核心,它承担计算机的所有任务. 操作系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配.任务的调度. 程序是运行在系统上的具有某种功能的软件,比如说浏览器,音乐播放器等. 每次执行程序的时候,都会完成一定的功能,比如说浏览器帮我们打开网页,为了保证其独立性,就需要一个专门的管理和控制执行程序的数据结构——进程控制块. 进程就是一个程序在一个数据集上的一次动态执行过程. 进程一般由程序.数据集.进程控

Python中的线程与进程

进程与线程 在多任务处理中,每一个任务都有自己的进程,一个任务会有很多子任务,这些在进程中开启线程来执行这些子任务.一般来说,可以将独立调度.分配的基本单元作为线程运行,而进程是资源拥有的基本单位. python支持多进程multiprocessing,以及多线程threading. 多进程 os.fork()函数可以开启一个进程.该函数会返回两次值,分别在父进程中返回子进程的ID,而在子进程中永远返回0. os.getpid()函数可以返回进程的ID.os.getppid()则可以返回父进程的

Python开发基础--- 进程间通信、进程池、协程

进程间通信 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的. 进程队列queue 不同于线程queue,进程queue的生成是用multiprocessing模块生成的. 在生成子进程的时候,会将代码拷贝到子进程中执行一遍,及子进程拥有和主进程内容一样的不同的名称空间. 示例1: 1 import multiprocessing 2 def foo(): 3 q.put([11,'hello',True]