asyncio模块作用:构建协程并发应用的工具
python并发的三大内置模块,简单认识:
1、multiprocessing:多进程并发处理 2、threading模块:多线程并发处理 3、asyncio模块:协程并发处理
1、启动一个协程,任务无返回值,需要注意:async的使用
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio # 开头定义async,表示要在协程运行,不定义的话,循环监听增加不了 async def coroutine(): print(‘协程运行...‘) # 定义一个事件循环监听 event_loop = asyncio.get_event_loop() try: print(‘协程开始...‘) coroutine_obj = coroutine() print(‘进入事件循环监听...‘) event_loop.run_until_complete(coroutine()) # run_until_complete翻译成中文:一直运行到完成为止 finally: print(‘关闭事件循环监听..‘) event_loop.close()
asyncio_coroutine.py
运行效果
[[email protected] mnt]# python3 asyncio_coroutine.py 协程开始... 进入事件循环监听... 协程运行... 关闭事件循环监听.. sys:1: RuntimeWarning: coroutine ‘coroutine‘ was never awaited
2、启动一个协程,任务有返回值,需要注意:async的使用
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio # 开头定义async,表示要在协程运行,不定义的话,循环监听增加不了 async def coroutine(): print(‘协程运行...‘) return ‘ok‘ # 定义一个事件循环监听 event_loop = asyncio.get_event_loop() try: coroutine_obj = coroutine() return_value = event_loop.run_until_complete(coroutine()) # run_until_complete翻译成中文:一直运行到完成为止 print(‘coroutine()返回值:‘, return_value) finally: event_loop.close()
asyncio_coroutine_return.py
运行效果
[[email protected] mnt]# python3 asyncio_coroutine_return.py 协程运行... coroutine()返回值: ok sys:1: RuntimeWarning: coroutine ‘coroutine‘ was never awaited
3、启动一个协程,任务调用其它任务运行,需要注意:await 的使用
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio # 开头定义async,表示要在协程运行,不定义的话,循环监听增加不了 async def coroutine(): print(‘coroutine内部运行‘) print(‘等待task_1运行结果‘) task1 = await task_1() #await作用:控制运行流程,按顺序执行,即等待该函数运行完成,再继续往后执行 print(‘等待task_2运行结果‘) task2 = await task_2(task1) return (task1, task2) async def task_1(): print(‘task_1内部运行‘) return ‘task_1 ok‘ async def task_2(arg): print(‘task_2内部运行‘) return ‘task_2 arg:{}‘.format(arg) # 定义一个事件循环监听 event_loop = asyncio.get_event_loop() try: coroutine_obj = coroutine() return_value = event_loop.run_until_complete(coroutine()) # run_until_complete翻译成中文:一直运行到完成为止 print(‘coroutine()返回值:‘, return_value) finally: event_loop.close()
asyncio_coroutine_chain.py
运行效果
[[email protected] mnt]# python3 asyncio_coroutine_chain.py coroutine内部运行 等待task_1运行结果 task_1内部运行 等待task_2运行结果 task_2内部运行 coroutine()返回值: (‘task_1 ok‘, ‘task_2 arg:task_1 ok‘) sys:1: RuntimeWarning: coroutine ‘coroutine‘ was never awaited
4、生成器而不是协程
Python3早期版本的语法如下
@asyncio.coroutine 替换为 async yield from 替换为 await
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio # 开头定义async,表示要在协程运行,不定义的话,循环监听增加不了 @asyncio.coroutine def coroutine(): print(‘coroutine内部运行‘) print(‘等待task_1运行结果‘) task1 = yield from task_1() #await作用:控制运行流程,按顺序执行,即等待该函数运行完成,再继续往后执行 print(‘等待task_2运行结果‘) task2 = yield from task_2(task1) return (task1, task2) @asyncio.coroutine async def task_1(): print(‘task_1内部运行‘) return ‘task_1 ok‘ @asyncio.coroutine async def task_2(arg): print(‘task_2内部运行‘) return ‘task_2 arg:{}‘.format(arg) # 定义一个事件循环监听 event_loop = asyncio.get_event_loop() try: coroutine_obj = coroutine() return_value = event_loop.run_until_complete(coroutine()) # run_until_complete翻译成中文:一直运行到完成为止 print(‘coroutine()返回值:‘, return_value) finally: event_loop.close()
asyncio_generator.py
运行效果
[[email protected] mnt]# python3 asyncio_generator.py coroutine内部运行 等待task_1运行结果 task_1内部运行 等待task_2运行结果 task_2内部运行 coroutine()返回值: (‘task_1 ok‘, ‘task_2 arg:task_1 ok‘)
5、协程回调函数调用,此示例:讯速回调
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools def callback(arg, *, kwarg=‘default‘): print(‘回调函数arg={},kwarg={}‘.format(arg, kwarg)) async def main(loop): print(‘注册回调函数‘) loop.call_soon(callback, 1) # 执行回调函数,传入参数1 wrapped = functools.partial(callback, kwarg=‘not default‘) # 利用偏函数,给kwarg传默认值 loop.call_soon(wrapped, 2) # 执行回调函数,传入参数2 await asyncio.sleep(0.5) event_loop = asyncio.get_event_loop() try: print(‘进入事件循环监听‘) event_loop.run_until_complete(main(event_loop)) # 将事件循环对象传入main函数中 finally: print(‘关闭事件循环监听‘) event_loop.close()
asyncio_call_soon.py
运行效果
[[email protected] mnt]# python3 asyncio_call_soon.py 进入事件循环监听 注册回调函数 回调函数arg=1,kwarg=default 回调函数arg=2,kwarg=not default 关闭事件循环监听
6、协程回调函数调用,此示例:延时回调
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio def callback(arg): print(‘回调函数arg={}‘.format(arg)) async def main(loop): print(‘注册回调函数‘) loop.call_later(1,callback,‘延时1秒回调参数1‘) loop.call_later(1,callback,‘延时1秒回调参数2‘) loop.call_soon(callback,‘讯速的回调参数‘) await asyncio.sleep(3) event_loop = asyncio.get_event_loop() try: print(‘进入事件循环监听‘) event_loop.run_until_complete(main(event_loop)) # 将事件循环对象传入main函数中 finally: print(‘关闭事件循环监听‘) event_loop.close()
asyncio_call_delay.py
运行效果
[[email protected] mnt]# python3 asyncio_call_delay.py 进入事件循环监听 注册回调函数 回调函数arg=讯速的回调参数 回调函数arg=延时1秒回调参数1 回调函数arg=延时1秒回调参数2 关闭事件循环监听
7、协程回调函数调用,此示例:指定时间回调
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import time def callback(arg, loop): print(‘回调函数arg={} 回调的时间time={}‘.format(arg, loop.time())) async def main(loop): now = loop.time() print(‘时钟时间:{}‘.format(time.time())) print(‘时事件循环时间:{}‘.format(loop.time())) print(‘注册回调函数‘) loop.call_at(now + 1, callback, ‘参数1‘, loop) loop.call_at(now + 2, callback, ‘参数2‘, loop) loop.call_soon(callback, ‘讯速的回调参数‘, loop) await asyncio.sleep(4) event_loop = asyncio.get_event_loop() try: print(‘进入事件循环监听‘) event_loop.run_until_complete(main(event_loop)) # 将事件循环对象传入main函数中 finally: print(‘关闭事件循环监听‘) event_loop.close()
asyncio_call_at.py
运行结果
[[email protected] mnt]# python3 asyncio_call_at.py 进入事件循环监听 时钟时间:1576030580.730174 时事件循环时间:233762.828430848 注册回调函数 回调函数arg=讯速的回调参数 回调的时间time=233762.828485111 回调函数arg=参数1 回调的时间time=233763.829784903 回调函数arg=参数2 回调的时间time=233764.831077136 关闭事件循环监听
8、基于Future实现异步返回任务执行结果
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio def mark_done(future, result): """标记完成的函数""" print(‘设置 Future 返回结果 {}‘.format(result)) future.set_result(result) event_loop = asyncio.get_event_loop() try: all_done = asyncio.Future() print(‘调度标记完成的函数‘) event_loop.call_soon(mark_done, all_done, ‘这个是调度传入的数据‘) result = event_loop.run_until_complete(all_done) print(‘运行返回的结果:{}‘.format(result)) finally: print(‘关闭事件循环监听‘) event_loop.close() print(‘Future 返回的结果: {}‘.format(all_done.result())) """ 结论: 返回结果可以从两个地方获取: 1、result = event_loop.run_until_complete(all_done) 2、Future.result() """
asyncio_future_event_loop.py
运行效果
[[email protected] mnt]# python3 asyncio_future_event_loop.py 调度标记完成的函数 设置 Future 返回结果 这个是调度传入的数据 运行返回的结果:这个是调度传入的数据 关闭事件循环监听 Future 返回的结果: 这个是调度传入的数据
9、基于Future+await类现异步返回任务执行结果
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio def mark_done(future, result): """标记完成的函数""" print(‘设置 Future 返回结果 {}‘.format(result)) future.set_result(result) async def main(loop): all_done = asyncio.Future() print(‘调度标记完成的函数‘) loop.call_soon(mark_done, all_done, ‘这个是调度传入的数据‘) result = await all_done # await作用:等all_done返回结果,再往下运行 print(‘mark_done()执行完成,返回值 : {}‘.format(result)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: print(‘关闭事件循环监听‘) event_loop.close()
asyncio_future_await.py
运行效果
[[email protected] mnt]# python3 asyncio_future_await.py 调度标记完成的函数 设置 Future 返回结果 这个是调度传入的数据 mark_done()执行完成,返回值 : 这个是调度传入的数据 关闭事件循环监听
10、基于Future的回调
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools def callback(future, n): print(‘{}: future 完成: {}‘.format(n, future.result())) async def register_callbacks(future_obj): print(‘将回调函数注册到Future中‘) # 这里需要注意的是add_done_callback函数,还为把当前实例对象作为参数,传给函数,所以回调函数多一个callback(future, n) future_obj.add_done_callback(functools.partial(callback, n=1)) future_obj.add_done_callback(functools.partial(callback, n=2)) async def main(future_obj): # 注册future的回调函数 await register_callbacks(future_obj) print(‘设置Future返回结果‘) future_obj.set_result(‘the result‘) event_loop = asyncio.get_event_loop() try: # 创建一个future实例 future_obj = asyncio.Future() # 增future实例传给main函数处理 event_loop.run_until_complete(main(future_obj)) finally: event_loop.close()
asyncio_future_callback.py
运行效果
[[email protected] mnt]# python3 asyncio_future_callback.py 将回调函数注册到Future中 设置Future返回结果 1: future 完成: the result 2: future 完成: the result
11、asyncio创建任务运行
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def task_func(): print(‘task_func 执行完成‘) return ‘task_func返回值ok‘ async def main(loop): print(‘创建任务‘) task = loop.create_task(task_func()) print(‘等待task的结果 {}‘.format(task)) return_value = await task #直到遇到await,上面的task开始运行 print(‘已完成任务{}‘.format(task)) #经过上面的运行,task里面已经有result执行结果 print(‘return value: {}‘.format(return_value)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
asyncio_future_create_task.py
运行结果
[[email protected] mnt]# python3 asyncio_future_create_task.py 创建任务 等待task的结果 <Task pending coro=<task_func() running at asyncio_future_create_task.py:11>> task_func 执行完成 已完成任务<Task finished coro=<task_func() done, defined at asyncio_future_create_task.py:11> result=‘task_func返回值ok‘> return value: task_func返回值ok
12、asyncio取消任务运行
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def task_func(): print(‘task_func 执行完成‘) return ‘task_func返回值ok‘ async def main(loop): print(‘创建任务‘) task = loop.create_task(task_func()) print(‘取消任务‘) task.cancel() print(‘取消任务结果 {}‘.format(task)) try: await task #直到遇到await,上面的task开始运行 except asyncio.CancelledError: print(‘从已取消的任务中捕获错误‘) else: print(‘任务执行结果 {}‘.format(task)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
asyncio_future_create_cancel_task.py
运行效果
[[email protected]mysql mnt]# python3 asyncio_future_create_cancel_task.py 创建任务 取消任务 取消任务结果 <Task cancelling coro=<task_func() running at asyncio_future_create_cancel_task.py:11>> 从已取消的任务中捕获错误
13、利用回调取消任务执行
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def task_func(): print(‘task_func睡眠‘) try: await asyncio.sleep(1) except asyncio.CancelledError: print(‘task_func 任务取消‘) raise return ‘task_func返回值ok‘ def task_canceller(task_obj): print(‘task_canceller运行‘) task_obj.cancel() print(‘取消task_obj任务‘) async def main(loop): print(‘创建任务‘) task = loop.create_task(task_func()) loop.call_soon(task_canceller, task) try: await task # 直到遇到await,上面的task开始运行 except asyncio.CancelledError: print(‘从已取消的任务中捕获错误‘) else: print(‘任务执行结果 {}‘.format(task)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
asyncio_future_create_callback_cancel_task.py
运行效果
[[email protected] mnt]# python3 asyncio_future_create_callback_cancel_task.py 创建任务 task_func睡眠 task_canceller运行 取消task_obj任务 task_func 任务取消 从已取消的任务中捕获错误
14、asyncio.ensure_future(),增加函数,直到await才运行
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def wrapped(): print(‘wrapped 运行‘) return ‘wrapped result‘ async def inner(task): print(‘inner: 开始运行‘) print(‘inner: task {!r}‘.format(task)) result = await task print(‘inner: task 返回值 {!r}‘.format(result)) async def start_task(): print(‘开始创建task‘) task = asyncio.ensure_future(wrapped()) print(‘等待inner运行‘) await inner(task) print(‘starter: inner returned‘) event_loop = asyncio.get_event_loop() try: print(‘进程事件循环‘) result = event_loop.run_until_complete(start_task()) finally: event_loop.close()
asyncio_ensure_future.py
运行效果
[[email protected] mnt]# python3 asyncio_ensure_future.py 进程事件循环 开始创建task 等待inner运行 inner: 开始运行 inner: task <Task pending coro=<wrapped() running at asyncio_ensure_future.py:11>> wrapped 运行 inner: task 返回值 ‘wrapped result‘
15、asyncio.wait()批量等待多个协程直到运行完成,包装多个返回显示结果
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def phase(i): print(‘phase形参传入值{}‘.format(i)) await asyncio.sleep(0.1 * i) print(‘完成phase的次数{}‘.format(i)) return ‘phase {} result‘.format(i) async def main(num_phase): print(‘main函数开始‘) phases = [ phase(i) for i in range(num_phase) ] print(‘等待phases里面的多个函数执行完成‘) completed, pending = await asyncio.wait(phases) # completed : 运行完成存在这里 ,pending : 没有运行完成存在这里 for_completed_results = [t.result() for t in completed] print(‘for_completed_results : {}‘.format(for_completed_results)) event_loop = asyncio.get_event_loop() try: print(‘进程事件循环‘) result = event_loop.run_until_complete(main(3)) finally: event_loop.close()
asyncio_wait.py
运行效果
[[email protected] mnt]# python3 asyncio_wait.py 进程事件循环 main函数开始 等待phases里面的多个函数执行完成 phase形参传入值2 phase形参传入值0 phase形参传入值1 完成phase的次数0 完成phase的次数1 完成phase的次数2 for_completed_results : [‘phase 2 result‘, ‘phase 0 result‘, ‘phase 1 result‘]
16、asyncio.wait()批量等待多个协程设置超时时间并且取消未完成的任务,包装多个返回显示结果
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def phase(i): print(‘phase形参传入值{}‘.format(i)) try: await asyncio.sleep(0.1 * i) except asyncio.CancelledError: print(‘phase {} 取消‘.format(i)) else: print(‘完成phase的次数{}‘.format(i)) return ‘phase {} result‘.format(i) async def main(num_phase): print(‘main函数开始‘) phases = [ phase(i) for i in range(num_phase) ] print(‘等待phases里面的多个函数执行完成‘) completed, pending = await asyncio.wait(phases, timeout=0.1) # completed : 运行完成存在这里 ,pending : 没有运行完成存在这里 print(‘completed长度:{},pending长度 :{}‘.format(len(completed), len(pending))) if pending: print(‘取消未完成的任务‘) for t in pending: t.cancel() print(‘main函数执行完成‘) event_loop = asyncio.get_event_loop() try: print(‘进程事件循环‘) result = event_loop.run_until_complete(main(3)) finally: event_loop.close()
asyncio_wait_timeout.py
运行效果
[[email protected] mnt]# python3 asyncio_wait_timeout.py 进程事件循环 main函数开始 等待phases里面的多个函数执行完成 phase形参传入值1 phase形参传入值2 phase形参传入值0 完成phase的次数0 completed长度:1,pending长度 :2 取消未完成的任务 main函数执行完成 phase 1 取消 phase 2 取消
17、asyncio.gather()多个协程运行,函数返回值接收
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def phase1(): print(‘phase1运行中‘) await asyncio.sleep(2) print(‘phase1运行完成‘) return ‘phase1 result‘ async def phase2(): print(‘phase2运行中‘) await asyncio.sleep(1) print(‘phase2运行完成‘) return ‘phase2 result‘ async def main(): print(‘main函数开始‘) results = await asyncio.gather( phase1(), phase2() ) print(‘results : {}‘.format(results)) event_loop = asyncio.get_event_loop() try: print(‘进程事件循环‘) result = event_loop.run_until_complete(main()) finally: event_loop.close()
asyncio_gather.py
运行效果
[[email protected] mnt]# python3 asyncio_gather.py 进程事件循环 main函数开始 phase1运行中 phase2运行中 phase2运行完成 phase1运行完成 results : [‘phase1 result‘, ‘phase2 result‘]
18、asyncio.as_completed()多个协程运行,函数返回值不是有序的接收
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def phase(i): print(‘phase {} 运行中‘.format(i)) await asyncio.sleep(0.5 - (0.1 * i)) print(‘phase {} 运行完成‘.format(i)) return ‘phase {} result‘.format(i) async def main(num_phases): print(‘main函数开始‘) phases = [ phase(i) for i in range(num_phases) ] print(‘等待phases运行完成‘) results = [] for next_to_complete in asyncio.as_completed(phases): task_result = await next_to_complete print(‘接到到task_result : {}‘.format(task_result)) results.append(task_result) print(‘results : {}‘.format(results)) return results event_loop = asyncio.get_event_loop() try: print(‘进程事件循环‘) event_loop.run_until_complete(main(3)) finally: event_loop.close()
asyncio_as_completed.py
运行效果
[[email protected] mnt]# python3 asyncio_as_completed.py 进程事件循环 main函数开始 等待phases运行完成 phase 2 运行中 phase 1 运行中 phase 0 运行中 phase 2 运行完成 接到到task_result : phase 2 result phase 1 运行完成 接到到task_result : phase 1 result phase 0 运行完成 接到到task_result : phase 0 result results : [‘phase 2 result‘, ‘phase 1 result‘, ‘phase 0 result‘]
19、asyncio.Lock() 协程锁的打开与关闭
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools def unlock(lock): print(‘回调释放锁‘) lock.release() async def coro1(lock): """with方式获得锁""" async with lock: print(‘coro1 打开锁运算‘) print(‘coro1 锁已释放‘) async def coro2(lock): """传统方式获取锁""" await lock.acquire() try: print(‘coro2 打开锁运算‘) finally: print(‘coro2 锁已释放‘) lock.release() async def main(loop): lock = asyncio.Lock() print(‘启动协程之前获取锁‘) await lock.acquire() print(‘获得锁 {}‘.format(lock.locked())) # 运行完成,回调解锁 loop.call_later(0.1, functools.partial(unlock, lock)) print(‘等待协程运行‘) await asyncio.wait([coro1(lock), coro2(lock)]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
asyncio_lock.py
运行效果
[[email protected] mnt]# python3 asyncio_lock.py 启动协程之前获取锁 获得锁 True 等待协程运行 回调释放锁 coro2 打开锁运算 coro2 锁已释放 coro1 打开锁运算 coro1 锁已释放
20、asyncio.Event() 事件的查看与设置
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools def set_event(event): print(‘回调设置event‘) event.set() async def coro1(event): print(‘coro1 等待事件‘) await event.wait() print(‘coro1 触发运行‘) async def coro2(event): print(‘coro2 等待事件‘) await event.wait() print(‘coro2 触发运行‘) async def main(loop): event = asyncio.Event() print(‘event开始之前状态:{}‘.format(event.is_set())) loop.call_later(0.1, functools.partial(set_event, event)) # 延时0.1秒后,回调set_event函数 await asyncio.wait([coro1(event), coro2(event)]) print(‘event开始之后状态:{}‘.format(event.is_set())) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
asyncio_event
运行效果
[[email protected] mnt]# python3 asyncio_event.py event开始之前状态:False coro1 等待事件 coro2 等待事件 回调设置event coro1 触发运行 coro2 触发运行 event开始之后状态:True
21、asyncio.Condition(),对事件状态单独通知执行
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def consumer(condition_obj, i): async with condition_obj: print(‘消费者 {} 等待中‘.format(i)) await condition_obj.wait() print(‘消费者 {} 触发‘.format(i)) print(‘消费者 {} 消费结束‘.format(i)) async def manipulate_condition(condition_obj): print(‘开始操作condition‘) await asyncio.sleep(0.1) for i in range(3): async with condition_obj: print(‘通知消费者 {}‘.format(i)) condition_obj.notify(i) await asyncio.sleep(0.1) async with condition_obj: print(‘通知其它所有的消费者‘) condition_obj.notify_all() print(‘操作condition结束‘) async def main(loop): # 创建一个操作状态的对象 condition_obj = asyncio.Condition() # 运5个消费者函数 consumers = [ consumer(condition_obj, i) for i in range(5) ] # 创建一个操作状态的任务 loop.create_task(manipulate_condition(condition_obj)) # 等待consumers所有的函数执行完成 await asyncio.wait(consumers) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
asyncio_condition.py
运行效果
[[email protected] mnt]# python3 asyncio_condition.py 开始操作condition 消费者 2 等待中 消费者 3 等待中 消费者 0 等待中 消费者 4 等待中 消费者 1 等待中 通知消费者 0 通知消费者 1 消费者 2 触发 消费者 2 消费结束 通知消费者 2 消费者 3 触发 消费者 3 消费结束 消费者 0 触发 消费者 0 消费结束 通知其它所有的消费者 操作condition结束 消费者 4 触发 消费者 4 消费结束 消费者 1 触发 消费者 1 消费结束
22、协程队列Queue,生产者与消费者的示例
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def consumer(n, q): print(‘消费者 {} 开始‘.format(n)) while True: print(‘消费费 {} 等待消费‘.format(n)) item = await q.get() print(‘消费者 {} 消费了 {}‘.format(n, item)) if item is None: q.task_done() break else: await asyncio.sleep(0.01 * item) q.task_done() print(‘消费者 {} 结束‘.format(n)) async def producer(q, num_worker): print(‘生产者 开始‘) for i in range(num_worker * 3): await q.put(i) print(‘生产者 增加数据 {} 到队列中‘.format(i)) print(‘生产者 增加停止信号到队列中‘) for i in range(num_worker): await q.put(None) print(‘生产者 等待队列清空‘) await q.join() print(‘生产者 结束‘) async def main(loop, num_consumers): # 创建一个队列,最大的长度是num_consumers q = asyncio.Queue(maxsize=num_consumers) consumers = [ loop.create_task(consumer(i, q)) for i in range(num_consumers) ] producer_task = loop.create_task(producer(q, num_consumers)) await asyncio.wait(consumers + [producer_task]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop, 2)) finally: event_loop.close()
asyncio_queue.py
运行效果
[[email protected] mnt]# python3 asyncio_queue.py 消费者 0 开始 消费费 0 等待消费 消费者 1 开始 消费费 1 等待消费 生产者 开始 生产者 增加数据 0 到队列中 生产者 增加数据 1 到队列中 消费者 0 消费了 0 消费者 1 消费了 1 生产者 增加数据 2 到队列中 生产者 增加数据 3 到队列中 消费费 0 等待消费 消费者 0 消费了 2 生产者 增加数据 4 到队列中 消费费 1 等待消费 消费者 1 消费了 3 生产者 增加数据 5 到队列中 生产者 增加停止信号到队列中 消费费 0 等待消费 消费者 0 消费了 4 消费费 1 等待消费 消费者 1 消费了 5 生产者 等待队列清空 消费费 0 等待消费 消费者 0 消费了 None 消费者 0 结束 消费费 1 等待消费 消费者 1 消费了 None 消费者 1 结束 生产者 结束
23、利用 asyncio.Protocol 实现服务端和客户端数据相互传送
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys SERVER_ADDRESS = [‘localhost‘, 8000] class EchoServer(asyncio.Protocol): def connection_made(self, transport): self.transport = transport self.address = transport.get_extra_info(‘peername‘) self.log = logging.getLogger( ‘EchoServer_{}_{}‘.format(*self.address) ) self.log.debug(‘接收连接‘) def data_received(self, data): self.log.debug(‘接收数据 {}‘.format(data)) self.transport.write(data) self.log.debug(‘发送数据 {}‘.format(data)) def eof_received(self): self.log.debug(‘接收数据 EOF‘) if self.transport.can_write_eof(): self.transport.write_eof() def connection_lost(self, exc): if exc: self.log.error(‘错误 {}‘.format(exc)) else: self.log.debug(‘服务关闭‘) super(EchoServer, self).connection_lost(exc) logging.basicConfig( level=logging.DEBUG, format=‘%(name)s : %(message)s‘, stream=sys.stderr ) log = logging.getLogger(‘main‘) event_loop = asyncio.get_event_loop() factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS) server = event_loop.run_until_complete(factory) log.debug(‘开始运行 IP:{} Port:{}‘.format(*SERVER_ADDRESS)) try: event_loop.run_forever() finally: log.debug(‘关闭服务‘) server.close() event_loop.run_until_complete(server.wait_closed()) log.debug(‘关闭事件循环‘) event_loop.close()
asyncio_echo_server_protocol.py
#!/usr/bin/env python3 # encoding: utf-8 import asyncio import functools import logging import sys MESSAGES = [ b‘This is the message. ‘, b‘It will be sent ‘, b‘in parts.‘, ] SERVER_ADDRESS = (‘localhost‘, 8000) class EchoClient(asyncio.Protocol): def __init__(self, messages, future): super().__init__() self.messages = messages self.log = logging.getLogger(‘EchoClient‘) self.f = future def connection_made(self, transport): self.transport = transport self.address = transport.get_extra_info(‘peername‘) self.log.debug( ‘连接服务器IP:{} port :{}‘.format(*self.address) ) for msg in self.messages: transport.write(msg) self.log.debug(‘发送数据 {!r}‘.format(msg)) if transport.can_write_eof(): transport.write_eof() def data_received(self, data): self.log.debug(‘received {!r}‘.format(data)) def eof_received(self): self.log.debug(‘接收到 EOF‘) self.transport.close() if not self.f.done(): self.f.set_result(True) def connection_lost(self, exc): self.log.debug(‘服务器关闭连接‘) self.transport.close() if not self.f.done(): self.f.set_result(True) super().connection_lost(exc) #设置日志级别 logging.basicConfig( level=logging.DEBUG, format=‘%(name)s: %(message)s‘, stream=sys.stderr, ) #打印日志标题 log = logging.getLogger(‘main‘) #创建一个事件循环 event_loop = asyncio.get_event_loop() #创建客户端的Future client_completed = asyncio.Future() #利用偏函数自动传参给EchoClient实例化类 client_factory = functools.partial( EchoClient, messages=MESSAGES, future=client_completed, ) #创建一个事件循环连接 factory_coroutine = event_loop.create_connection( client_factory, *SERVER_ADDRESS, ) log.debug(‘等待客户端运行完成‘) try: event_loop.run_until_complete(factory_coroutine) event_loop.run_until_complete(client_completed) finally: log.debug(‘关闭事件循环‘) event_loop.close()
asyncio_echo_client_protocol.py
运行效果
服务端
[[email protected] mnt]# python3 asyncio_echo_server_protocol.py asyncio : Using selector: EpollSelector main : 开始运行 IP:localhost Port:8000 EchoServer_::1_54082 : 接收连接 EchoServer_::1_54082 : 接收数据 b‘This is the message. It will be sent in parts.‘ EchoServer_::1_54082 : 发送数据 b‘This is the message. It will be sent in parts.‘ EchoServer_::1_54082 : 接收数据 EOF EchoServer_::1_54082 : 服务关闭
客户端
[[email protected] mnt]# python3 asyncio_echo_client_protocol.py asyncio: Using selector: EpollSelector main: 等待客户端运行完成 EchoClient: 连接服务器IP:::1 port :8000 EchoClient: 发送数据 b‘This is the message. ‘ EchoClient: 发送数据 b‘It will be sent ‘ EchoClient: 发送数据 b‘in parts.‘ EchoClient: received b‘This is the message. It will be sent in parts.‘ EchoClient: 接收到 EOF EchoClient: 服务器关闭连接 main: 关闭事件循环
24、基于Coroutine 实现服务端和客户端数据相互传送,与22点示例功能一样)
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys SERVER_ADDRESS = (‘localhost‘, 8000) async def echo(reader, writer): address = writer.get_extra_info(‘peername‘) log = logging.getLogger(‘echo_{}_{}‘.format(*address)) log.debug(‘开始接受连接‘) while True: data = await reader.read(128) if data: log.debug(‘接受的数据 : {}‘.format(data)) writer.write(data) await writer.drain() log.debug(‘发送数据:{}‘.format(data)) else: log.debug(‘关闭连接‘) writer.close() return logging.basicConfig( level=logging.DEBUG, format=‘%(name)s : %(message)s‘, stream=sys.stderr ) log = logging.getLogger(‘main‘) event_loop = asyncio.get_event_loop() factory = asyncio.start_server(echo, *SERVER_ADDRESS) server = event_loop.run_until_complete(factory) log.debug(‘开始启动服务 IP:{},Port:{}‘.format(*SERVER_ADDRESS)) try: event_loop.run_forever() except KeyboardInterrupt: pass finally: log.debug(‘关闭服务端‘) server.close() event_loop.run_until_complete(server.wait_closed()) log.debug(‘关闭事件循环‘) event_loop.close()
asyncio_echo_server_coroutine.py
#!/usr/bin/env python3 # encoding: utf-8 import asyncio import logging import sys MESSAGES = [ b‘This is the message. ‘, b‘It will be sent ‘, b‘in parts.‘, ] SERVER_ADDRESS = (‘localhost‘, 8000) async def echo_client(address, messages): log = logging.getLogger(‘echo_client‘) log.debug(‘连接服务器 to {} port {}‘.format(*address)) # 创建与服务端连接 reader, writer = await asyncio.open_connection(*address) for msg in messages: writer.write(msg) log.debug(‘发送数据: {}‘.format(msg)) # 判断是否发送结束标记 if writer.can_write_eof(): writer.write_eof() # 等待所有发送完成 await writer.drain() log.debug(‘等待服务器响应‘) while True: data = await reader.read(128) if data: log.debug(‘接收服务器数据 :{}‘.format(data)) else: log.debug(‘关闭与服务器的连接‘) writer.close() return logging.basicConfig( level=logging.DEBUG, format=‘%(name)s: %(message)s‘, stream=sys.stderr, ) log = logging.getLogger(‘main‘) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete( echo_client(SERVER_ADDRESS, MESSAGES) ) finally: log.debug(‘closing event loop‘) event_loop.close()
asyncio_echo_client_coroutine.py
运行效果
服务端
[[email protected] mnt]# python3 asyncio_echo_server_coroutine.py asyncio : Using selector: EpollSelector main : 开始启动服务 IP:localhost,Port:8000 echo_::1_54084 : 开始接受连接 echo_::1_54084 : 接受的数据 : b‘This is the message. It will be sent in parts.‘ echo_::1_54084 : 发送数据:b‘This is the message. It will be sent in parts.‘ echo_::1_54084 : 关闭连接 #这里使用Ctrl + C,运行后面的功能 ^Cmain : 关闭服务端 main : 关闭事件循环
客户端
[[email protected] mnt]# python3 asyncio_echo_client_coroutine.py asyncio: Using selector: EpollSelector echo_client: 连接服务器 to localhost port 8000 echo_client: 发送数据: b‘This is the message. ‘ echo_client: 发送数据: b‘It will be sent ‘ echo_client: 发送数据: b‘in parts.‘ echo_client: 等待服务器响应 echo_client: 接收服务器数据 :b‘This is the message. It will be sent in parts.‘ echo_client: 关闭与服务器的连接 main: closing event loop
25、基于Coroutine ,实现SSL的Socket通讯
#创建ssl证书
openssl req -newkey rsa:2048 -nodes -keyout test_ssl.key -x509 -days 800 -out test_ssl.crt
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys import ssl SERVER_ADDRESS = (‘localhost‘, 8000) async def echo(reader, writer): address = writer.get_extra_info(‘peername‘) log = logging.getLogger(‘echo_{}_{}‘.format(*address)) log.debug(‘开始接受连接‘) while True: data = await reader.read(128) #因为ssl不支持EOF结束,所以需要用‘\x00‘结束 terminate = data.endswith(b‘\x00‘) data = data.rstrip(b‘\x00‘) if data: log.debug(‘接受的数据 : {}‘.format(data)) writer.write(data) await writer.drain() log.debug(‘发送数据:{}‘.format(data)) if not data or terminate: log.debug(‘关闭连接‘) writer.close() return logging.basicConfig( level=logging.DEBUG, format=‘%(name)s : %(message)s‘, stream=sys.stderr ) log = logging.getLogger(‘main‘) event_loop = asyncio.get_event_loop() # 创建SSL所需要的对象 ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.check_hostname = False ssl_context.load_cert_chain(‘test_ssl.crt‘, ‘test_ssl.key‘) factory = asyncio.start_server(echo, *SERVER_ADDRESS, ssl=ssl_context) server = event_loop.run_until_complete(factory) log.debug(‘开始启动服务 IP:{},Port:{}‘.format(*SERVER_ADDRESS)) try: event_loop.run_forever() except KeyboardInterrupt: pass finally: log.debug(‘关闭服务端‘) server.close() event_loop.run_until_complete(server.wait_closed()) log.debug(‘关闭事件循环‘) event_loop.close()
asyncio_echo_server_ssl.py
#!/usr/bin/env python3 # encoding: utf-8 import asyncio import logging import sys import ssl MESSAGES = [ b‘This is the message. ‘, b‘It will be sent ‘, b‘in parts.‘, ] SERVER_ADDRESS = (‘localhost‘, 8000) async def echo_client(address, messages): log = logging.getLogger(‘echo_client‘) # 客户端ssl所需要带证书访问 ssl_context = ssl.create_default_context( ssl.Purpose.SERVER_AUTH ) ssl_context.check_hostname = False ssl_context.load_verify_locations(‘test_ssl.crt‘) log.debug(‘连接服务器 to {} port {}‘.format(*address)) # 创建与服务端连接 reader, writer = await asyncio.open_connection(*address, ssl=ssl_context) for msg in messages: writer.write(msg) log.debug(‘发送数据: {}‘.format(msg)) # 判断是否发送结束标记 # 非ssl # if writer.can_write_eof(): # writer.write_eof() # ssl writer.write(b‘\x00‘) # 等待所有发送完成 await writer.drain() log.debug(‘等待服务器响应‘) while True: data = await reader.read(128) if data: log.debug(‘接收服务器数据 :{}‘.format(data)) else: log.debug(‘关闭与服务器的连接‘) writer.close() return logging.basicConfig( level=logging.DEBUG, format=‘%(name)s: %(message)s‘, stream=sys.stderr, ) log = logging.getLogger(‘main‘) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete( echo_client(SERVER_ADDRESS, MESSAGES) ) finally: log.debug(‘closing event loop‘) event_loop.close()
asyncio_echo_client_ssl.py
运行效果
服务端
[[email protected] mnt]# python3 asyncio_echo_server_ssl.py asyncio : Using selector: EpollSelector main : 开始启动服务 IP:localhost,Port:8000 echo_::1_54094 : 开始接受连接 echo_::1_54094 : 接受的数据 : b‘This is the message. It will be sent in parts.‘ echo_::1_54094 : 发送数据:b‘This is the message. It will be sent in parts.‘ echo_::1_54094 : 关闭连接 ^Cmain : 关闭服务端 main : 关闭事件循环
客户端
[[email protected] mnt]# python3 asyncio_echo_client_ssl.py asyncio: Using selector: EpollSelector echo_client: 连接服务器 to localhost port 8000 echo_client: 发送数据: b‘This is the message. ‘ echo_client: 发送数据: b‘It will be sent ‘ echo_client: 发送数据: b‘in parts.‘ echo_client: 等待服务器响应 echo_client: 接收服务器数据 :b‘This is the message. It will be sent in parts.‘ echo_client: 关闭与服务器的连接 main: closing event loop
26、利用asyncio.SubprocessProtocol类继承的方式实现子进程的调用
#!/usr/bin/env python3 # encoding: utf-8 #end_pymotw_header import asyncio import functools class DFProtocol(asyncio.SubprocessProtocol): FD_NAMES = [‘stdin‘, ‘stdout‘, ‘stderr‘] def __init__(self, done_future): self.done = done_future self.buffer = bytearray() super().__init__() def connection_made(self, transport): print(‘process started {}‘.format(transport.get_pid())) self.transport = transport def pipe_data_received(self, fd, data): print(‘read {} bytes from {}‘.format(len(data), self.FD_NAMES[fd])) if fd == 1: self.buffer.extend(data) def process_exited(self): print(‘process exited‘) return_code = self.transport.get_returncode() print(‘return code {}‘.format(return_code)) if not return_code: cmd_output = bytes(self.buffer).decode() results = self._parse_results(cmd_output) else: results = [] self.done.set_result((return_code, results)) def _parse_results(self, output): print(‘parsing results‘) if not output: return [] lines = output.splitlines() headers = lines[0].split() devices = lines[1:] results = [ dict(zip(headers, line.split())) for line in devices ] return results async def run_df(loop): print(‘in run_df‘) cmd_done = asyncio.Future(loop=loop) factory = functools.partial(DFProtocol, cmd_done) proc = loop.subprocess_exec( factory, ‘df‘, ‘-hl‘, stdin=None, stderr=None, ) try: print(‘launching process‘) transport, protocol = await proc print(‘waiting for process to complete‘) await cmd_done finally: transport.close() return cmd_done.result() event_loop = asyncio.get_event_loop() try: return_code, results = event_loop.run_until_complete( run_df(event_loop) ) finally: event_loop.close() if return_code: print(‘error exit {}‘.format(return_code)) else: print(‘\nFree space:‘) for r in results: print(‘{Mounted:25}: {Avail}‘.format(**r))
asyncio_subprocess_protocol.py
运行效果
由于我使用的Python版本是3.6.6,调用的优先级是 1、调用 def connection_made(self, transport) 函数 2、调用 def process_exited(self)函数 3、调用 def pipe_data_received(self, fd, data)函数这里是输出结果,所以结束进程的时候process_exited解析是空,导致结果出不来,这里待pyhton版本验证
27、利用协程子进程的调用
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio def _parse_results(output): print(‘解析结果‘) if not output: return [] lines = output.splitlines() headers = lines[0].split() devices = lines[1:] results = [ dict(zip(headers, line.split())) for line in devices ] return results async def run_df(): print(‘run_df函数运行‘) buffer = bytearray() create = asyncio.create_subprocess_exec( ‘df‘, ‘-h‘, stdout=asyncio.subprocess.PIPE ) print(‘df -h开始运行‘) proc = await create print(‘进程开始 {}‘.format(proc.pid)) while True: line = await proc.stdout.readline() print(‘读取 : {}‘.format(line)) if not line: print(‘命令不再输出‘) break buffer.extend(line) print(‘等待进程运行完成‘) await proc.wait() return_code = proc.returncode print(‘运行返回码:{}‘.format(return_code)) if not return_code: cmd_output = bytes(buffer).decode() results = _parse_results(cmd_output) else: results = [] return (return_code, results) event_loop = asyncio.get_event_loop() try: return_code, results = event_loop.run_until_complete( run_df() ) finally: event_loop.close() if return_code: print(‘错误退出,错误信息:{}‘.format(return_code)) else: print(‘运行结果:‘) for r in results: print(‘{Mounted:25}:{Avail}‘.format(**r))
asyncio_subprocess_coroutine.py
运行效果
[[email protected] mnt]# python3 asyncio_subprocess_coroutine.py run_df函数运行 df -h开始运行 进程开始 44244 读取 : b‘Filesystem Size Used Avail Use% Mounted on\n‘ 读取 : b‘/dev/mapper/centos-root 17G 7.9G 9.2G 47% /\n‘ 读取 : b‘devtmpfs 478M 0 478M 0% /dev\n‘ 读取 : b‘tmpfs 489M 0 489M 0% /dev/shm\n‘ 读取 : b‘tmpfs 489M 56M 433M 12% /run\n‘ 读取 : b‘tmpfs 489M 0 489M 0% /sys/fs/cgroup\n‘ 读取 : b‘/dev/sda1 1014M 125M 890M 13% /boot\n‘ 读取 : b‘tmpfs 98M 0 98M 0% /run/user/0\n‘ 读取 : b‘‘ 命令不再输出 等待进程运行完成 运行返回码:0 解析结果 运行结果: / :9.2G /dev :478M /dev/shm :489M /run :433M /sys/fs/cgroup :489M /boot :890M /run/user/0 :98M
28、利用协程管道传数据给子进程的调用处理
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def to_upper(input): print(‘进程转大写的to_upper函数‘) create = asyncio.create_subprocess_exec( ‘tr‘, ‘[:lower:]‘, ‘[:upper:]‘, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, ) print(‘等待子进程运行完成‘) proc = await create print(‘子进程PID {}‘.format(proc.pid)) print(‘查看子进程运行的标准输出和错误‘) stdout, stderr = await proc.communicate(input.encode()) print(‘等待子进程完成‘) await proc.wait() return_code = proc.returncode print(‘return code {}‘.format(return_code)) if not return_code: results = bytes(stdout).decode() else: results = ‘‘ return (return_code, results) MESSAGE = """ This message will be converted to all caps. """ event_loop = asyncio.get_event_loop() try: return_code, results = event_loop.run_until_complete( to_upper(MESSAGE) ) finally: event_loop.close() if return_code: print(‘错误时,退出的返回状态码 {}‘.format(return_code)) else: print(‘源数据: {!r}‘.format(MESSAGE)) print(‘处理过的数据 : {!r}‘.format(results))
asyncio_subprocess_coroutine_write.py
运行效果
[[email protected] mnt]# python3 asyncio_subprocess_coroutine_write.py 进程转大写的to_upper函数 等待子进程运行完成 子进程PID 78254 查看子进程运行的标准输出和错误 等待子进程完成 return code 0 源数据: ‘\nThis message will be converted\nto all caps.\n‘ 处理过的数据 : ‘\nTHIS MESSAGE WILL BE CONVERTED\nTO ALL CAPS.\n‘
29、协程之信号的注册处理
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools import os import signal def signal_handler(name): print(‘正在处理信号 : {}‘.format(name)) event_loop = asyncio.get_event_loop() # 给信号绑定处理的事件 event_loop.add_signal_handler( signal.SIGHUP, functools.partial(signal_handler, name=‘SIGHUP‘), ) event_loop.add_signal_handler( signal.SIGUSR1, functools.partial(signal_handler, name=‘SIGUSR1‘), ) event_loop.add_signal_handler( signal.SIGINT, functools.partial(signal_handler, name=‘SIGINT‘), ) async def send_signals(): pid = os.getpid() print(‘开始发送信号给PID:{}‘.format(pid)) for name in [‘SIGHUP‘, ‘SIGHUP‘, ‘SIGUSR1‘, ‘SIGINT‘]: print(‘发送信号名字:{}‘.format(name)) # 跟linux 命令kill一样,利用pid结束进程 os.kill(pid, getattr(signal, name)) print(‘放弃控制‘) await asyncio.sleep(0.01) return try: event_loop.run_until_complete(send_signals()) finally: event_loop.close()
asyncio_signal.py
运行效果
[[email protected] mnt]# python3 asyncio_signal.py 开始发送信号给PID:78320 发送信号名字:SIGHUP 放弃控制 正在处理信号 : SIGHUP 发送信号名字:SIGHUP 放弃控制 正在处理信号 : SIGHUP 发送信号名字:SIGUSR1 放弃控制 正在处理信号 : SIGUSR1 发送信号名字:SIGINT 放弃控制 正在处理信号 : SIGINT
29、协程与线程结合(ThreadPoolExecutor)
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys import concurrent.futures import time def blocks(n): log = logging.getLogger(‘blocks({})‘.format(n)) log.info(‘运行‘) time.sleep(0.1) log.info(‘done‘) return n ** 2 async def run_blocking_tasks(executor): """运行阻塞的任务""" log = logging.getLogger(‘run_blocking_tasks‘) log.info(‘开始运行‘) log.info(‘创建执行任务‘) loop = asyncio.get_event_loop() blocking_tasks = [ loop.run_in_executor(executor, blocks, i) for i in range(6) ] log.info(‘等待执行任务‘) completed, pending = await asyncio.wait(blocking_tasks) results = [t.result() for t in completed] log.info(‘运行结果: {!r}‘.format(results)) log.info(‘exitrun_blocking_tasks 退出‘) if __name__ == ‘__main__‘: logging.basicConfig( level=logging.INFO, format=‘%(threadName)10s %(name)18s: %(message)s‘, stream=sys.stderr ) # 创建一个线程池执行器,最大开启3个工作线程 executor = concurrent.futures.ThreadPoolExecutor( max_workers=3 ) # 创建一个事件循环 event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(run_blocking_tasks(executor)) finally: event_loop.close()
asyncio_ThreadPoolExecutor.py
运行效果
[[email protected] mnt]# python3 asyncio_ThreadPoolExecutor.py MainThread run_blocking_tasks: 开始运行 MainThread run_blocking_tasks: 创建执行任务 ThreadPoolExecutor-0_0 blocks(0): 运行 ThreadPoolExecutor-0_1 blocks(1): 运行 ThreadPoolExecutor-0_2 blocks(2): 运行 MainThread run_blocking_tasks: 等待执行任务 ThreadPoolExecutor-0_0 blocks(0): done ThreadPoolExecutor-0_0 blocks(3): 运行 ThreadPoolExecutor-0_1 blocks(1): done ThreadPoolExecutor-0_1 blocks(4): 运行 ThreadPoolExecutor-0_2 blocks(2): done ThreadPoolExecutor-0_2 blocks(5): 运行 ThreadPoolExecutor-0_1 blocks(4): done ThreadPoolExecutor-0_0 blocks(3): done ThreadPoolExecutor-0_2 blocks(5): done MainThread run_blocking_tasks: 运行结果: [0, 9, 16, 25, 1, 4] MainThread run_blocking_tasks: exitrun_blocking_tasks 退出
30、协程与进程结合(ProcessPoolExecutor)
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys import concurrent.futures import time def blocks(n): log = logging.getLogger(‘blocks({})‘.format(n)) log.info(‘运行‘) time.sleep(0.1) log.info(‘done‘) return n ** 2 async def run_blocking_tasks(executor): """运行阻塞的任务""" log = logging.getLogger(‘run_blocking_tasks‘) log.info(‘开始运行‘) log.info(‘创建执行任务‘) loop = asyncio.get_event_loop() blocking_tasks = [ loop.run_in_executor(executor, blocks, i) for i in range(6) ] log.info(‘等待执行任务‘) completed, pending = await asyncio.wait(blocking_tasks) results = [t.result() for t in completed] log.info(‘运行结果: {!r}‘.format(results)) log.info(‘exitrun_blocking_tasks 退出‘) if __name__ == ‘__main__‘: logging.basicConfig( level=logging.INFO, format=‘PID %(process)5s %(name)18s: %(message)s‘, stream=sys.stderr ) # 创建一个线程池执行器,最大开启3个工作线程 executor = concurrent.futures.ProcessPoolExecutor( max_workers=3 ) # 创建一个事件循环 event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(run_blocking_tasks(executor)) finally: event_loop.close()
asyncio_ProcessPoolExecutor.py
运行效果
[[email protected]]# python3 asyncio_ProcessPoolExecutor.py PID 91883 run_blocking_tasks: 开始运行 PID 91883 run_blocking_tasks: 创建执行任务 PID 91883 run_blocking_tasks: 等待执行任务 PID 91884 blocks(0): 运行 PID 91885 blocks(1): 运行 PID 91886 blocks(2): 运行 PID 91884 blocks(0): done PID 91884 blocks(3): 运行 PID 91886 blocks(2): done PID 91885 blocks(1): done PID 91886 blocks(4): 运行 PID 91885 blocks(5): 运行 PID 91884 blocks(3): done PID 91886 blocks(4): done PID 91885 blocks(5): done PID 91883 run_blocking_tasks: 运行结果: [25, 1, 4, 9, 0, 16] PID 91883 run_blocking_tasks: exitrun_blocking_tasks 退出
31、asyncio调试模式的开启
#!/usr/bin/env python # -*- coding: utf-8 -*- import argparse import asyncio import logging import sys import time import warnings #接收命令行 parser = argparse.ArgumentParser(‘debugging asyncio‘) parser.add_argument( ‘-v‘, dest=‘verbose‘, default=False, action=‘store_true‘, ) args = parser.parse_args() #设置日志级别 logging.basicConfig( level=logging.DEBUG, format=‘%(levelname)7s: %(message)s‘, stream=sys.stderr, ) LOG = logging.getLogger(‘‘) async def inner(): LOG.info(‘inner 函数开始‘) time.sleep(0.1) LOG.info(‘inner 运行完成‘) async def outer(loop): LOG.info(‘outer 函数开始‘) #ensure_future,直到await才运行 await asyncio.ensure_future(loop.create_task(inner())) LOG.info(‘outer 运行完成‘) event_loop = asyncio.get_event_loop() if args.verbose: LOG.info(‘开启DEBUG模式‘) event_loop.set_debug(True) # 使“慢”任务的阈值非常小以便于说明。默认值为0.1,即100毫秒。 event_loop.slow_callback_duration = 0.001 # 在警告过滤器列表中插入一个简单的条目(在前面)。 warnings.simplefilter(‘always‘, ResourceWarning) LOG.info(‘entering event loop‘) event_loop.run_until_complete(outer(event_loop))
asyncio_debug.py
运行效果
#开启Debug [[email protected] mnt]# python3 asyncio_debug.py DEBUG: Using selector: EpollSelector INFO: entering event loop INFO: outer 函数开始 INFO: inner 函数开始 INFO: inner 运行完成 INFO: outer 运行完成 [[email protected]-mysql mnt]# python3 asyncio_debug.py -v DEBUG: Using selector: EpollSelector INFO: 开启DEBUG模式 INFO: entering event loop INFO: outer 函数开始 WARNING: Executing <Task pending coro=<outer() running at asyncio_debug.py:42> wait_for=<Task pending coro=<inner() running at asyncio_debug.py:33> cb=[<TaskWakeupMethWrapper object at 0x7f882e06c0d8>()] created at asyncio_debug.py:42> cb=[_run_until_complete_cb() at /usr/local/Python-3.6.6/lib/python3.6/asyncio/base_events.py:177] created at /usr/local/Python-3.6.6/lib/python3.6/asyncio/base_events.py:447> took 0.003 seconds INFO: inner 函数开始 INFO: inner 运行完成 WARNING: Executing <Task finished coro=<inner() done, defined at asyncio_debug.py:33> result=None created at asyncio_debug.py:42> took 0.101 seconds INFO: outer 运行完成 #正常运行 [[email protected] mnt]# python3 asyncio_debug.py DEBUG: Using selector: EpollSelector INFO: entering event loop INFO: outer 函数开始 INFO: inner 函数开始 INFO: inner 运行完成 INFO: outer 运行完成 You have new mail in /var/spool/mail/root
32、利用生成器的方式,创建协程socket监听
#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys @asyncio.coroutine def echo(reader, writer): address = writer.get_extra_info(‘peername‘) log = logging.getLogger(‘echo_{}_{}‘.format(*address)) log.debug(‘connection accepted‘) while True: data = yield from reader.read(128) if data: log.debug(‘received {!r}‘.format(data)) writer.write(data) yield from writer.drain() log.debug(‘sent {!r}‘.format(data)) else: log.debug(‘closing‘) writer.close() return #开启Debug模式 logging.basicConfig( level=logging.DEBUG, format=‘%(name)s: %(message)s‘, stream=sys.stderr, ) #设置日志的title log = logging.getLogger(‘main‘) #设置开启服务的IP+端口 server_address = (‘localhost‘, 8888) #获取事件循环 event_loop = asyncio.get_event_loop() # 创建服务器,让循环在之前完成协同工作。并且启动实际事件循环 coroutine = asyncio.start_server(echo, *server_address,loop=event_loop) server = event_loop.run_until_complete(coroutine) log.debug(‘starting up on {} port {}‘.format(*server_address)) try: #开启一直循环处理任务 event_loop.run_forever() finally: #结束后清理的工作 log.debug(‘closing server‘) server.close() event_loop.run_until_complete(server.wait_closed()) log.debug(‘closing event loop‘) event_loop.close()
asyncio_echo_server_generator
运行效果
[[email protected] mnt]# python3 asyncio_echo_server_generator asyncio: Using selector: EpollSelector main: starting up on localhost port 8888
33、协程的关闭示例
import asyncio import logging import sys logging.basicConfig( level=logging.DEBUG, format=‘%(name)s: %(message)s‘, stream=sys.stderr, ) LOG = logging.getLogger(‘main‘) async def stopper(loop): LOG.debug(‘stopper invoked‘) loop.stop() event_loop = asyncio.get_event_loop() event_loop.create_task(stopper(event_loop)) try: LOG.debug(‘entering event loop‘) event_loop.run_forever() finally: LOG.debug(‘closing event loop‘) event_loop.close()
asyncio_stop.py
运行效果
[[email protected] mnt]# python3 asyncio_stop.py asyncio: Using selector: EpollSelector main: entering event loop main: stopper invoked main: closing event loop
原文地址:https://www.cnblogs.com/ygbh/p/12015664.html