用yield实现协程 和asyncio模块

用yield实现协程

#基于yield实现异步
import time
def consumer():
    '''任务1:接收数据,处理数据'''
    while True:
        x=yield

def producer():
    '''任务2:生产数据'''
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)

producer()

使用yield from实现的协程

import datetime
import heapq    # 堆模块
import types
import time

class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until

class SleepingLoop:

    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        while self._waiting:
            now = datetime.datetime.now()
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                print('*'*50)
                wait_until = task.coro.send(now)
                print('-'*50)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                passdef sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    print('before yield wait_until')
    actual = yield wait_until   # 返回一个datetime数据类型的时间
    print('after yield wait_until')
    return actual - now

def countdown(label, length, *, delay=0):
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = yield from sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = yield from sleep(1)
        length -= 1
    print(label, 'lift-off!')

def main():
    loop = SleepingLoop(countdown('A', 5), countdown('B', 3, delay=2),
                        countdown('C', 4, delay=1))
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)

if __name__ == '__main__':
    main()

await和async关键字

使用 async function 可以定义一个 异步函数,在async关键字定义的函数中不能出现yield和yield from

# 例1
async def download(url):   # 加入新的关键字 async ,可以将任何一个普通函数变成协程
    return 'eva'

ret = download('http://www.baidu.com/')
print(ret)  # <coroutine object download at 0x108b3a5c8>
ret.send(None)  # StopIteration: eva

# 例2
async def download(url):
    return 'eva'
def run(coroutine):
    try:
        coroutine.send(None)
    except StopIteration as e:
        return e.value

coro = download('http://www.baidu.com/')
ret = run(coro)
print(ret)

async关键字不能和yield一起使用,引入coroutine装饰器来装饰downloader生成器。

await 操作符后面必须跟一个awaitable对象(通常用于等待一个会有io操作的任务, 它只能在异步函数 async function 内部使用

# 例3
import types

@types.coroutine      # 将一个生成器变成一个awaitable的对象
def downloader(url):
    yield 'aaa'

async def download_url(url):   # 协程
    waitable = downloader(url)
    print(waitable)   # <generator object downloader at 0x1091e2c78>     生成器
    html = await waitable
    return html

coro = download_url('http://www.baidu.com')
print(coro)                # <coroutine object download_url at 0x1091c9d48>
ret = coro.send(None)
print(ret)

asyncio模块

asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。

asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

coroutine+yield from
import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 异步调用asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()
async+await
import asyncio

async def hello():
    print("Hello world!")
    # 异步调用asyncio.sleep(1):
    r = await asyncio.sleep(1)
    print("Hello again!")

# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()

执行多个任务

import asyncio

async def hello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again!")
    return 'done'

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([hello(),hello()]))
loop.close()

获取返回值

import asyncio

async def hello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again!")
    return 'done'

loop = asyncio.get_event_loop()
task = loop.create_task(hello())
loop.run_until_complete(task)
ret = task.result()
print(ret)

执行多个任务获取返回值

import asyncio

async def hello(i):
    print("Hello world!")
    await asyncio.sleep(i)
    print("Hello again!")
    return 'done',i

loop = asyncio.get_event_loop()
task1 = loop.create_task(hello(2))
task2 = loop.create_task(hello(1))
task_l = [task1,task2]
tasks = asyncio.wait(task_l)
loop.run_until_complete(tasks)
for t in task_l:
    print(t.result())

执行多个任务按照返回的顺序获取返回值

import asyncio

async def hello(i):
    print("Hello world!")
    await asyncio.sleep(i)
    print("Hello again!")
    return 'done',i

async def main():
    tasks = []
    for i in range(20):
        tasks.append(asyncio.ensure_future(hello((20-i)/10)))
    for res in asyncio.as_completed(tasks):
        result = await res
        print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

asyncio使用协程完成http访问

import asyncio

async def get_url():
    reader,writer = await asyncio.open_connection('www.baidu.com',80)
    writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n')
    all_lines = []
    async for line in reader:
        data = line.decode()
        all_lines.append(data)
    html = '\n'.join(all_lines)
    return html

async def main():
    tasks = []
    for url in range(20):
        tasks.append(asyncio.ensure_future(get_url()))
    for res in asyncio.as_completed(tasks):
        result = await res
        print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())  # 处理一个任务
    loop.run_until_complete(asyncio.wait([main()]))  # 处理多个任务

    task = loop.create_task(main())  # 使用create_task获取返回值
    loop.run_until_complete(task)
    loop.run_until_complete(asyncio.wait([task]))

gevent模块实现协程

http://www.cnblogs.com/Eva-J/articles/8324673.html

原文地址:https://www.cnblogs.com/ciquankun/p/12141586.html

时间: 2024-10-07 12:44:21

用yield实现协程 和asyncio模块的相关文章

Python:线程、进程与协程(4)——multiprocessing模块(1)

multiprocessing模块是Python提供的用于多进程开发的包,multiprocessing包提供本地和远程两种并发,通过使用子进程而非线程有效地回避了全局解释器锁. (一)创建进程Process 类 创建进程的类,其源码在multiprocessing包的process.py里,有兴趣的可以对照着源码边理解边学习.它的用法同threading.Thread差不多,从它的类定义上就可以看的出来,如下: class Process(object):     '''     Proces

Python:线程、进程与协程(3)——Queue模块及源码分析

Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式.该模块提供了三种队列: Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列 Queue.LifoQueue(maxsize):后进先出,相当于栈 Queue.PriorityQueue(maxsize):优先级队列. 其中LifoQueue,PriorityQueue是Queue的子类.三者拥有以下共同的方法: qsize():返回近似的队列大小.为什么要加"近似&q

Python入门学习-DAY37-进程池与线程池、协程、gevent模块

一.进程池与线程池 基本使用: 进程池和线程池操作一样 提交任务的两种方式: 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的 同步调用 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time,random,os

协程:Greenlet模块、Gevent模块

三.Greenlet模块 Greenlet是python的一个C扩展,来源于Stackless python,旨在提供可自行调度的'微线程', 即协程.generator实现的协程在yield value时只能将value返回给调用者(caller). 而在greenlet中,target.switch(value)可以切换到指定的协程(target), 然后yield value.greenlet用switch来表示协程的切换,从一个协程切换到另一个协程需要显式指定. 安装 :pip3 ins

多任务异步协程,asyncio及aiohttp

要实现异步协程,需要满足几个条件: 1,创建协程对象,且协程内部操作需要支持异步. 2,创建任务对象,如需为协程执行结果进行进一步处理,则需要为任务对象绑定回调函数. 3,创建事件循环,并将任务启动. 1 import asyncio 2 import requests 3 from lxml import etree 4 import aiohttp 5 import os 6 7 headers = { 8 9 'User-Agent': 'Mozilla/5.0 (Windows NT 1

Python:线程、进程与协程(2)——threading模块(1)

上一篇博文介绍了Python中线程.进程与协程的基本概念,通过这几天的学习总结,下面来讲讲Python的threading模块.首先来看看threading模块有哪些方法和类吧. 主要有: Thread :线程类,这是用的最多的一个类,可以指定线程函数执行或者继承自它都可以实现子线程功能. Timer:与Thread类似,但要等待一段时间后才开始运行,是Thread的子类. Lock :原锁,是一个同步原语,当它锁住时不归某个特定的线程所有,这个可以对全局变量互斥时使用. RLock :可重入锁

python全栈脱产第37天------进程池与线程池、协程、gevent模块、单线程下实现并发的套接字通信

一.进程池与线程池 调用concurrent.futures下的ThreadPoolExecutor,ProcessPoolExecutor来实现 提交任务有两种方式:同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,在执行下一段代码,是串行的 异步调用:提交完一个任务之后,不在原地等待,直接运行下一段代码,任务是并发的 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimp

单线程实现并发——协程,gevent模块

一 并发的本质 1 切换 2 保存状态 二 协程的概念 协程,又称微线程,纤程.英文名Coroutine.单线程下实现并发,用户从应用程序级别控制单线程下任务的切换,注意一定是遇到I/O才切. 协程的特点在于是一个线程执行,那和多线程比,协程有何优势? 最大的优势就是协程极高的执行效率.因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显. 第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中

Python:线程、进程与协程(5)——multiprocessing模块(2)

上篇博文介绍了Python的multiprocessing模块创建进程Process 类,进程间通信,进程间的同步三个部分,下面接着介绍学习进程共享. (1)内存共享 在多进程情况下,由于每个进程有自己独立的内存空间,怎样能实现内存共享呢?multiprocessing模块提供了Value, Array,这两个是函数,详细定义在sharedctypes.py里,有兴趣的可以去看看(等了解了ctypes模块后回头再分享下我的理解,今天就先放放)   Value Value的初始化非常简单,直接类似