Python并发编程之学习异步IO框架:asyncio 中篇(十)

大家好,并发编程 进入第十章。
好了,今天的内容其实还挺多的,我准备了三天,到今天才整理完毕。希望大家看完,有所收获的,能给小明一个赞。这就是对小明最大的鼓励了。
为了更好地衔接这一节,我们先来回顾一下上一节的内容。

上一节「」,我们首先介绍了,如何创建一个协程对象.
主要有两种方法

  • 通过async关键字,
  • 通过@asyncio.coroutine 装饰函数。

然后有了协程对象,就需要一个事件循环容器来运行我们的协程。其主要的步骤有如下几点:

  • 将协程对象转为task任务对象
  • 定义一个事件循环对象容器用来存放task
  • 将task任务扔进事件循环对象中并触发

为了让大家,对生成器和协程有一个更加清晰的认识,我还介绍了yieldasync/await的区别。

最后,我们还讲了,如何给一个协程添加回调函数。

好了,用个形象的比喻,上一节,其实就只是讲了协程中的单任务。哈哈,是不是还挺难的?希望大家一定要多看几遍,多敲代码,不要光看哦。

那么这一节,我们就来看下,协程中的多任务

. 本文目录

  • 协程中的并发
  • 协程中的嵌套
  • 协程中的状态
  • gather与wait

. 协程中的并发

协程的并发,和线程一样。举个例子来说,就好像 一个人同时吃三个馒头,咬了第一个馒头一口,就得等这口咽下去,才能去啃第其他两个馒头。就这样交替换着吃。

asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。

第一步,当然是创建多个协程的列表。

# 协程函数async def do_some_work(x):    print(‘Waiting: ‘, x)    await asyncio.sleep(x)    return ‘Done after {}s‘.format(x)

# 协程对象coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)

# 将协程转成task,并组成listtasks = [    asyncio.ensure_future(coroutine1),    asyncio.ensure_future(coroutine2),    asyncio.ensure_future(coroutine3)]

第二步,如何将这些协程注册到事件循环中呢。

有两种方法,至于这两种方法什么区别,稍后会介绍。

  • 使用asyncio.wait()
loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))
  • 使用asyncio.gather()
# 千万注意,这里的 「*」 不能省略loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.gather(*tasks))

最后,return的结果,可以用task.result()查看。

for task in tasks:    print(‘Task ret: ‘, task.result())

完整代码如下

import asyncio

# 协程函数async def do_some_work(x):    print(‘Waiting: ‘, x)    await asyncio.sleep(x)    return ‘Done after {}s‘.format(x)

# 协程对象coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)

# 将协程转成task,并组成listtasks = [    asyncio.ensure_future(coroutine1),    asyncio.ensure_future(coroutine2),    asyncio.ensure_future(coroutine3)]

loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:    print(‘Task ret: ‘, task.result())

输出结果

Waiting:  1Waiting:  2Waiting:  4Task ret:  Done after 1sTask ret:  Done after 2sTask ret:  Done after 4s

. 协程中的嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

来看个例子。

import asyncio

# 用于内部的协程函数async def do_some_work(x):    print(‘Waiting: ‘, x)    await asyncio.sleep(x)    return ‘Done after {}s‘.format(x)

# 外部的协程函数async def main():    # 创建三个协程对象    coroutine1 = do_some_work(1)    coroutine2 = do_some_work(2)    coroutine3 = do_some_work(4)

# 将协程转为task,并组成list    tasks = [        asyncio.ensure_future(coroutine1),        asyncio.ensure_future(coroutine2),        asyncio.ensure_future(coroutine3)    ]

# 【重点】:await 一个task列表(协程)    # dones:表示已经完成的任务    # pendings:表示未完成的任务    dones, pendings = await asyncio.wait(tasks)

for task in dones:        print(‘Task ret: ‘, task.result())

loop = asyncio.get_event_loop()loop.run_until_complete(main())

如果这边,使用的是asyncio.gather(),是这么用的

# 注意这边返回结果,与await不一样

results = await asyncio.gather(*tasks)for result in results:    print(‘Task ret: ‘, result)

输出还是一样的。

Waiting:  1Waiting:  2Waiting:  4Task ret:  Done after 1sTask ret:  Done after 2sTask ret:  Done after 4s

仔细查看,可以发现这个例子完全是由 上面「协程中的并发」例子改编而来。结果完全一样。只是把创建协程对象,转换task任务,封装成在一个协程函数里而已。外部的协程,嵌套了一个内部的协程。

其实你如果去看下asyncio.await()的源码的话,你会发现下面这种写法

loop.run_until_complete(asyncio.wait(tasks))

看似没有嵌套,实际上内部也是嵌套的。

这里也把源码,贴出来,有兴趣可以看下,没兴趣,可以直接跳过。

# 内部协程函数async def _wait(fs, timeout, return_when, loop):    assert fs, ‘Set of Futures is empty.‘    waiter = loop.create_future()    timeout_handle = None    if timeout is not None:        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)    counter = len(fs)

def _on_completion(f):        nonlocal counter        counter -= 1        if (counter <= 0 or            return_when == FIRST_COMPLETED or            return_when == FIRST_EXCEPTION and (not f.cancelled() and                                                f.exception() is not None)):            if timeout_handle is not None:                timeout_handle.cancel()            if not waiter.done():                waiter.set_result(None)

for f in fs:        f.add_done_callback(_on_completion)

try:        await waiter    finally:        if timeout_handle is not None:            timeout_handle.cancel()

done, pending = set(), set()    for f in fs:        f.remove_done_callback(_on_completion)        if f.done():            done.add(f)        else:            pending.add(f)    return done, pending

# 外部协程函数async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):    if futures.isfuture(fs) or coroutines.iscoroutine(fs):        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")    if not fs:        raise ValueError(‘Set of coroutines/Futures is empty.‘)    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):        raise ValueError(f‘Invalid return_when value: {return_when}‘)

if loop is None:        loop = events.get_event_loop()

fs = {ensure_future(f, loop=loop) for f in set(fs)}    # 【重点】:await一个内部协程    return await _wait(fs, timeout, return_when, loop)

. 协程中的状态

还记得我们在讲生成器的时候,有提及过生成器的状态。同样,在协程这里,我们也了解一下协程(准确的说,应该是Future对象,或者Task任务)有哪些状态。

Pending:创建future,还未执行
Running:事件循环正在调用执行任务
Done:任务执行完毕
Cancelled:Task被取消后的状态

可手工 python3 xx.py 执行这段代码,

import asyncioimport threadingimport time

async def hello():    print("Running in the loop...")    flag = 0    while flag < 1000:        with open("F:\\test.txt", "a") as f:            f.write("------")        flag += 1    print("Stop the loop")

if __name__ == ‘__main__‘:    coroutine = hello()    loop = asyncio.get_event_loop()    task = loop.create_task(coroutine)

# Pending:未执行状态    print(task)    try:        t1 = threading.Thread(target=loop.run_until_complete, args=(task,))        # t1.daemon = True        t1.start()

# Running:运行中状态        time.sleep(1)        print(task)        t1.join()    except KeyboardInterrupt as e:        # 取消任务        task.cancel()        # Cacelled:取消任务        print(task)    finally:        print(task)

顺利执行的话,将会打印 Pending -> Pending:Runing -> Finished 的状态变化

假如,执行后 立马按下 Ctrl+C,则会触发task取消,就会打印 Pending -> Cancelling -> Cancelling 的状态变化。

. gather与wait

还记得上面我说,把多个协程注册进一个事件循环中有两种方法吗?

  • 使用asyncio.wait()
loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))
  • 使用asyncio.gather()
# 千万注意,这里的 「*」 不能省略loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.gather(*tasks))

asyncio.gatherasyncio.wait 在asyncio中用得的比较广泛,这里有必要好好研究下这两货。

还是照例用例子来说明,先定义一个协程函数

import asyncio

async def factorial(name, number):    f = 1    for i in range(2, number+1):        print("Task %s: Compute factorial(%s)..." % (name, i))        await asyncio.sleep(1)        f *= i    print("Task %s: factorial(%s) = %s" % (name, number, f))

接收参数方式

asyncio.wait

接收的tasks,必须是一个list对象,这个list对象里,存放多个的task。

它可以这样,用asyncio.ensure_future转为task对象

tasks=[       asyncio.ensure_future(factorial("A", 2)),       asyncio.ensure_future(factorial("B", 3)),       asyncio.ensure_future(factorial("C", 4))]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

也可以这样,不转为task对象。

loop = asyncio.get_event_loop()

tasks=[       factorial("A", 2),       factorial("B", 3),       factorial("C", 4)]

loop.run_until_complete(asyncio.wait(tasks))

asyncio.gather

接收的就比较广泛了,他可以接收list对象,但是 * 不能省略

tasks=[       asyncio.ensure_future(factorial("A", 2)),       asyncio.ensure_future(factorial("B", 3)),       asyncio.ensure_future(factorial("C", 4))]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

还可以这样,和上面的 * 作用一致,这是因为asyncio.gather()的第一个参数是 *coros_or_futures,它叫 非命名键值可变长参数列表,可以集合所有没有命名的变量。

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(    factorial("A", 2),    factorial("B", 3),    factorial("C", 4),))

甚至还可以这样

loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)])group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])

loop.run_until_complete(asyncio.gather(group1, group2, group3))

返回结果不同

asyncio.wait

asyncio.wait 返回donespendings

  • dones:表示已经完成的任务
  • pendings:表示未完成的任务

如果我们需要获取,运行结果,需要手工去收集获取。

dones, pendings = await asyncio.wait(tasks)

for task in dones:    print(‘Task ret: ‘, task.result())

asyncio.gather

asyncio.gather 它会把值直接返回给我们,不需要手工去收集。

results = await asyncio.gather(*tasks)

for result in results:    print(‘Task ret: ‘, result)

wait有控制功能

import asyncioimport random

async def coro(tag):    await asyncio.sleep(random.uniform(0.5, 5))

loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]

# 【控制运行任务数】:运行第一个任务就返回# FIRST_COMPLETED :第一个任务完全返回# FIRST_EXCEPTION:产生第一个异常返回# ALL_COMPLETED:所有任务完成返回 (默认选项)dones, pendings = loop.run_until_complete(    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))print("第一次完成的任务数:", len(dones))

# 【控制时间】:运行一秒后,就返回dones2, pendings2 = loop.run_until_complete(    asyncio.wait(pendings, timeout=1))print("第二次完成的任务数:", len(dones2))

# 【默认】:所有任务完成后返回dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))

print("第三次完成的任务数:", len(dones3))

loop.close()

输出结果

第一次完成的任务数: 1第二次完成的任务数: 4第三次完成的任务数: 5

快关注一下,成为Python高手

原文地址:https://www.cnblogs.com/wongbingming/p/9114171.html

时间: 2024-11-05 18:42:18

Python并发编程之学习异步IO框架:asyncio 中篇(十)的相关文章

Python并发编程之同步\异步and阻塞\非阻塞

一.什么是进程 进程: 正在进行的一个过程或者说一个任务.而负责执行任务则是cpu. 进程和程序的区别: 程序仅仅只是一堆代码而已,而进程指的是程序的运行过程. 需要强调的是:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播郭德纲,一个可以播高晓松. 二.并行和并发 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务 (一)并发:是伪并行,即

python并发编程之多进程

python并发编程之多进程 一.什么是进程 进程:正在进行的一个过程或者一个任务,执行任务的是CPU. 原理:单核加多道技术 二.进程与程序的区别 进程是指程序的运行过程 需要强调的是:同一个程序执行两次是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,另一个可以播放武藤兰. 三.并发与并行 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务. (1)并发

python-学习-python并发编程之多进程与多线程

一 multiprocessing模块介绍 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程.Python提供了multiprocessing.    multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似.  multiprocessing模块的功能众多:支持子进程.通信和共享数据.执行不同形式的同步,

Python并发编程实例教程

有关Python中的并发编程实例,主要是对Threading模块的应用,文中自定义了一个Threading类库. 一.简介 我们将一个正在运行的程序称为进程.每个进程都有它自己的系统状态,包含内存状态.打开文件列表.追踪指令执行情况的程序指针以及一个保存局部变量的调用栈.通常情况下,一个进程依照一个单序列控制流顺序执行,这个控制流被称为该进程的主线程.在任何给定的时刻,一个程序只做一件事情. 一个程序可以通过Python库函数中的os或subprocess模块创建新进程(例如os.fork()或

python并发编程&amp;多线程(二)

前导理论知识见:python并发编程&多线程(一) 一 threading模块介绍 multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性 官网链接:https://docs.python.org/3/library/threading.html?highlight=threading#(装B模式加载中…………) 二 开启线程的两种方式  方式一  方式二 三 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别  1 谁的开启速度快  2 瞅

进程,操作系统,Python并发编程之多进程

1.进程基础知识 1.程序:若干文件 2.进程:一个正在执行的文件,程序 3.进程被谁执行:cpu最终运行指定的程序 4.操作系统调度作用:将磁盘上的程序加载到内存,然后交由CPU去处理,一个CPU正在运行的一个程序,就叫开启了一个进程 2.操作系统 1.操作系统:存在于硬盘与软件之间,管理.协调.控制软件与硬件的交互 2.操作系统的作用:将一些复杂的硬件封装成简单的借口,便于使用;合理地调度分配多个进程与cpu的关系,让其有序化 3.操作系统发展史 ①第一代电子计算机(1940-1955) 二

Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信

目录 Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信 1.昨日回顾 2.僵尸进程和孤儿进程 2.1僵尸进程 2.2孤儿进程 2.3僵尸进程如何解决? 3.互斥锁,锁 3.1互斥锁的应用 3.2Lock与join的区别 4.进程之间的通信 进程在内存级别是隔离的 4.1基于文件通信 (抢票系统) 4.2基于队列通信 Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信 1.昨日回顾 1.创建进程的两种方式: 函数, 类. 2.pid: os.getpid() os.get

Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池

目录 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 2.死锁现象与递归锁 2.1死锁现象 2.2递归锁 3.信号量 4.GIL全局解释器锁 4.1背景 4.2为什么加锁 5.GIL与Lock锁的区别 6.验证计算密集型IO密集型的效率 6.1 IO密集型 6.2 计算密集型 7.多线程实现socket通信 7.1服务端 7.2客户端 8.进程池,线程池 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 #生产者消

python并发编程&amp;多线程(一)

本篇理论居多,实际操作见:  python并发编程&多线程(二) 一 什么是线程 在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程 线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程 车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线 流水线的工作需要电源,电源就相当于cpu 所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位. 多线程(即多个控制线程)的概念