【Python】【控制流程】【生成器 | 协程 | 期物 | 任务】对比与联系

Python 的 asyncio 类似于 C++ 的 Boost.Asio。

所谓「异步 IO」,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

Asyncio 是并发(concurrency)的一种方式。对 Python 来说,并发还可以通过线程(threading)和多进程(multiprocessing)来实现。

Asyncio 并不能带来真正的并行(parallelism)。当然,因为 GIL(全局解释器锁)的存在,Python 的多线程也不能带来真正的并行。

可交给 asyncio 执行的任务,称为协程(coroutine)。一个协程可以放弃执行,把机会让给其它协程(即 yield from 或 await)。

定义协程

协程的定义,需要使用 async def 语句。

?

code

1

async def do_some_work(x): pass

do_some_work 便是一个协程。

准确来说,do_some_work 是一个协程函数,可以通过 asyncio.iscoroutinefunction 来验证:

?

code

1

print(asyncio.iscoroutinefunction(do_some_work)) # True

这个协程什么都没做,我们让它睡眠几秒,以模拟实际的工作量 :

?

code

1

2

3

async def do_some_work(x):

print("Waiting " + str(x))

await asyncio.sleep(x)

在解释 await 之前,有必要说明一下协程可以做哪些事。协程可以:

?

code

1

2

3

4

5

* 等待一个 future 结束


  • 等待另一个协程(产生一个结果,或引发一个异常)
  • 产生一个结果给正在等它的协程
  • 引发一个异常给正在等它的协程

    asyncio.sleep 也是一个协程,所以 await asyncio.sleep(x) 就是等待另一个协程。可参见 asyncio.sleep 的文档:

?

code

1

2

sleep(delay, result=None, *, loop=None)

Coroutine that completes after a given time (in seconds).

运行协程

调用协程函数,协程并不会开始运行,只是返回一个协程对象,可以通过 asyncio.iscoroutine 来验证:

?

code

1

print(asyncio.iscoroutine(do_some_work(3))) # True

此处还会引发一条警告:

?

code

1

2

async1.py:16: RuntimeWarning: coroutine ‘do_some_work‘ was never awaited

print(asyncio.iscoroutine(do_some_work(3)))

要让这个协程对象运行的话,有两种方式:

?

code

1

2

3

* 在另一个已经运行的协程中用 await 等待它



  • 通过 ensure_future 函数计划它的执行

    简单来说,只有 loop 运行了,协程才可能运行。

    下面先拿到当前线程缺省的 loop ,然后把协程对象交给 loop.run_until_complete,协程对象随后会在 loop 里得到运行。

?

code

1

2

loop = asyncio.get_event_loop()

loop.run_until_complete(do_some_work(3))

run_until_complete 是一个阻塞(blocking)调用,直到协程运行结束,它才返回。这一点从函数名不难看出。

run_until_complete 的参数是一个 future,但是我们这里传给它的却是协程对象,之所以能这样,是因为它在内部做了检查,通过 ensure_future 函数把协程对象包装(wrap)成了 future。所以,我们可以写得更明显一些:

?

code

1

loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

完整代码:

?

code

1

2

3

4

5

6

7

8

import asyncio

async def do_some_work(x):

print("Waiting " + str(x))

await asyncio.sleep(x)

loop = asyncio.get_event_loop()

loop.run_until_complete(do_some_work(3))

运行结果:

?

code

1

2

Waiting 3

<三秒钟后程序结束>

回调

假如协程是一个 IO 的读操作,等它读完数据后,我们希望得到通知,以便下一步数据的处理。这一需求可以通过往 future 添加回调来实现。

?

code

1

2

3

4

5

6

7

def done_callback(futu):

print(‘Done‘)

futu = asyncio.ensure_future(do_some_work(3))

futu.add_done_callback(done_callback)

loop.run_until_complete(futu)

多个协程

实际项目中,往往有多个协程,同时在一个 loop 里运行。为了把多个协程交给 loop,需要借助 asyncio.gather 函数。

?

code

1

loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

或者先把协程存在列表里:

?

code

1

2

coros = [do_some_work(1), do_some_work(3)]

loop.run_until_complete(asyncio.gather(*coros))

运行结果:

?

code

1

2

3

4

Waiting 3

Waiting 1

<等待三秒钟>

Done

这两个协程是并发运行的,所以等待的时间不是 1 + 3 = 4 秒,而是以耗时较长的那个协程为准。

参考函数 gather 的文档:

gather(*coros_or_futures, loop=None, return_exceptions=False)

Return a future aggregating results from the given coroutines or futures.

发现也可以传 futures 给它:

?

code

1

2

3

4

futus = [asyncio.ensure_future(do_some_work(1)),

asyncio.ensure_future(do_some_work(3))]


loop.run_until_complete(asyncio.gather(*futus))

gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。

run_until_complete 和 run_forever

我们一直通过 run_until_complete 来运行 loop ,等到 future 完成,run_until_complete 也就返回了。

?

code

1

2

3

4

5

6

7

8

9

async def do_some_work(x):

print(‘Waiting ‘ + str(x))

await asyncio.sleep(x)

print(‘Done‘)

loop = asyncio.get_event_loop()

coro = do_some_work(3)

loop.run_until_complete(coro)

输出:

?

code

1

2

3

4

Waiting 3

<等待三秒钟>

Done

<程序退出>

现在改用 run_forever:

?

code

1

2

3

4

5

6

7

8

9

10

11

async def do_some_work(x):

print(‘Waiting ‘ + str(x))

await asyncio.sleep(x)

print(‘Done‘)

loop = asyncio.get_event_loop()

coro = do_some_work(3)

asyncio.ensure_future(coro)

loop.run_forever()

输出:

?

code

1

2

3

4

Waiting 3

<等待三秒钟>

Done

<程序没有退出>

三秒钟过后,future 结束,但是程序并不会退出。run_forever 会一直运行,直到 stop 被调用,但是你不能像下面这样调 stop:

?

code

1

2

loop.run_forever()

loop.stop()

run_forever 不返回,stop 永远也不会被调用。所以,只能在协程中调 stop:

?

code

1

2

3

4

5

async def do_some_work(loop, x):

print(‘Waiting ‘ + str(x))

await asyncio.sleep(x)

print(‘Done‘)

loop.stop()

这样并非没有问题,假如有多个协程在 loop 里运行:

?

code

1

2

3

4

asyncio.ensure_future(do_some_work(loop, 1))

asyncio.ensure_future(do_some_work(loop, 3))


loop.run_forever()

第二个协程没结束,loop 就停止了——被先结束的那个协程给停掉的。

要解决这个问题,可以用 gather 把多个协程合并成一个 future,并添加回调,然后在回调里再去停止 loop。

?

code

1

2

3

4

5

6

7

8

9

10

11

12

13

14

async def do_some_work(loop, x):

print(‘Waiting ‘ + str(x))

await asyncio.sleep(x)

print(‘Done‘)

def done_callback(loop, futu):

loop.stop()

loop = asyncio.get_event_loop()

futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))

futus.add_done_callback(functools.partial(done_callback, loop))

loop.run_forever()

其实这基本上就是 run_until_complete 的实现了,run_until_complete 在内部也是调用 run_forever。

Close Loop?

以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢?

简单来说,loop 只要不关闭,就还可以再运行。:

?

code

1

2

3

loop.run_until_complete(do_some_work(loop, 1))

loop.run_until_complete(do_some_work(loop, 3))

loop.close()

但是如果关闭了,就不能再运行了:

?

code

1

2

3

loop.run_until_complete(do_some_work(loop, 1))

loop.close()

loop.run_until_complete(do_some_work(loop, 3)) # 此处异常

建议调用 loop.close,以彻底清理 loop 对象防止误用。

gather vs. wait

asyncio.gather 和 asyncio.wait 功能相似。

?

code

1

2

coros = [do_some_work(loop, 1), do_some_work(loop, 3)]

loop.run_until_complete(asyncio.wait(coros))

具体差别可请参见 StackOverflow 的讨论:Asyncio.gather vs asyncio.wait。

Timer

C++ Boost.Asio 提供了 IO 对象 timer,但是 Python 并没有原生支持 timer,不过可以用 asyncio.sleep 模拟。

?

code

1

2

3

4

5

6

7

async def timer(x, cb):

futu = asyncio.ensure_future(asyncio.sleep(x))

futu.add_done_callback(cb)

await futu


t = timer(3, lambda futu: print(‘Done‘))

loop.run_until_complete(t)

第一部分完。

一直对asyncio这个库比较感兴趣,毕竟这是官网也非常推荐的一个实现高并发的一个模块,python也是在python 3.4中引入了协程的概念。也通过这次整理更加深刻理解这个模块的使用

asyncio 是干什么的?

异步网络操作

并发

协程

python3.0时代,标准库里的异步网络模块:select(非常底层) python3.0时代,第三方异步网络库:Tornado python3.4时代,asyncio:支持TCP,子进程

现在的asyncio,有了很多的模块已经在支持:aiohttp,aiodns,aioredis等等 https://github.com/aio-libs 这里列出了已经支持的内容,并在持续更新

当然到目前为止实现协程的不仅仅只有asyncio,tornado和gevent都实现了类似功能

关于asyncio的一些关键字的说明:

event_loop 事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数

coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态

future: 代表将来执行或没有执行的任务的结果。它和task上没有本质上的区别

async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

看了上面这些关键字,你可能扭头就走了,其实一开始了解和研究asyncio这个模块有种抵触,自己也不知道为啥,这也导致很长一段时间,这个模块自己也基本就没有关注和使用,但是随着工作上用python遇到各种性能问题的时候,自己告诉自己还是要好好学习学习这个模块。

定义一个协程

复制代码

import time

import asyncio

now = lambda : time.time()

async def do_some_work(x):

print("waiting:", x)

start = now()

这里是一个协程对象,这个时候do_some_work函数并没有执行

coroutine = do_some_work(2)

print(coroutine)

创建一个事件loop

loop = asyncio.get_event_loop()

将协程加入到事件循环loop

loop.run_until_complete(coroutine)

print("Time:",now()-start)

复制代码

在上面带中我们通过async关键字定义一个协程(coroutine),当然协程不能直接运行,需要将协程加入到事件循环loop中

asyncio.get_event_loop:创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环

创建一个task

协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象. task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果

复制代码

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):

print("waiting:", x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()

task = loop.create_task(coroutine)

print(task)

loop.run_until_complete(task)

print(task)

print("Time:",now()-start)

复制代码

结果为:

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>

waiting: 2

<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>

Time: 0.0003514289855957031

创建task后,在task加入事件循环之前为pending状态,当完成后,状态为finished

关于上面通过loop.create_task(coroutine)创建task,同样的可以通过 asyncio.ensure_future(coroutine)创建task

关于这两个命令的官网解释: https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future

asyncio.ensure_future(coro_or_future, *, loop=None)?

Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

If the argument is a Future, it is returned directly.

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task

复制代码

AbstractEventLoop.create_task(coro)

Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.

This method was added in Python 3.4.2. Use the async() function to support also older Python versions.

复制代码

绑定回调

绑定回调,在task执行完成的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。

复制代码

import time

import asyncio

now = lambda : time.time()

async def do_some_work(x):

print("waiting:",x)

return "Done after {}s".format(x)

def callback(future):

print("callback:",future.result())

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)

print(task)

task.add_done_callback(callback)

print(task)

loop.run_until_complete(task)

print("Time:", now()-start)

复制代码

结果为:

复制代码

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>

waiting: 2

callback: Done after 2s

Time: 0.00039196014404296875

复制代码

通过add_done_callback方法给task任务添加回调函数,当task(也可以说是coroutine)执行完成的时候,就会调用回调函数。并通过参数future获取协程执行的结果。这里我们创建 的task和回调里的future对象实际上是同一个对象

阻塞和await

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行

耗时的操作一般是一些IO操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。

复制代码

import asyncio

import time

now = lambda :time.time()

async def do_some_work(x):

print("waiting:",x)

# await 后面就是调用耗时的操作

await asyncio.sleep(x)

return "Done after {}s".format(x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)

loop.run_until_complete(task)

print("Task ret:", task.result())

print("Time:", now() - start)

复制代码

在await asyncio.sleep(x),因为这里sleep了,模拟了阻塞或者耗时操作,这个时候就会让出控制权。 即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。

并发和并行

并发指的是同时具有多个活动的系统

并行值得是用并发来使一个系统运行的更快。并行可以在操作系统的多个抽象层次进行运用

所以并发通常是指有多个任务需要同时进行,并行则是同一个时刻有多个任务执行

下面这个例子非常形象:

并发情况下是一个老师在同一时间段辅助不同的人功课。并行则是好几个老师分别同时辅助多个学生功课。简而言之就是一个人同时吃三个馒头还是三个人同时分别吃一个的情况,吃一个馒头算一个任务

复制代码

import asyncio

import time

now = lambda :time.time()

async def do_some_work(x):

print("Waiting:",x)

await asyncio.sleep(x)

return "Done after {}s".format(x)

start = now()

coroutine1 = do_some_work(1)

coroutine2 = do_some_work(2)

coroutine3 = do_some_work(4)

tasks = [

asyncio.ensure_future(coroutine1),

asyncio.ensure_future(coroutine2),

asyncio.ensure_future(coroutine3)]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:

print("Task ret:",task.result())

print("Time:",now()-start)

复制代码

运行结果:

复制代码

Waiting: 1

Waiting: 2

Waiting: 4

Task ret: Done after 1s

Task ret: Done after 2s

Task ret: Done after 4s

Time: 4.004154920578003

复制代码

总时间为4s左右。4s的阻塞时间,足够前面两个协程执行完毕。如果是同步顺序的任务,那么至少需要7s。此时我们使用了aysncio实现了并发。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一个task列表,后者接收一堆task。

关于asyncio.gather和asyncio.wait官网的说明:

https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

复制代码

Return a future aggregating results from the given coroutine objects or futures.

All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

复制代码

https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

复制代码

Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

The sequence futures must not be empty.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

return_when indicates when this function should return.

复制代码

协程嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

复制代码

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):

print("waiting:",x)

await asyncio.sleep(x)

return "Done after {}s".format(x)

async def main():

coroutine1 = do_some_work(1)

coroutine2 = do_some_work(2)

coroutine3 = do_some_work(4)

tasks = [

asyncio.ensure_future(coroutine1),

asyncio.ensure_future(coroutine2),

asyncio.ensure_future(coroutine3)

]

dones, pendings = await asyncio.wait(tasks)
for task in dones:
    print("Task ret:", task.result())

# results = await asyncio.gather(*tasks)
# for result in results:
#     print("Task ret:",result)

start = now()

loop = asyncio.get_event_loop()

loop.run_until_complete(main())

print("Time:", now()-start)

复制代码

如果我们把上面代码中的:

dones, pendings = await asyncio.wait(tasks)
for task in dones:
    print("Task ret:", task.result())

替换为:

results = await asyncio.gather(*tasks)
for result in results:
    print("Task ret:",result)

这样得到的就是一个结果的列表

不在main协程函数里处理结果,直接返回await的内容,那么最外层的run_until_complete将会返回main协程的结果。 将上述的代码更改为:

复制代码

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):

print("waiting:",x)

await asyncio.sleep(x)

return "Done after {}s".format(x)

async def main():

coroutine1 = do_some_work(1)

coroutine2 = do_some_work(2)

coroutine3 = do_some_work(4)

tasks = [

asyncio.ensure_future(coroutine1),

asyncio.ensure_future(coroutine2),

asyncio.ensure_future(coroutine3)

]

return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()

results = loop.run_until_complete(main())

for result in results:

print("Task ret:",result)

print("Time:", now()-start)

复制代码

或者返回使用asyncio.wait方式挂起协程。

将代码更改为:

复制代码

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):

print("waiting:",x)

await asyncio.sleep(x)

return "Done after {}s".format(x)

async def main():

coroutine1 = do_some_work(1)

coroutine2 = do_some_work(2)

coroutine3 = do_some_work(4)

tasks = [

asyncio.ensure_future(coroutine1),

asyncio.ensure_future(coroutine2),

asyncio.ensure_future(coroutine3)

]

return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()

done,pending = loop.run_until_complete(main())

for task in done:

print("Task ret:",task.result())

print("Time:", now()-start)

复制代码

也可以使用asyncio的as_completed方法

复制代码

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):

print("waiting:",x)

await asyncio.sleep(x)

return "Done after {}s".format(x)

async def main():

coroutine1 = do_some_work(1)

coroutine2 = do_some_work(2)

coroutine3 = do_some_work(4)

tasks = [

asyncio.ensure_future(coroutine1),

asyncio.ensure_future(coroutine2),

asyncio.ensure_future(coroutine3)

]

for task in asyncio.as_completed(tasks):

result = await task

print("Task ret: {}".format(result))

start = now()

loop = asyncio.get_event_loop()

loop.run_until_complete(main())

print("Time:", now()-start)

复制代码

从上面也可以看出,协程的调用和组合非常灵活,主要体现在对于结果的处理:如何返回,如何挂起

协程的停止

future对象有几个状态:

Pending

Running

Done

Cacelled

创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task

import asyncio

import time

now = lambda :time.time()

async def do_some_work(x):

print("Waiting:",x)

await asyncio.sleep(x)

return "Done after {}s".format(x)

coroutine1 =do_some_work(1)

coroutine2 =do_some_work(2)

coroutine3 =do_some_work(2)

tasks = [

asyncio.ensure_future(coroutine1),

asyncio.ensure_future(coroutine2),

asyncio.ensure_future(coroutine3),]

start = now()

loop = asyncio.get_event_loop()

try:

loop.run_until_complete(asyncio.wait(tasks))

except KeyboardInterrupt as e:

print(asyncio.Task.all_tasks())

for task in asyncio.Task.all_tasks():

print(task.cancel())

loop.stop()

loop.run_forever()

finally:

loop.close()

print("Time:",now()-start)

启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。可以看到输出如下:

Waiting: 1

Waiting: 2

Waiting: 2

^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result=‘Done after 1s‘>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=

循环task,逐个cancel是一种方案,可是正如上面我们把task的列表封装在main函数中,main函数外进行事件循环的调用。这个时候,main相当于最外出的一个task,那么处理包装的main函数即可。

不同线程的事件循环

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block。

import asyncio

from threading import Thread

import time

now = lambda :time.time()

def start_loop(loop):

asyncio.set_event_loop(loop)

loop.run_forever()

def more_work(x):

print(‘More work {}‘.format(x))

time.sleep(x)

print(‘Finished more work {}‘.format(x))

start = now()

new_loop = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop,))

t.start()

print(‘TIME: {}‘.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)

new_loop.call_soon_threadsafe(more_work, 3)

复制代码

启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe方法注册的more_work方法, 后者因为time.sleep操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3

新线程协程

复制代码

import asyncio

import time

from threading import Thread

now = lambda :time.time()

def start_loop(loop):

asyncio.set_event_loop(loop)

loop.run_forever()

async def do_some_work(x):

print(‘Waiting {}‘.format(x))

await asyncio.sleep(x)

print(‘Done after {}s‘.format(x))

def more_work(x):

print(‘More work {}‘.format(x))

time.sleep(x)

print(‘Finished more work {}‘.format(x))

start = now()

new_loop = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop,))

t.start()

print(‘TIME: {}‘.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)

asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环。 主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block。一共执行的时间大概在6s左右。

原文地址:https://www.cnblogs.com/suren2017/p/9452769.html

时间: 2024-11-07 23:05:43

【Python】【控制流程】【生成器 | 协程 | 期物 | 任务】对比与联系的相关文章

python 高性能编程之协程

用 greenlet 协程处理异步事件 自从 PyCon 2011 协程成为热点话题以来,我一直对此有着浓厚的兴趣.为了异步,我们曾使用多线程编程.然而线程在有着 GIL 的 Python 中带来的性能瓶颈和多线程编程的高出错风险,"协程 + 多进程"的组合渐渐被认为是未来发展的方向.技术容易更新,思维转变却需要一个过渡.我之前在异步事件处理方面已经习惯了回调 + 多线程的思维方式,转换到协程还非常的不适应.这几天我非常艰难地查阅了一些资料并思考,得出了一个可能并不可靠的总结.尽管这个

python的进程/线程/协程

1.python的多线程 多线程就是在同一时刻执行多个不同的程序,然而python中的多线程并不能真正的实现并行,这是由于cpython解释器中的GIL(全局解释器锁)捣的鬼,这把锁保证了同一时刻只有一个线程被执行. 多线程的特点: 线程比进程更轻量级,创建一个线程要比创建一个进程快10-100倍. 线程共享全局变量. 由于GIL的原因,当一个线程遇到IO操作时,会切换到另一个线程,所以线程适合IO密集型操作. 在多核cpu系统中,最大限度的利用多核,可以开启多个线程,开销比进程小的多,但是这并

【PYTHON模块】:协程与greenlet、gevent

协程:又称为微线程,英文名称Coroutine. 作用:它拥有自己的寄存器上下文和栈,能保留上一次调用时的状态,可以随时暂停程序,随时切换回来. 优点: ?无需线程上下文切换的开销    ?无需原子操作锁定及同步的开销 ?方便切换控制流,简化编程模型    ?高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题.所以很适合用于高并发处理 缺点:    ?无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上    ?进行阻塞

python并发编程之---协程

1.什么是协程 协程:是单线程下的并发,又称微线程,纤程. 协程是一种用户态的轻量级线程,协程是由用户程序自己控制调度的. 2.需要注意的点: 需要强调的是: #1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行) #2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关) 对比操作系统控制线程的切换,用户在单线程内控制协程的切换 优点

Python并发实践_02_协程

python中实现并发的方式有很多种,通过多进程并发可以真正利用多核资源,而多线程并发则实现了进程内资源的共享,然而Python中由于GIL的存在,多线程是没有办法真正实现多核资源的. 对于计算密集型程序,应该使用多进程并发充分利用多核资源,而在IO密集型程序中,多核优势并不明显,甚至由于大多数时间都是在IO堵塞状态,多进程的切换消耗反而让程序效率更加低下. 而当需要并发处理IO密集型任务时,就需要用到协程(Coroutine).协程并没有系统级的调度,而是用户级的调度方式,避免了系统调用的开销

10.迭代器/生成器/协程函数/列表生成器

迭代器为什么要用迭代器?小结:生成器为什么要使用生成器,什么是生成器?如何创建一个生成器举个栗子:斐波拉契数列用yield返回结果的执行流程作业代码以及注释:协程函数面向过程编程作业以及代码注解:典型范例以及代码解析:列表生成式生成器表达式作业和练习 迭代器 为什么要用迭代器? 提供了一种不依赖索引的取值方式,使一些不具有索引属性的对象也能遍历输出 相比列表,迭代器的惰性计算更加节约内存. 但是它无法有针对性地指定取某个值,并且只能向后取值. >>> from collections i

python并发编程之协程

阅读目录 一 引子 二 协程介绍 三 Greenlet 四 Gevent介绍 五 Gevent之同步与异步 六 Gevent之应用举例一 七 Gevent之应用举例二 一 引子 本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态 cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长 ps:在介绍进程理论时,提

python之并发编程—协程

引子 之前我们学习了线程.进程的概念,了解了在操作系统中进程是资源分配的最小单位,线程是CPU调度的最小单位.按道理来说我们已经算是把cpu的利用率提高很多了.但是我们知道无论是创建多进程还是创建多线程来解决问题,都要消耗一定的时间来创建进程.创建线程.以及管理他们之间的切换. 随着我们对于效率的追求不断提高,基于单线程来实现并发又成为一个新的课题,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发.这样就可以节省创建线进程所消耗的时间. 为此我们需要先回顾下并发的本质:切换+保存状

python学习道路(day11note)(协程,同步与异步的性能区别,url爬网页,select,RabbitMq)

1.协程 1 #协程 又称微线程 是一种用户的轻量级线程 程序级别代码控制 就不用加机器 2 #不同函数 = 不同任务 A函数切到B函数没有进行cpu级别的切换,而是程序级别的切换就是协程 yelied 3 4 #单线程下多个任务流用协程,比如打电话可以切换,nginx 5 #爽妹给你打电话的时候,她不说话,刘征电话过来时候你可以切过去,这时候要是爽妹说话,就会bibi响 6 ''' 7 8 协程的好处: 9 无需线程上下文切换的开销 10 无需原子操作锁定及同步的开销 11 "原子操作(ato