python concurrent.futures

python因为其全局解释器锁GIL而无法通过线程实现真正的平行计算。这个论断我们不展开,但是有个概念我们要说明,IO密集型 vs. 计算密集型。

IO密集型:读取文件,读取网络套接字频繁。

计算密集型:大量消耗CPU的数学与逻辑运算,也就是我们这里说的平行计算。

而concurrent.futures模块,可以利用multiprocessing实现真正的平行计算。

核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。

 第一章 concurrent.futures性能阐述

  • 最大公约数

这个函数是一个计算密集型的函数。

# -*- coding:utf-8 -*-
# 求最大公约数
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [
    (1963309, 2265973), (1879675, 2493670), (2030677, 3814172),
    (1551645, 2229620), (1988912, 4736670), (2198964, 7876293)
]

  • 不使用多线程/多进程

import time

start = time.time()
results = list(map(gcd, numbers))
end = time.time()
print ‘Took %.3f seconds.‘ % (end - start)

Took 2.507 seconds.

消耗时间是:2.507。

  • 多线程ThreadPoolExecutor

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor

start = time.time()
pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print ‘Took %.3f seconds.‘ % (end - start)

Took 2.840 seconds.

消耗时间是:2.840。

上面说过gcd是一个计算密集型函数,因为GIL的原因,多线程是无法提升效率的。同时,线程启动的时候,有一定的开销,与线程池进行通信,也会有开销,所以这个程序使用了多线程反而更慢了。

  • 多进程ProcessPoolExecutor

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor

start = time.time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print ‘Took %.3f seconds.‘ % (end - start)

Took 1.861 seconds.

消耗时间:1.861。

在两个CPU核心的机器上运行多进程程序,比其他两个版本都快。这是因为,ProcessPoolExecutor类会利用multiprocessing模块所提供的底层机制,完成下列操作:

1)把numbers列表中的每一项输入数据都传给map。

2)用pickle模块对数据进行序列化,将其变成二进制形式。

3)通过本地套接字,将序列化之后的数据从煮解释器所在的进程,发送到子解释器所在的进程。

4)在子进程中,用pickle对二进制数据进行反序列化,将其还原成python对象。

5)引入包含gcd函数的python模块。

6)各个子进程并行的对各自的输入数据进行计算。

7)对运行的结果进行序列化操作,将其转变成字节。

8)将这些字节通过socket复制到主进程之中。

9)主进程对这些字节执行反序列化操作,将其还原成python对象。

10)最后,把每个子进程所求出的计算结果合并到一份列表之中,并返回给调用者。

multiprocessing开销比较大,原因就在于:主进程和子进程之间通信,必须进行序列化和反序列化的操作。

第二章 concurrent.futures源码分析

  • Executor

可以任务Executor是一个抽象类,提供了如下抽象方法submit,map(上面已经使用过),shutdown。值得一提的是Executor实现了__enter__和__exit__使得其对象可以使用with操作符。关于上下文管理和with操作符详细请参看这篇博客http://www.cnblogs.com/kangoroo/p/7627167.html

ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
        """Returns a iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get(‘timeout‘)
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False

下面我们以线程ProcessPoolExecutor的方式说明其中的各个方法。

  • map
map(self, fn, *iterables, **kwargs)

map方法的实例我们上面已经实现过,值得注意的是,返回的results列表是有序的,顺序和*iterables迭代器的顺序一致。

这里我们使用with操作符,使得当任务执行完成之后,自动执行shutdown函数,而无需编写相关释放代码。

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor

start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(gcd, numbers))
print ‘results: %s‘ % results
end = time.time()
print ‘Took %.3f seconds.‘ % (end - start)

产出结果是:

results: [1, 5, 1, 5, 2, 3]
Took 1.617 seconds.
  • submit
submit(self, fn, *args, **kwargs)

submit方法用于提交一个可并行的方法,submit方法同时返回一个future实例。

future对象标识这个线程/进程异步进行,并在未来的某个时间执行完成。future实例表示线程/进程状态的回调。

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor

start = time.time()
futures = list()
with ProcessPoolExecutor(max_workers=2) as pool:
    for pair in numbers:
        future = pool.submit(gcd, pair)
        futures.append(future)
print ‘results: %s‘ % [future.result() for future in futures]
end = time.time()
print ‘Took %.3f seconds.‘ % (end - start)

产出结果是:

results: [1, 5, 1, 5, 2, 3]
Took 2.289 seconds.
  • future

submit函数返回future对象,future提供了跟踪任务执行状态的方法。比如判断任务是否执行中future.running(),判断任务是否执行完成future.done()等等。

as_completed方法传入futures迭代器和timeout两个参数

默认timeout=None,阻塞等待任务执行完成,并返回执行完成的future对象迭代器,迭代器是通过yield实现的。

timeout>0,等待timeout时间,如果timeout时间到仍有任务未能完成,不再执行并抛出异常TimeoutError

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed

start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [ pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
        print ‘执行中:%s, 已完成:%s‘ % (future.running(), future.done())
    print ‘#### 分界线 ####‘
    for future in as_completed(futures, timeout=2):
        print ‘执行中:%s, 已完成:%s‘ % (future.running(), future.done())
end = time.time()
print ‘Took %.3f seconds.‘ % (end - start)

  • wait

wait方法接会返回一个tuple(元组),tuple中包含两个set(集合),一个是completed(已完成的)另外一个是uncompleted(未完成的)。

使用wait方法的一个优势就是获得更大的自由度,它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION和ALL_COMPLETE,默认设置为ALL_COMPLETED。

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION

start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [ pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
        print ‘执行中:%s, 已完成:%s‘ % (future.running(), future.done())
    print ‘#### 分界线 ####‘
    done, unfinished = wait(futures, timeout=2, return_when=ALL_COMPLETED)
    for d in done:
        print ‘执行中:%s, 已完成:%s‘ % (d.running(), d.done())
        print d.result()
end = time.time()
print ‘Took %.3f seconds.‘ % (end - start)

由于设置了ALL_COMPLETED,所以wait等待所有的task执行完成,可以看到6个任务都执行完成了。

执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:False, 已完成:False
执行中:False, 已完成:False
#### 分界线 ####
执行中:False, 已完成:True
执行中:False, 已完成:True
执行中:False, 已完成:True
执行中:False, 已完成:True
执行中:False, 已完成:True
执行中:False, 已完成:True
Took 1.518 seconds.

如果我们将配置改为FIRST_COMPLETED,wait会等待直到第一个任务执行完成,返回当时所有执行成功的任务。这里并没有做并发控制。

重跑,结构如下,可以看到执行了2个任务。

执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:False, 已完成:False
执行中:False, 已完成:False
#### 分界线 ####
执行中:False, 已完成:True
执行中:False, 已完成:True
Took 1.517 seconds.

原文地址:https://www.cnblogs.com/ExMan/p/10138721.html

时间: 2024-11-08 10:50:58

python concurrent.futures的相关文章

python | concurrent.futures模块提升数据处理速度

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px ".PingFang SC"; color: #454545 } p.p2 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px "Helvetica Neue"; color: #454545; min-height: 14.0px } span.s1 { font: 12.0px "Helvetica Ne

Python并发编程之线程池/进程池--concurrent.futures模块

h2 { color: #fff; background-color: #f7af0d; padding: 3px; margin: 10px 0px } 一.关于concurrent.futures模块 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间.但从Python3.2开始,标准库为我们提供了conc

python简单粗暴多线程之concurrent.futures

python在前面写过多线程的库threading: python3多线程趣味详解 但是今天发现一个封装得更加简单暴力的多线程库concurrent.futures: # !/usr/bin/python3.4 # -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor import time def f1(a): time.sleep(2) print(a) return 1 pool=ThreadPool

python全栈开发基础【第二十六篇】(concurrent.futures模块、协程、Greenlet、Gevent)

注意 1.不能无限的开进程,不能无限的开线程最常用的就是开进程池,开线程池.其中回调函数非常重要回调函数其实可以作为一种编程思想,谁好了谁就去掉 2.只要你用并发,就会有锁的问题,但是你不能一直去自己加锁吧那么我们就用QUEUE,这样还解决了自动加锁的问题由Queue延伸出的一个点也非常重要的概念.以后写程序也会用到这个思想.就是生产者与消费者问题 一.Python标准模块--concurrent.futures(并发未来) concurent.future模块需要了解的 1.concurent

python并发模块之concurrent.futures(一)

Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持,他属于上层的封装,对于用户来说,不用在考虑那么多东西了. 官方参考资料:https://pythonhosted.org/futures/ 1.Executor Exectuor是基础模块,这是一个抽象类,其子类分

python并发编程之进程池,线程池concurrent.futures

进程池与线程池 在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多, 这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,于是我们必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途, 例如进程池,就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制 Python--concurrent.fu

python并发之concurrent.futures

concurrent:并发 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码.从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持. concurrent.futures基础模块是executor和f

python异步并发模块concurrent.futures入门详解

concurrent.futures模块详解 Executor对象 class concurrent.futures.Executor Executor是一个抽象类,它提供了异步执行调用的方法.它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor或者ProcessPoolExecutor进行调用. 2.1.1 Executor.submit(fn, *args, **kwargs) fn:需要异步执行的函数 *args, **kwargs:fn参数 2.1.2 Execut

简单实现并发:python concurrent模块

可以使用python 3中的concurrent模块,如果python环境是2.7的话,需要下载https://pypi.python.org/packages/source/f/futures/futures-2.1.6.tar.gz#md5=cfab9ac3cd55d6c7ddd0546a9f22f453 此futures包即可食用concurrent模块. 官方文档:http://pythonhosted.org//futures/ 对于python来说,作为解释型语言,Python的解释