Python3标准库:concurrent.futures管理并发任务池

1. concurrent.futures管理并发任务池

concurrent.futures模块提供了使用工作线程或进程池运行任务的接口。线程和进程池的API是一样的,所以应用只做最小的修改就可以在线程和进程之间顺利地切换。

这个模块提供了两种类型的类与这些池交互。执行器(executor)用来管理工作线程或进程池,future用来管理工作线程或进程计算的结果。要使用一个工作线程或进程池,应用要创建适当的执行器类的一个实例,然后向它提交任务来运行。每个任务启动时,会返回一个Future实例。需要任务的结果时,应用可以使用Future阻塞,直到得到结果。目前已经提供了不同的API,可以很方便地等待任务完成,所以不需要直接管理Future对象。

1.1 利用基本线程池使用map()

ThreadPooLExecutor管理一组工作线程,当这些线程可用于完成更多工作时,可以向它们传入任务。下面的例子使用map()并发地从一个输入迭代器生成一组结果。这个任务使用time.sleep()暂停不同的时间,从而展示不论任务的执行顺序如何,map()总是根据输入按顺序返回值。

from concurrent import futures
import threading
import time

def task(n):
    print(‘{}: sleeping {}‘.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print(‘{}: done with {}‘.format(
        threading.current_thread().name,
        n)
    )
    return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print(‘main: starting‘)
results = ex.map(task, range(5, 0, -1))
print(‘main: unprocessed results {}‘.format(results))
print(‘main: waiting for real results‘)
real_results = list(results)
print(‘main: results: {}‘.format(real_results))

map()的返回值实际上是一种特殊类型的迭代器,它知道主程序迭代处理时要等待各个响应。

1.2 调度单个任务

除了使用map(),还可以借助submit()利用一个执行器调度单个任务。然后可以使用返回的Future实例等待这个任务的结果。

from concurrent import futures
import threading
import time

def task(n):
    print(‘{}: sleeping {}‘.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print(‘{}: done with {}‘.format(
        threading.current_thread().name,
        n)
    )
    return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print(‘main: starting‘)
f = ex.submit(task, 5)
print(‘main: future: {}‘.format(f))
print(‘main: waiting for results‘)
result = f.result()
print(‘main: result: {}‘.format(result))
print(‘main: future after result: {}‘.format(f))

任务完成之后,Future的状态会改变,并得到结果。

1.3 按任意顺序等待任务

调用Future的result()方法会阻塞,直到任务完成(可能返回一个值,也可能抛出一个异常)或者撤销。可以使用map()按调度任务的顺序访问多个任务的结果。如果处理结果的顺序不重要,则可以使用as_completed()在每个任务完成时处理它的结果。

from concurrent import futures
import random
import time

def task(n):
    time.sleep(random.random())
    return (n, n / 10)

ex = futures.ThreadPoolExecutor(max_workers=5)
print(‘main: starting‘)

wait_for = [
    ex.submit(task, i)
    for i in range(5, 0, -1)
]

for f in futures.as_completed(wait_for):
    print(‘main: result: {}‘.format(f.result()))

因为池中的工作线程与任务同样多,故而所有任务都可以启动。它们会按随机的顺序完成,所以每次运行这个示例程序时as_completed()生成的值都不同。

1.4 回调

要在任务完成时采取某个动作,不用显式地等待结果,可以使用add_done_callback()指示Future完成时要调用一个新函数。这个回调应当是有一个参数(Future实例)的callable函数。

from concurrent import futures
import time

def task(n):
    print(‘{}: sleeping‘.format(n))
    time.sleep(0.5)
    print(‘{}: done‘.format(n))
    return n / 10

def done(fn):
    if fn.cancelled():
        print(‘{}: canceled‘.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print(‘{}: error returned: {}‘.format(
                fn.arg, error))
        else:
            result = fn.result()
            print(‘{}: value returned: {}‘.format(
                fn.arg, result))

if __name__ == ‘__main__‘:
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print(‘main: starting‘)
    f = ex.submit(task, 5)
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()

不论由于什么原因,只要认为Future“完成”,就会调用这个回调,所以在使用它之前必须检查传入回调的对象的状态。

1.5 撤销任务

如果一个Future已经提交但还没启动,那么可以调用它的cancel()方法将其撤销。

from concurrent import futures
import time

def task(n):
    print(‘{}: sleeping‘.format(n))
    time.sleep(0.5)
    print(‘{}: done‘.format(n))
    return n / 10

def done(fn):
    if fn.cancelled():
        print(‘{}: canceled‘.format(fn.arg))
    elif fn.done():
        print(‘{}: not canceled‘.format(fn.arg))

if __name__ == ‘__main__‘:
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print(‘main: starting‘)
    tasks = []

    for i in range(10, 0, -1):
        print(‘main: submitting {}‘.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, t in reversed(tasks):
        if not t.cancel():
            print(‘main: did not cancel {}‘.format(i))

    ex.shutdown()

cancel()返回一个布尔值,指示任务是否可用撤销。

1.6 任务中的异常

如果一个任务产生一个未处理的异常,那么它会被保存到这个任务的Future,而且可以通过result()或exception()方法得到。

from concurrent import futures

def task(n):
    print(‘{}: starting‘.format(n))
    raise ValueError(‘the value {} is no good‘.format(n))

ex = futures.ThreadPoolExecutor(max_workers=2)
print(‘main: starting‘)
f = ex.submit(task, 5)

error = f.exception()
print(‘main: error: {}‘.format(error))

try:
    result = f.result()
except ValueError as e:
    print(‘main: saw error "{}" when accessing result‘.format(e))

如果在一个任务函数中抛出一个未处理的异常后调用了result(),那么会在当前上下文中再次抛出同样的异常。

1.7 上下文管理器

执行器会与上下文管理器合作,并发的运行任务并等待它们都完成。当上下文管理器退出时,会调用执行器的shutdown()方法。

from concurrent import futures

def task(n):
    print(n)

with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print(‘main: starting‘)
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

print(‘main: done‘)

离开当前作用域时如果要清理线程或进程资源,那么用这种方式使用执行器就很有用。

原文地址:https://www.cnblogs.com/liuhui0308/p/12602053.html

时间: 2024-10-14 23:44:02

Python3标准库:concurrent.futures管理并发任务池的相关文章

Python3标准库:threading进程中管理并发操作

1. threading进程中管理并发操作 threading模块提供了管理多个线程执行的API,允许程序在同一个进程空间并发的运行多个操作. 1.1 Thread对象 要使用Thread,最简单的方法就是用一个目标函数实例化一个Thread对象,并调用start()让它开始工作. import threading def worker(): """thread worker function""" print('Worker') threads

Python3标准库

文本 1. string:通用字符串操作 2. re:正则表达式操作 3. difflib:差异计算工具 4. textwrap:文本填充 5. unicodedata:Unicode字符数据库 6. stringprep:互联网字符串准备工具 7. readline:GNU按行读取接口 8. rlcompleter:GNU按行读取的实现函数 二进制数据 9. struct:将字节解析为打包的二进制数据 10. codecs:注册表与基类的编解码器 数据类型 11. datetime:基于日期与

使用concurrent.futures模块并发,实现进程池、线程池

一.关于concurrent.futures模块 Python标准库为我们提供了threading和multiprocessing模块编写相应的异步多线程/多进程代码.从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码.实现了对thread

4.Python3标准库--算法

(一)functools:管理函数的工具 import functools ''' functools模块提供了一些工具来管理或扩展和其他callable对象,从而不必完全重写 ''' 1.修饰符 from functools import partial ''' functools模块提供的主要工具就是partial类,可以用来包装一个有默认参数的callable对象. 得到的对象本身就是callable,可以把它看作是原来的参数. ''' # 举个栗子 def foo(name, age,

Python3标准库:weakref对象的非永久引用

1. weakref对象的非永久引用 weakref模块支持对象的弱引用.正常的引用会增加对象的引用数,并避免它被垃圾回收.但结果并不总是如期望中的那样,比如有时可能会出现一个循环引用,或者有时需要内存时可能要删除对象的缓存.弱引用(weak reference)是一个不能避免对象被自动清理的对象句柄. 1.1 引用 对象的弱引用要通过ref类来管理.要获取原对象,可以调用引用对象. import weakref class ExpensiveObject: def __del__(self):

Python3标准库:random伪随机数生成器

1. random伪随机数生成器 random模块基于Mersenne Twister算法提供了一个快速伪随机数生成器.原来开发这个生成器是为了向蒙特卡洛模拟生成输入,Mersenne Twister算法会生成大周期近均匀分布的数,因此适用于大量不同类型的应用. 1.1 生成随机数 random()函数从所生成的序列返回下一个随机的浮点值.返回的所有值都落在0<=n<1.0区间内. import random for i in range(5): print('%04.3f' % random

Python3标准库:shelve对象的持久存储

1. shelve对象的持久存储 不需要关系数据库时,可以用shelve模块作为持久存储Python对象的一个简单的选择.类似于字典,shelf按键访问.值将被pickled并写至由dbm创建和管理的数据库. 1.1 创建一个新shelf 使用shelve最简单的方法就是利用DbfilenameShelf类.它使用dbm存储数据.这个类可以直接使用,也可以通过调用shelve.open()来使用. import shelve with shelve.open('test_shelf.db') a

Python3标准库:urllib.parse分解URL

1. urllib.parse分解URL urllib.parse模块提供了一些函数,可以管理URL及其组成部分,这包括将URL分解为组成部分以及由组成部分构成URL. 1.1 解析 urlparse()函数的返回值是一个ParseResult对象,其相当于一个包含6个元素的tuple. from urllib.parse import urlparse url = 'http://netloc/path;param?query=arg#frag' parsed = urlparse(url)

Python3标准库:copy复制对象

1. copy复制对象 copy模块包括两个函数copy()和deepcopy(),用于复制现有的对象. 1.1 浅副本 copy()创建的浅副本(shallow copy)是一个新容器,其中填充了原对象内容的引用.建立list对象的一个浅副本时,会构造一个新的list,并将原对象的元素追加到这个list. import copy import functools @functools.total_ordering class MyClass: def __init__(self, name):