asyncio 并发编程(二)

Future 对象

future 表示还没有完成的工作结果。事件循环可以通过监视一个future 对象的状态来指示它已经完成。future 对象有几个状态:

  • Pending:循环
  • Running:运行
  • Done:完成
  • Cancelled:取消

获取 Future 中的结果

创建future的时候,taskpending,事件循环调用执行的时候是running,调用完毕是done,如果需要停止事件循环,就需要先把task取消,状态为cancel

import asyncio

def callback(future, result):
    print('future 的状态', future)
    print('设置 future 的结果', result)
    future.set_result(result)           # 设置返回值
    print('此时 future 的状态', future)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        all_done = asyncio.Future()     # 创建一个 Future 对象
        loop.call_soon(callback, all_done, 'Future is done!')
        print('进入事件循环')
        result = loop.run_until_complete(all_done)
        print('返回值:', result)
    finally:
        print('关闭事件循环')
        loop.close()
    print('获取future 的返回值', all_done.result())       # 获取返回结果

运行结果:

进入事件循环
future 的状态 <Future pending cb=[_run_until_complete_cb() at C:\Python35\Lib\asyncio\base_events.py:124]>
设置 future 的结果 Future is done!
此时 future 的状态 <Future finished result='Future is done!'>
返回值: Future is done!
关闭事件循环
获取future 的返回值 Future is done!

总结:

all_done = asyncio.Future()     # 创建一个 Future 对象
all_done.result()               # 获取返回结果
all_done.set_result(result)         # 设置返回值

await 关键字获取 Future 中的结果

import asyncio

def callback(future, result):
    print('设置 future 返回值', result)
    future.set_result(result)

async def main(loop):
    all_done = asyncio.Future()
    loop.call_soon(callback, all_done, 'Future is done!')
    # await 获取 future 结果
    result = await all_done
    print('future 中结果', result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print('进入事件循环')
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

运行结果:

进入事件循环
设置 future 返回值 Future is done!
future 中结果 Future is done!

Future 回调

Future 在完成的时候可以执行一些回调函数,回调函数按注册时的顺序进行调用:

def add_done_callback(self, fn):
    """Add a callback to be run when the future becomes done.

    The callback is called with a single argument - the future object. If
    the future is already done when this is called, the callback is
    scheduled with call_soon.
    """
    if self._state != _PENDING:
        self._loop.call_soon(fn, self)
    else:
        self._callbacks.append(fn)

add_done_callback(self, fn),除了 self 外,只接收一个参数,且回调函数 不支持关键字参数,因此需要用到 functools.partial 包装:

import asyncio
import functools

def callback(future, n):
    print('callback 执行, future is done', n, future.result())

async def register_callbacks(all_done):
    print('注册 callback 对 future 对象中')
    all_done.add_done_callback(functools.partial(callback, n=1))
    all_done.add_done_callback(functools.partial(callback, n=2))

async def main(all_done):
    await register_callbacks(all_done)
    print('设置 future 的结果')
    all_done.set_result('Future is done!')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        all_done = asyncio.Future()
        loop.run_until_complete(main(all_done))
    finally:
        loop.close()

运行结果:

注册 callback 对 future 对象中
设置 future 的结果
callback 执行, future is done 1 Future is done!
callback 执行, future is done 2 Future is done!

Task 任务

并发执行任务

任务 Task 是与实践循环交互的主要途径之一,任务可以包装、跟踪协程。同时 Task 也是 Future 的子类,其使用方式与 Future 无异

协程可以等待任务,每个任务都有一个结果,在它完成后可以获取这个结果。

协程是没有状态的,通过 create_task() 可以将协程包装成有状态的任务,也可以在任务运行中取消任务。

import asyncio

async def child():
    print('进入子协程')
    return '子协程返回值'

async def main(loop):
    print('将子协程包装成 Task 任务')
    task = loop.create_task(child())        # 将协程包装成有状态的任务
    print('通过 cancel 取消 Task 任务')
    task.cancel()
    try:
        result = await task         # 使用 await 获取 task 的结果
        print('task 结果', result)
    except asyncio.CancelledError:
        print('取消任务抛出异常')
    else:
        print('获取任务结果', task.result())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

运行结果:

将子协程包装成 Task 任务
通过 cancel 取消 Task 任务
取消任务抛出异常

总结:

task = loop.create_task(child())        # 将协程包装成有状态的任务
result = await task         # 使用 await 获取 task 的结果
task.cancel()               # 取消任务,会报 CancelledError

注释 task.cancel(),即可正常执行任务:

将子协程包装成 Task 任务
进入子协程
task 结果 子协程返回值
获取任务结果 子协程返回值

可以使用 asyncio.ensure_future(coroutine) 建一个 task。在 Python3.7 中可以使用 asyncio.create_task创建任务。

组合协程 -- wait 等待多个协程

一般地通过 await 链式调用的方式可以管理一系列的协程,但是如果要在一个协程钟等待多个协程。如:在一个协程钟等待 1000 个异步网络请求,对于访问次序没有要求时。就可以使用另外的关键字 wait 或 gather 来解决,wait 可以暂停一个协程,直至后台操作完成。

import asyncio

async def num(n):
    try:
        await asyncio.sleep(n * 0.1)
        return n
    except asyncio.CancelledError:
        print("数字 %s 被取消" % n)
        raise

async def main():
    tasks = [num(i) for i in range(10)]
    complete, pending = await asyncio.wait(tasks, timeout=0.5)
    print(complete, pending)
    for i in complete:
        print("当前数字", i.result())
    if pending:
        print("取消未完成的任务")
        for p in pending:
            p.cancel()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

运行结果:

{<Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=2>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=0>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=1>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=3>, <Task finished coro=<num() done, defined at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:4> result=4>} {<Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<num() running at D:/pycharm resource/Projects/TestDeploy/协程/组合协程.py:6> wait_for=<Future finished result=None>>}
当前数字 2
当前数字 0
当前数字 1
当前数字 3
当前数字 4
取消未完成的任务
数字 5 被取消
数字 6 被取消
数字 9 被取消
数字 8 被取消
数字 7 被取消

总结:

  • wait 内部采用 set 报错创建的 Task 实例,因此是无序执行的
  • wait 返回值为一个元组,包含两个集合,分别为已完成和未完成的任务
  • wait 第二个参数为超时时间,超过这个时间,未完成的任务其状态将会变为 pending,可以看到 5 及以上的数字都被取消了,这是因为 sleep 时间大于 wait 超时时间


gather

gater 任务无法取消,返回值且只有一个,另外可以根据参数的传入顺序,顺序输出

import asyncio
from time import ctime

async def num(n):
    try:
        await asyncio.sleep(n * 0.1)
        return n
    except asyncio.CancelledError:
        print("数字 %s 被取消" % n)
        raise

async def main():
    tasks = [num(i) for i in range(10)]
    complete = await asyncio.gather(*tasks)
    for i in complete:
        print("当前数字", i, ctime())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

运行结果:

当前数字 0 Sat Oct 12 11:48:57 2019
当前数字 1 Sat Oct 12 11:48:57 2019
当前数字 2 Sat Oct 12 11:48:57 2019
当前数字 3 Sat Oct 12 11:48:57 2019
当前数字 4 Sat Oct 12 11:48:57 2019
当前数字 5 Sat Oct 12 11:48:57 2019
当前数字 6 Sat Oct 12 11:48:57 2019
当前数字 7 Sat Oct 12 11:48:57 2019
当前数字 8 Sat Oct 12 11:48:57 2019
当前数字 9 Sat Oct 12 11:48:57 2019


gather 阶段性操作

gather 通常被用来阶段性的一个操作,做完第一步才能做第二步:

import asyncio
from time import ctime, time

async def step1(n, start):
    print("第一阶段开始", ctime())
    await asyncio.sleep(n)
    print("第一阶段完成", ctime())
    print("此时用时", time() - start)
    return n

async def step2(n, start):
    print("第二阶段开始", ctime())
    await asyncio.sleep(n)
    print("第二阶段完成", ctime())
    print("此时用时", time() - start)
    return n

async def main():
    now = time()
    result = await asyncio.gather(step1(5, now), step2(2, now))
    for i in result:
        print(i)
    print("总用时", time() - now)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

运行结果:

第二阶段开始 Sat Oct 12 11:54:25 2019
第一阶段开始 Sat Oct 12 11:54:25 2019
第二阶段完成 Sat Oct 12 11:54:27 2019
此时用时 2.000483512878418
第一阶段完成 Sat Oct 12 11:54:30 2019
此时用时 5.001391649246216
5
2
总用时 5.001391649246216

总结:

  • step1step2是并行运行的。
  • gather 会等待最耗时的那个完成之后才返回结果,耗时总时间取决于其中任务最长时间的那个。

任务完成时进行处理

as_complete 是一个生成器,它会管理指定的一个任务列表,并生成它们的结果。与 wait 类似,也是无序输出的,不过在执行其他动作之前没有必要等待所有后台操作完成。

import asyncio
from time import time, ctime

async def foo(n):
    print('Waiting: ', n, ctime())
    await asyncio.sleep(n)
    return n

async def main():
    coroutine1 = foo(1)
    coroutine2 = foo(2)
    coroutine3 = foo(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task 执行结果: {}'.format(result), ctime())

now = lambda: time()
start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print(now() - start)

运行结果:

Waiting:  1 Sat Oct 12 12:00:52 2019
Waiting:  2 Sat Oct 12 12:00:52 2019
Waiting:  4 Sat Oct 12 12:00:52 2019
Task 执行结果: 1 Sat Oct 12 12:00:53 2019
Task 执行结果: 2 Sat Oct 12 12:00:54 2019
Task 执行结果: 4 Sat Oct 12 12:00:56 2019
4.003674030303955

从上面运行结果可以看出来,只要任务完成了就立马返回结果,不等待其他任务

多任务

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:', task.result())

运行结果:

Tasks: [<Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>, <Task pending coro=<request() running at D:/pycharm resource/Projects/TestDeploy/协程/异步 http.py:54>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

动态创建协程

Python 协程只能运行在事件循环中,一旦事件循环运行,又会阻塞当前任务。如果想实现动态添加任务,则只能在另开启一个线程,这个线程的主要任务是用来运行事件循环。

import asyncio
import threading
from time import time, ctime

def thread_running_loop(lp):
    print('loop running', ctime())
    asyncio.set_event_loop(lp)      # 在此线程中设置一个新的事件循环,默认情况事件循环是主协程的
    lp.run_forever()                # 一直运行

async def func(arg):
    print('ready to work arg:', arg, ctime())
    await asyncio.sleep(1)
    print('done', arg, ctime())

if __name__ == '__main__':
    # 创建一个新的事件循环给子线程
    newlp = asyncio.new_event_loop()
    t = threading.Thread(target=thread_running_loop, args=(newlp, ))
    t.start()

    # 添加 5 个协程,并制定事件循环,第二个参数
    for i in range(5):
        asyncio.run_coroutine_threadsafe(func(i), newlp)

    t.join()

运行结果:

loop running Sat Oct 12 18:40:41 2019
ready to work arg: 0 Sat Oct 12 18:40:41 2019
ready to work arg: 1 Sat Oct 12 18:40:41 2019
ready to work arg: 2 Sat Oct 12 18:40:41 2019
ready to work arg: 3 Sat Oct 12 18:40:41 2019
ready to work arg: 4 Sat Oct 12 18:40:41 2019
done 0 Sat Oct 12 18:40:42 2019
done 2 Sat Oct 12 18:40:42 2019
done 4 Sat Oct 12 18:40:42 2019
done 1 Sat Oct 12 18:40:42 2019
done 3 Sat Oct 12 18:40:42 2019

另一种写法

import asyncio
from threading import Thread

async def production_task():
    i = 0
    while True:
        # 将consumption这个协程每秒注册一个到运行在线程中的循环,thread_loop每秒会获得一个一直打印i的无限循环任务
        asyncio.run_coroutine_threadsafe(consumption(i),
                                         thread_loop)  # 注意:run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用
        await asyncio.sleep(1)  # 必须加await
        i += 1

async def consumption(i):
    while True:
        print("我是第{}任务".format(i))
        await asyncio.sleep(1)

def start_loop(loop):
    # 运行事件循环, loop以参数的形式传递进来运行
    asyncio.set_event_loop(loop)
    loop.run_forever()

thread_loop = asyncio.new_event_loop()  # 获取一个事件循环
run_loop_thread = Thread(target=start_loop, args=(thread_loop,))  # 将次事件循环运行在一个线程中,防止阻塞当前主线程
run_loop_thread.start()  # 运行线程,同时协程事件循环也会运行

advocate_loop = asyncio.get_event_loop()  # 将生产任务的协程注册到这个循环中
advocate_loop.run_until_complete(production_task())  # 运行次循环

控制并发量

asyncio 有多重同步机制,如: Semaphore、Lock(锁)、Condition(条件)、Event(事件)、Queue ,通过 Semaphore 可以来同步控制有效并发量,从而减轻服务器压力:

#coding:utf-8
import time,asyncio

a=time.time()

id=1
async def hello(id,semaphore):
    async with semaphore:
        await asyncio.sleep(1)
        print('working id:'+str(id))

async def run():
    semaphore = asyncio.Semaphore(5) # 限制并发量为5
    to_get = [hello(id,semaphore) for id in range(20)] #总共20任务
    await asyncio.wait(to_get)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()
    print(time.time()-a)

Tips:Python 提供的测试网站:http://httpbin.org/get?a={}

如何把同步的代码改成异步的

同步

import asyncio

def handle(name):
    resp1 = func1(name)
    resp2 = func2(name)
    resp3 = func3(resp1, resp2)

    func4(resp3)
    func5(name)

def func1(name):
    print('func1 正在运行,你好 {}'.format(name))

    return 'func1'

def func2(name):
    print('func2 正在运行,你好 {}'.format(name))

    return 'fun2'

def func3(arg1, arg2):
    print('func3 正在运行,参数:{}、{}'.format(arg1, arg2))

    return 'func3'

def func4(arg):
    print('func4 正在运行,参数 {}'.format(arg))

    return 'func4'

def func5(name):
    print('func5 正在运行,你好 {}'.format(name))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = handle('rose')

    loop.run_until_complete(coro)

以上代码中各函数从上而下以此执行,整个流程是同步的。

异步

将函数改成异步,几个步骤:

  • 在函数前面加上 async,将函数变为异步函数
  • 使用 ensure_future 将调用函数变为 future
  • await 关键字获取其结果
import asyncio

async def handle(name):
    resp1 = asyncio.ensure_future(func1(name))
    resp2 = asyncio.ensure_future(func2(name))

    result1, result2 = await asyncio.gather(resp1, resp2)
    print('--->', result1, result2)
    resp3 = await func3(result1, result2)
    await func4(resp3)

    # 协程中调用普通函数
    loop.call_soon(func5, name)

async def func1(name):
    print('func1 正在运行,你好 {}'.format(name))

    return 'func1'

async def func2(name):
    print('func2 正在运行,你好 {}'.format(name))

    return 'fun2'

async def func3(arg1, arg2):
    print('func3 正在运行,参数:{}、{}'.format(arg1, arg2))

    return 'func3'

async def func4(arg):
    print('func4 正在运行,参数 {}'.format(arg))

    return 'func4'

def func5(name):
    print('func5 正在运行,你好 {}'.format(name))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = handle('rose')

    loop.run_until_complete(coro)

原文地址:https://www.cnblogs.com/midworld/p/12332325.html

时间: 2024-10-13 00:29:47

asyncio 并发编程(二)的相关文章

asyncio并发编程

目录 asyncio并发编程 事件循环 基本使用 获取协程的返回值 回调 wait和gather task取消和子协程调用原理 task取消 子协程 其他方法 call_soon call_later call_at call_soon_threadsafe ThreadPollExecutor 和 asyncio完成阻塞io请求 asyncio发送http请求 asyncio同步和通信 同步 通信 asyncio并发编程 asyncio是Python3.4引入的一个用于异步IO的库,其主要功能

【Java并发编程二】同步容器和并发容器

一.同步容器 在Java中,同步容器包括两个部分,一个是vector和HashTable,查看vector.HashTable的实现代码,可以看到这些容器实现线程安全的方式就是将它们的状态封装起来,并在需要同步的方法上加上关键字synchornized. 另一个是Collections类中提供的静态工厂方法创建的同步包装类. 同步容器都是线程安全的.但是对于复合操作(迭代.缺少即加入.导航:根据一定的顺序寻找下一个元素),有时可能需要使用额外的客户端加锁进行保护.在一个同步容器中,复合操作是安全

Java 并发编程(二):如何保证共享变量的原子性?

线程安全性是我们在进行 Java 并发编程的时候必须要先考虑清楚的一个问题.这个类在单线程环境下是没有问题的,那么我们就能确保它在多线程并发的情况下表现出正确的行为吗? 我这个人,在没有副业之前,一心扑在工作上面,所以处理的蛮得心应手,心态也一直保持的不错:但有了副业之后,心态就变得像坐过山车一样.副业收入超过主业的时候,人特别亢奋,像打了鸡血一样:副业迟迟打不开局面的时候,人就变得惶惶不可终日. 仿佛我就只能是个单线程,副业和主业并行开启多线程模式的时候,我就变得特别没有安全感,尽管整体的收入

并发编程(二)

五.线程的概述 5.1 什么是线程 在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程 线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程就是进程 车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线 流水线的工作需要电源,电源就相当于cpu 所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上执行的单位. 多线程(即多个控制线程)的概念是,在一个进程中存在朵儿控制线程,多个控制线程共

并发编程二

一.守护进程 主进程创建守护进程,守护进程的主要的特征为:①守护进程会在主进程代码执行结束时立即终止:②守护进程内无法继续再开子进程,否则会抛出异常. 实例: from multiprocessing import Process import time def foo(): print('starting123') time.sleep(1) print('endig123') def bar(): print('starting456') time.sleep(3) print('endin

gj13 asyncio并发编程

13.1 事件循环 asyncio 包含各种特定系统实现的模块化事件循环 传输和协议抽象 对TCP.UDP.SSL.子进程.延时调用以及其他的具体支持 模仿futures模块但适用于事件循环使用的Future类 基于 yield from 的协议和任务,可以让你用顺序的方式编写并发代码 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池 模仿threading模块中的同步原语.可以用在单线程内的协程之间 import asyncio import time # 不再这使用同步阻

漫谈并发编程(二):java线程的创建与基本控制

java线程的创建 定义任务 在java中使用任务这个名词来表示一个线程控制流的代码段,用Runnable接口来标记一个任务,该接口的run方法为线程执行的代码段. public class LiftOff implements Runnable { protected int countDown = 10; private static int taskCount = 0; private final int id = taskCount++; public void run() { whil

python并发编程(二):协程

'''协程: 1. 协程的定义: 1) 是一种用户态的轻量级线程, 即协程是由用户程序自己控制调度的 2) 是一种协作而非抢占式的处理并发方式, A --> B ---> A --> C 3) 协程的切换属于程序级别的, 操作系统不需要切换 2. 协程的特点: 1) 协程本身是一个线程, 是用户态的切换 2) 相比线程优点: 1> 切换没有消耗 2> 修改共享程序不需要加锁 3) 相比线程缺点: 一旦引入协程,就需要检测单线程下所有的IO行为, 实现遇到IO就切换,少一个都不

《Java并发编程实战》第三章 对象的共享 读书笔记

一.可见性 什么是可见性? Java线程安全须要防止某个线程正在使用对象状态而还有一个线程在同一时候改动该状态,并且须要确保当一个线程改动了对象的状态后,其它线程能够看到发生的状态变化. 后者就是可见性的描写叙述即多线程能够实时获取其它线程改动后的状态. *** 待补充   两个工人同一时候记录生产产品总数问题 1. 失效数据 可见性出现故障就是其它线程没有获取到改动后的状态,更直观的描写叙述就是其它线程获取到的数据是失效数据. 2. 非原子64位操作 3. 加锁与可见性 比如在一个变量的读取与