asyncio并发编程

目录

  • asyncio并发编程

    • 事件循环

      • 基本使用
      • 获取协程的返回值
      • 回调
      • wait和gather
    • task取消和子协程调用原理
      • task取消
      • 子协程
    • 其他方法
      • call_soon
      • call_later
      • call_at
      • call_soon_threadsafe
    • ThreadPollExecutor 和 asyncio完成阻塞io请求
    • asyncio发送http请求
    • asyncio同步和通信
      • 同步
      • 通信

asyncio并发编程

asyncio是Python3.4引入的一个用于异步IO的库,其主要功能如下

1)包含各种特定系统实现的模块化事件循环

2)传输和协议抽象

3)对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持

4)模仿futures模块但适用于事件循环使用的Future类

5)基于yield from的协议和任务,可以让我们用顺序的方式编写并发代码

6)必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池

7)模仿threading模块中的同步原语、可以用在单线程内的协程之间

事件循环

基本使用

来看一个示例

# 异步编程三要素:事件循环+回调(驱动生成器)+IO多路复用
# asyncio是python用于解决异步IO编程的一整套解决方案
# 基于asyncio的框架:tornado、gevent、twisted(scrapy,django channels)
# tornado(实现了web服务器),django+flask

# 使用asyncio
import asyncio
import time

async def get_html(url):
    print("start get url")
    # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
    await asyncio.sleep(2)  # 耗时操作一定要放到await里面
    # time.sleep(2)
    print("end get url")

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html("http://www.baidu.com") for _ in range(10)]
    # loop.run_until_complete(get_html("http://www.baidu.com"))
    loop.run_until_complete(asyncio.wait(tasks))    # 一次执行多个任务
    print(time.time()-start_time)

获取协程的返回值

import asyncio
import time

async def get_html(url):
    print("start get url")
    # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
    await asyncio.sleep(2)
    # time.sleep(2)
    print("end get url")
    return "loop_test"

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # ensure_future返回的是一个future对象
    get_future = asyncio.ensure_future(get_html("http://www.baidu.com"))
    loop.run_until_complete(get_future)     # run_until_complete也可以接收一个future对象
    print(get_future.result())  # loop_test

还有一种方式就是通过task对象获取返回值

import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")
    return "loop_test"

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("http://www.baidu.com"))
    loop.run_until_complete(task)
    print(get_future.result())  # loop_test

回调

如果在执行完协程后需要回调,比如说我们需要发送邮件,可以使用task对象的add_done_callback方法

import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")
    return "loop_test"

def callback(future):   # 注意这里默认会将future传给回调函数,所以必须有一个参数接收
    print("send email to robin")

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("http://www.baidu.com"))
    task.add_done_callback(callback)
    loop.run_until_complete(task)
    print(task.result())

这里有一个问题,如果回调函数需要参数的话就不行了,因为我们在调用的时候只传入了一个函数名,要传入参数,可以使用functool的partial函数

import time
from functools import partial

async def get_html(url):
    print("start get url")
    # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
    await asyncio.sleep(2)
    print("end get url")
    return "loop_test"

def callback(name, future):   # 注意这里callback自己的参数放在前面,future放在后面
    print("send email to %s" % name)

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("http://www.baidu.com"))
    task.add_done_callback(partial(callback, "sansa"))  # 将callback函数包裹起来
    loop.run_until_complete(task)
    print(task.result())

wait和gather

wait用于一次提交多个任务,gather与task在使用上基本相似,区别在于gather传参时需要使用*打散

下面来具体说一下详细区别

1)gather更加high-level(抽象层次更高)

2)gather可以分组

import asyncio
import time
from functools import partial

async def get_html(url):
    print("start get url")
    # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
    await asyncio.sleep(2)
    print("end get url")
    return "loop_test"

def callback(name, future):   # 注意这里默认会将future传给回调函数,所以必须有一个参数接收
    print("send email to %s" % name)

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task1 = [get_html("http://www.baidu.com") for _ in range(5)]
    task2 = [get_html("http://www.baidu.com") for _ in range(5)]
    loop.run_until_complete(asyncio.gather(*task1, *task2))     # 这里需要打散

后面可以改写成

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task1 = [get_html("http://www.baidu.com") for _ in range(5)]
    task2 = [get_html("http://www.baidu.com") for _ in range(5)]
    group1 = asyncio.gather(*task1)
    group2 = asyncio.gather(*task2)
    loop.run_until_complete(asyncio.gather(group1, group2))

task取消和子协程调用原理

task取消

import asyncio
import time

async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after %s s" % sleep_times)

if __name__ == '__main__':
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(3)

    tasks = [task1, task2, task3]
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:  # 命令行运行时按ctrl+c终止程序
        all_tasks = asyncio.Task.all_tasks()    # 获取所有task
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())    # 取消成功返回True
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()    # 注意close与stop的区别

子协程

下面来看一个官方示例

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

执行结果

Compute 1 + 2 ...
# 1 s
1 + 2 = 3

上述协程的时序图如下

其他方法

call_soon

call_soon是立即启动的意思,传入一个函数和函数需要的参数

import asyncio

def callback(sleep_times):
    print("sleep %s seconds" % sleep_times)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.call_soon(callback, 2)
    loop.run_forever()

执行结果

sleep 2 seconds
# 程序继续运行中...

我们需要在协程结束后终止事件循环,因此需要再定义一个函数

import asyncio

def callback(sleep_times):
    print("sleep %s seconds" % sleep_times)

def stoploop(loop):     # 终止循环
    loop.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.call_soon(callback, 2)     # 第一个参数是函数名,后面为动态参数
    loop.call_soon(stoploop, loop)
    loop.run_forever()

call_later

call_later按等待时间执行函数,会根据传入的时间排出一个顺序

import asyncio

def callback(sleep_times):
    print("sleep %s seconds" % sleep_times)

def stoploop(loop):
    loop.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.call_later(1, callback, 1)     # 第一个参数是函数名,后面为动态参数
    loop.call_later(3, callback, 3)     # 第一个参数是函数名,后面为动态参数
    loop.call_later(2, callback, 2)     # 第一个参数是函数名,后面为动态参数
    # loop.call_soon(stoploop, loop)    # 这里如果使用call_soon那么loop会先stop,就看不到效果了
    loop.run_forever()

执行结果

sleep 1 seconds
sleep 2 seconds
sleep 3 seconds
# 程序继续运行中...

call_at

call_at在指定时间执行

import asyncio

def callback(sleep_times):
    print("sleep %s seconds" % sleep_times)

def stoploop(loop):
    loop.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    now = loop.time()
    loop.call_at(now+2, callback, 1)     # 第一个参数是函数名,后面为动态参数
    loop.call_at(now+3, callback, 3)     # 第一个参数是函数名,后面为动态参数
    loop.call_at(now+2, callback, 2)     # 第一个参数是函数名,后面为动态参数
    loop.run_forever()

call_soon_threadsafe

call_soon_threadsafe是线程安全的call_soon,涉及到多线程时,使用这个

ThreadPollExecutor 和 asyncio完成阻塞io请求

什么时候使用多线程:在协程中集成阻塞io

在协程中不要放阻塞的代码, 但如果非要使用阻塞的代码, 就可以放到线程池中运行。

import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse
import time

def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, 80))

    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]   # 把请求头信息去掉,只要网页内容
    print(html_data)
    client.close()

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)    # 线程池
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        # 阻塞的代码放到线程池
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time: %s" %(time.time()-start_time))

asyncio发送http请求

asyncio发送http请求可以通过asyncio的open_connection方法实现,open_connection方法返回reader和writer对象,分别用于读和写

import asyncio
from urllib.parse import urlparse
import time

async def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = '/'

    # 建立socket连接
    reader, writer = await asyncio.open_connection(host, 80)  # 协程 与服务端建立连接
    writer.write(
        "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    all_lines = []
    async for raw_line in reader:  # __aiter__ __anext__魔法方法
        line = raw_line.decode('utf8')
        all_lines.append(line)
    html = '\n'.join(all_lines)
    return html

if __name__ == '__main__':
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(get_url(url))
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time: %s" % (time.time() - start_time))

如果我们需要获取协程执行后的结果,我们可以把future对象放入tasks里面,然后通过future获取result

修改后半段代码

if __name__ == '__main__':
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))   # future对象
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print(task.result())    # 通过future对象获取结果
    print("last time: %s" % (time.time() - start_time))

上面的是所有协程都执行完后再获取结果,如果需要执行完一个马上获取结果,可以使用as_completed方法

import asyncio
from urllib.parse import urlparse
import time

async def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = '/'

    # 建立socket连接
    reader, writer = await asyncio.open_connection(host, 80)  # 协程 与服务端建立连接
    writer.write(
        "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    all_lines = []
    async for raw_line in reader:  # __aiter__ __anext__魔法方法
        line = raw_line.decode('utf8')
        all_lines.append(line)
    html = '\n'.join(all_lines)
    return html

async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    for task in asyncio.as_completed(tasks):
        result = await task     # 一定要加await
        print(result)

if __name__ == '__main__':
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("last time: %s" % (time.time() - start_time))

asyncio同步和通信

同步

协程一般是不需要锁的

import asyncio

total = 0 

async def add():
    global total
    for _ in range(1000000):
        total += 1

async def desc():
    global total, lock
    for _ in range(1000000):
        total -= 1 

if __name__ == '__main__':
    tasks = [add(), desc()]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print(total)

上面的代码是不需要加锁的,不管运行多少次,结果都为0。但在某些情况下,我们需要加锁来使协程同步

import asyncio
import aiohttp
from asyncio import Lock

cache = {"baidu": "http://ww.baidu.com"}
lock = Lock()   # 这里的Lock不是系统的Lock,还有async for 。。。类似的用法

async def get_stuff(url):
    async with lock:    # 等价于with await lock
        # 这里可以使用async with 是因为 Lock中有__await__ 和 __aenter__两个魔法方法
        # 和线程一样, 这里也可以用 await lock.acquire() 并在结束时 lock.release
        if url in cache:
            return cache[url]
        stuff = await aiohttp.request("GET", url)
        cache[url] = stuff
        return stuff

async def parse_stuff(url):
    stuff = await get_stuff(url)
    # do some parse

async def use_stuff(url):
    stuff = await get_stuff(url)
    # use stuff to do something

if __name__ == '__main__':
    tasks = [parse_stuff("baidu"), use_stuff("baidu")]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

这里parse_stuff和use_stuff有共同调用的代码get_stuff,parse_stuff去请求的时候,如果get_stuff也去请求,可能触发网站的反爬机制

通信

因为协程是单线程的,所以协程完全可以使用全局变量实现queue来相互通信,但是如果想要在queue中定义存放的最大数目,那么需要使用asyncio的Queue,同时使用get和put时需要加上await

from asyncio import Queue
queue = Queue(maxsize=5)    

原文地址:https://www.cnblogs.com/zzliu/p/11259452.html

时间: 2024-10-13 00:29:42

asyncio并发编程的相关文章

gj13 asyncio并发编程

13.1 事件循环 asyncio 包含各种特定系统实现的模块化事件循环 传输和协议抽象 对TCP.UDP.SSL.子进程.延时调用以及其他的具体支持 模仿futures模块但适用于事件循环使用的Future类 基于 yield from 的协议和任务,可以让你用顺序的方式编写并发代码 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池 模仿threading模块中的同步原语.可以用在单线程内的协程之间 import asyncio import time # 不再这使用同步阻

asyncio 并发编程(二)

Future 对象 future 表示还没有完成的工作结果.事件循环可以通过监视一个future 对象的状态来指示它已经完成.future 对象有几个状态: Pending:循环 Running:运行 Done:完成 Cancelled:取消 获取 Future 中的结果 创建future的时候,task为pending,事件循环调用执行的时候是running,调用完毕是done,如果需要停止事件循环,就需要先把task取消,状态为cancel. import asyncio def callb

Python高级编程和异步IO并发编程

Python高级编程和异步IO并发编程网盘地址:https://pan.baidu.com/s/1eB-BsUacBRhKxh7qXwndMQ 密码: tgba备用地址(腾讯微云):https://share.weiyun.com/5Z3x9V0 密码:7cdnb2 针对Python高级编程和异步IO并发编程,把每个Python高级知识点从起因到原理讲透的课程全网难寻 第1章 课程简介第2章 python中一切皆对象第3章 魔法函数第4章 深入类和对象第5章 自定义序列类第6章 深入python

Python并发编程之学习异步IO框架:asyncio 中篇(十)

大家好,并发编程 进入第十章.好了,今天的内容其实还挺多的,我准备了三天,到今天才整理完毕.希望大家看完,有所收获的,能给小明一个赞.这就是对小明最大的鼓励了.为了更好地衔接这一节,我们先来回顾一下上一节的内容. 上一节「」,我们首先介绍了,如何创建一个协程对象.主要有两种方法 通过async关键字, 通过@asyncio.coroutine 装饰函数. 然后有了协程对象,就需要一个事件循环容器来运行我们的协程.其主要的步骤有如下几点: 将协程对象转为task任务对象 定义一个事件循环对象容器用

[记录]Python高并发编程

========== ==多进程== ========== 要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识. Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊.普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回. 子进程永远返回0,而父进程返回子进程的ID.这样做的理由是,一个父进程可以fork出很多子进程,

python 闯关之路四(下)(并发编程与数据库编程) 并发编程重点

并发编程重点: 1 2 3 4 5 6 7 并发编程:线程.进程.队列.IO多路模型 操作系统工作原理介绍.线程.进程演化史.特点.区别.互斥锁.信号. 事件.join.GIL.进程间通信.管道.队列. 生产者消息者模型.异步模型.IO多路复用模型.select\poll\epoll 高性 能IO模型源码实例解析.高并发FTP server开发 1.请写一个包含10个线程的程序,主线程必须等待每一个子线程执行完成之后才结束执行,每一个子线程执行的时候都需要打印当前线程名.当前活跃线程数量: 1

asyncio--python3未来并发编程主流、充满野心的模块

介绍 asyncio是Python在3.5中正式引入的标准库,这是Python未来的并发编程的主流,非常重要的一个模块.有一个web框架叫sanic,就是基于asyncio,语法和flask类似,使用sanic可以达到匹配go语言的并发量,但无奈第三方组件太少. asyncio模块提供了使用协程构建并发应用的工具.threading模块通过应用线程实现并发,multiprocessing使用系统进程实现并发,asyncio使用一种单线程.单进程模式实现并发,应用的各个部分会彼此合作,在最优的时刻

Java并发编程:Concurrent锁机制解析

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd

VS C++ 并发编程

1.VS2012及以上版本,支持C++11 thread类的并发编程. 相关材料可以参考博客:http://www.cnblogs.com/rangozhang/p/4468754.html 2.但对其之前的版本,可采用以下方式,实现类成员函数创建子线程实现并发. 首先需实现线程类的run函数,故定义了线程类的头文件和其对应的函数实现,具体如图1,2所示: 图1 线程类的头文件 图2 线程类的实现文件 注意到继承的DerivedThread类,只需将并发执行的函数写在其对应的run()函数内即可