Python的异步编程[0] -> 协程[1] -> 使用协程建立自己的异步非阻塞模型

使用协程建立自己的异步非阻塞模型



接下来例子中,将使用纯粹的Python编码搭建一个异步模型,相当于自己构建的一个asyncio模块,这也许能对asyncio模块底层实现的理解有更大的帮助。主要参考为文末的链接,以及自己的补充理解。

完整代码

  1 #!/usr/bin/python
  2 # =============================================================
  3 # File Name: async_base.py
  4 # Author: LI Ke
  5 # Created Time: 1/29/2018 09:18:50
  6 # =============================================================
  7
  8
  9 import types
 10 import time
 11
 12
 13 @types.coroutine
 14 def switch():
 15     print(‘Switch: Start‘)
 16     yield
 17     print(‘Switch: Done‘)
 18
 19 async def coro_1():
 20     print(‘C1: Start‘)
 21     await switch()
 22     print(‘C1: Stop‘)
 23
 24
 25 async def coro_2():
 26     print(‘C2: Start‘)
 27     print(‘C2: 1‘)
 28     print(‘C2: 2‘)
 29     print(‘C2: 3‘)
 30     print(‘C2: Stop‘)
 31
 32 c_1 = coro_1()
 33 c_2 = coro_2()
 34
 35 try:
 36     c_1.send(None)
 37 except StopIteration:
 38     pass
 39 try:
 40     c_2.send(None)
 41 except StopIteration:
 42     pass
 43 try:
 44     c_1.send(None)
 45 except StopIteration:
 46     pass
 47
 48 print(‘--------------------------------‘)
 49
 50 def run(coros):
 51     coros = list(coros)
 52
 53     while coros:
 54         # Duplicate list for iteration so we can remove from original list
 55         for coro in list(coros):
 56             try:
 57                 coro.send(None)
 58             except StopIteration:
 59                 coros.remove(coro)
 60
 61 c_1 = coro_1()
 62 c_2 = coro_2()
 63 run([c_1, c_2])
 64
 65 print(‘--------------------------------‘)
 66
 67 @types.coroutine
 68 def action(t):
 69     trace=[]
 70     while True:
 71         trace.append(time.time())
 72         if trace[-1] - trace[0] > t:
 73             break # This break will end this function and raise a StopIteration
 74         yield
 75
 76 async def coro_1():
 77     print(‘C1: Start‘)
 78     await action(2)
 79     print(‘C1: Stop‘)
 80
 81
 82 async def coro_2():
 83     print(‘C2: Start‘)
 84     await action(3)
 85     print(‘C2: Stop‘)
 86
 87 def timeit(f):
 88     def _wrapper(*args, **kwargs):
 89         start = time.time()
 90         re = f(*args, **kwargs)
 91         end = time.time()
 92         print(‘Time cost:‘, f.__name__, end-start)
 93         return re
 94     return _wrapper
 95
 96 c_1 = coro_1()
 97 c_2 = coro_2()
 98 timeit(run)([c_1])
 99 timeit(run)([c_2])
100
101 print(‘--------------------------------‘)
102
103 c_1 = coro_1()
104 c_2 = coro_2()
105 timeit(run)([c_1, c_2])

分段解释

首先会导入需要的模块,这里仅仅使用types和time两个模块,放弃异步I/O的asyncio模块。

1 import types
2 import time

接下来定义一个switch函数,利用types.coroutine装饰器将switch装饰成一个协程,这个协程将完成一个切换功能。

1 @types.coroutine
2 def switch():
3     print(‘Switch: Start‘)
4     yield
5     print(‘Switch: Done‘)

随后定义第一个协程,协程启动后,会进入一个await,即切入刚才的switch协程,这里使用async和await关键字完成对协程的定义。

1 async def coro_1():
2     print(‘C1: Start‘)
3     await switch()
4     print(‘C1: Stop‘)

同样的,再定义第二个协程,第二个协程将从头到尾顺序执行。

1 async def coro_2():
2     print(‘C2: Start‘)
3     print(‘C2: 1‘)
4     print(‘C2: 2‘)
5     print(‘C2: 3‘)
6     print(‘C2: Stop‘)

有了上面的两个协程,但我们在异步时,希望在执行完C_1的start后,切换进协程C_2,执行完成后再切换回来。那么此时就需要一个对协程切换进行控制的程序,具体顺序如下,

  1. 启动协程c_1,启动后会切换进switch函数,
  2. Switch中由于yield而切出,并保留上下文环境
  3. c_1.send()将获得返回结果(如果有的话),并继续执行
  4. 此时c_1已经被中止,启动c_2,则完成所有执行步骤,捕获生成器的中止异常
  5. 这时c_2以执行完毕,再次切回c_1(此时会从switch yield之后开始执行)继续执行。
 1 c_1 = coro_1()
 2 c_2 = coro_2()
 3
 4 try:
 5     c_1.send(None)
 6 except StopIteration:
 7     pass
 8 try:
 9     c_2.send(None)
10 except StopIteration:
11     pass
12 try:
13     c_1.send(None)
14 except StopIteration:
15     pass

最终得到结果如下,可以看到,整个过程完全按期望的流程进行,

C1: Start
Switch: Start
C2: Start
C2: 1
C2: 2
C2: 3
C2: Stop
Switch: Done
C1: Stop

但是这里的协程运行部分仍需改善,于是接下来便定义一个run函数用于执行一个协程列表。

run函数首先会遍历协程列表的副本,并不断尝试启动列表中的协程,当协程结束后便将协程从协程列表中删除,直到所有的协程都执行完毕为止。

 1 def run(coros):
 2     coros = list(coros)
 3
 4     while coros:
 5         # Duplicate list for iteration so we can remove from original list
 6         for coro in list(coros):
 7             try:
 8                 coro.send(None)
 9             except StopIteration:
10                 coros.remove(coro)
11
12 c_1 = coro_1()
13 c_2 = coro_2()
14 run([c_1, c_2])

测试一下run函数,得到结果与前面相同,

C1: Start
Switch: Start
C2: Start
C2: 1
C2: 2
C2: 3
C2: Stop
Switch: Done
C1: Stop

到目前为止,完成了一个简单的异步模型的搭建,即c_2无需等待c_1执行完成再继续执行,而是由c_1交出了控制权进行协作完成,同时也不存在多线程的抢占式任务,因为由始至终都只有一个线程在运行,而且也没有混乱的回调函数存在。

但是,还存在一个阻塞问题没有解决,也就是说,如果c_1中的switch函数是一个耗时的I/O操作或其他阻塞型操作,则此时需要等待switch的阻塞操作完成才能交出控制权,可如果希望在等待这个耗时操作时,先去执行c_2的任务,再回来检测c_1中的耗时操作是否完成,则需要使用非阻塞的方式。

首先,对刚才的switch进行改造,完成一个action协程,这个协程会根据传入的参数,执行对应时间后,再退出协程引发StopIteration,实现方式如下,每次切换进action中都会记录下时间,然后将时间和第一次进入的时间进行对比,如果超过了设置的时间便退出,如果没超过限制时间,则切出协程交还出控制权。

1 @types.coroutine
2 def action(t):
3     trace=[]
4     while True:
5         trace.append(time.time())
6         if trace[-1] - trace[0] > t:
7             break # This break will end this function and raise a StopIteration
8         yield

接着定义两个协程,分别执行action时间为2秒和3秒,同时定义一个计算时间的装饰器,用于时间记录。

 1 async def coro_1():
 2     print(‘C1: Start‘)
 3     await action(2)
 4     print(‘C1: Stop‘)
 5
 6
 7 async def coro_2():
 8     print(‘C2: Start‘)
 9     await action(3)
10     print(‘C2: Stop‘)
11
12 def timeit(f):
13     def _wrapper(*args, **kwargs):
14         start = time.time()
15         re = f(*args, **kwargs)
16         end = time.time()
17         print(‘Time cost:‘, f.__name__, end-start)
18         return re
19     return _wrapper

然后我们先分别运行两个协程进行一个实验,

1 c_1 = coro_1()
2 c_2 = coro_2()
3 timeit(run)([c_1])
4 timeit(run)([c_2])

从输出的结果可以看到两个协程的耗时与action执行的时间基本相同,且顺序执行的时间为两者之和,

C1: Start
C1: Stop
Time cost: run 2.030202865600586
C2: Start
C2: Stop
Time cost: run 3.0653066635131836

接下来,利用异步非阻塞的方式来执行这两个协程,

1 c_1 = coro_1()
2 c_2 = coro_2()
3 timeit(run)([c_1, c_2])

最后得到结果

C1: Start
C2: Start
C1: Stop
C2: Stop
Time cost: run 3.0743072032928467

从结果中可以看到,此时的运行方式是异步的形式,c_1启动后由于进入一个耗时action,且action被我们设置为非阻塞形式,因此c_1交出了控制权,控制权回到run函数后,启动了c_2,而c_2同样也进入到action中,这时两个协程都在等待任务完成,而监视run则在两个协程中不停轮询,不断进入action中查看各自的action操作是否完成,当有协程完成后,将继续启动这个协程的后续操作,直到最终所有协程结束。

按照非阻塞异步协程的方式,可以以单线程运行,避免资源锁的建立,也消除了线程切换的开销,并且最终获得了类似多线程运行的时间性能。

相关阅读



1. 协程和 async / await

参考链接



http://www.oschina.net/translate/playing-around-with-await-async-in-python-3-5

原文地址:https://www.cnblogs.com/stacklike/p/8379339.html

时间: 2024-08-07 15:31:13

Python的异步编程[0] -> 协程[1] -> 使用协程建立自己的异步非阻塞模型的相关文章

python IO非阻塞模型

server端 import socket sk = socket.socket() sk.bind(('127.0.0.1', 8010)) sk.setblocking(False) # sk.listen() conn_l = [] del_conn = [] while True: try: conn, addr = sk.accept() # 不阻塞,但是没人连我会报错 print('建立连接了:', addr) conn_l.append(conn) except BlockingI

Python的Web编程[0] -> Web客户端[1] -> Web 页面解析

 Web页面解析 / Web page parsing 1 HTMLParser解析 下面介绍一种基本的Web页面HTML解析的方式,主要是利用Python自带的html.parser模块进行解析.其主要步骤为: 创建一个新的Parser类,继承HTMLParser类; 重载handler_starttag等方法,实现指定功能; 实例化新的Parser并将HTML文本feed给类实例. 完整代码 1 from html.parser import HTMLParser 2 3 # An HTML

<史上最强>深入理解 Python 异步编程(上)

前言 很多朋友对异步编程都处于"听说很强大"的认知状态.鲜有在生产项目中使用它.而使用它的同学,则大多数都停留在知道如何使用 Tornado.Twisted.Gevent 这类异步框架上,出现各种古怪的问题难以解决.而且使用了异步框架的部分同学,由于用法不对,感觉它并没牛逼到哪里去,所以很多同学做 Web 后端服务时还是采用 Flask.Django等传统的非异步框架. 从上两届 PyCon 技术大会看来,异步编程已经成了 Python 生态下一阶段的主旋律.如新兴的 Go.Rust.

深入理解 Python 异步编程(上)

http://python.jobbole.com/88291/ 前言 很多朋友对异步编程都处于"听说很强大"的认知状态.鲜有在生产项目中使用它.而使用它的同学,则大多数都停留在知道如何使用 Tornado.Twisted.Gevent 这类异步框架上,出现各种古怪的问题难以解决.而且使用了异步框架的部分同学,由于用法不对,感觉它并没牛逼到哪里去,所以很多同学做 Web 后端服务时还是采用 Flask.Django等传统的非异步框架. 从上两届 PyCon 技术大会看来,异步编程已经成

Atitit.异步编程 java .net php python js 的比较

Atitit.异步编程 java .net php python js 的比较 1. 1.异步任务,异步模式,  APM模式,,  EAP模式, TAP 1 1.1.       APM模式: BeginXXX/EndXXX, IAsyncResult 2 1.2.       EAP模式(基于事件的异步模式) 2 1.3.      TAP(基于任务的异步模式) 2 2. 异步的实现机制::主要是通过线程and线程池实现的... 2 3. 异步编程的开发::当前都是通过api的,将来应该可以使

Async in C# 5.0(C#中的异步编程Async) 蜗牛翻译之第一章

p { display: block; margin: 3px 0 0 0; } --> 写在前面 在学异步,有位园友推荐了<async in C#5.0>,没找到中文版,恰巧也想提高下英文,用我拙劣的英文翻译一些重要的部分,纯属娱乐,简单分享,保持学习,谨记谦虚. 如果你觉得这件事儿没意义翻译的又差,尽情的踩吧.如果你觉得值得鼓励,感谢留下你的赞,祝各位爱技术的园友在今后每一次应该猛烈突破的时候,不选择知难而退.在每一次应该独立思考的时候,不选择随波逐流,应该全力以赴的时候,不选择尽力

探索Javascript异步编程

异步编程带来的问题在客户端Javascript中并不明显,但随着服务器端Javascript越来越广的被使用,大量的异步IO操作使得该问题变得明显.许多不同的方法都可以解决这个问题,本文讨论了一些方法,但并不深入.大家需要根据自己的情况选择一个适于自己的方法. 笔者在之前的一片博客中简单的讨论了Python和Javascript的异同,其实作为一种编程语言Javascript的异步编程是一个非常值得讨论的有趣话题. JavaScript 异步编程简介 回调函数和异步执行 所谓的异步指的是函数的调

究竟什么是异步编程?

在我们的工作和学习当中,到处充满了异步的身影,到底什么是异步,什么是异步编程,为什么要用异步编程,以及经典的异步编程有哪些,在工作中的场景又有什么,我们一点点深入的去学习. 什么是异步编程? 有必要了解一下,什么是异步编程,为什么要异步编程. 先说一个概念异步与同步.介绍异步之前,回顾一下,所谓同步编程,就是计算机一行一行按顺序依次执行代码,当前代码任务耗时执行会阻塞后续代码的执行. 同步编程,即是一种典型的请求-响应模型,当请求调用一个函数或方法后,需等待其响应返回,然后执行后续代码. 一般情

不得不说的异步编程

1.什么是异步编程? 异步编程就是把耗时的操作放进一个单独的线程中进行处理(该线程需要将执行进度反映到界面上).由于耗时操作是在另外一个线程中被执行的,所以它不会堵塞主线程.主线程开启这些单独的线程后,还可以继续执行其他操作(例如窗体绘制等). 异步编程可以提高用户体验,避免在进行耗时操作时让用户看到程序“卡死”的现象. 2.异步编程模型(APM) APM是Asynchronous Programming Mode的缩写,即异步编程模型的意思,它允许程序用更少的线程去执行更多的操作.在.NET