导读:作为日常生产开发中非常实用的一门语言,python广泛应用于网络爬虫、web开发、自动化测试、数据分析和人工智能等领域。但python是单线程的,想要提升python的处理速度,涉及到一个很关键的技术——协程。本篇文章,将讲述python协程的理解与使用。
1、操作系统相关概念
在理解与使用协程之前,先简单的了解几个与操作系统相关的概念,包括进程、线程、同步和异步、阻塞与非阻塞。了解这些概念,对你学习协程、消息队列、缓存等知识都有一定的帮助。
(1)进程:
进程是操作系统分配资源的最小单位,系统由一个个程序(进程)组成的,一般而言,分为文本区域、数据区域和堆栈区域
文本区域存储处理器执行的代码(机器码),通常来说,这是一个只读区域,防止运行的程序被意外的修改
数据区域存储所有的变量和动态分配的内存,又细分为初始化的数据区(所有初始化的全局、静态、常量以及外部变量)和未初始化的数据区(初始化未0的全局变量和静态变量),初始化的变量最初保存在文本区,程序启动后被拷贝到初始化的数据区
堆栈区域存储着活动过程调用的指令和本地变量,在地址空间里,栈区紧连着堆区,他们的增长方向相反,内存是线性的,所以我们的代码放在低地址的地方,由低向高增长,栈区大小不可预测,随开随用,因此放在高地址的地方,由高向低增长。当堆与栈指针重合的时候,意味着内存耗尽,造成内存溢出。
进程的创建和销毁都非常的消耗系统资源,是一种比较昂贵的操作。进程为了自身能够得到运行,必须抢占式的争夺CPU。对于单核CPU而言,在同一时间内只能执行一个进程的代码,所以在单核CPU上实现多进程,是通过CPU的快速切换不同进程来实现的,看上去就像是多个进程同时执行。
由于进程间是隔离的,各自拥有自己的内存资源,相比于线程的共享内存而言,要更安全,不同进程之间的数据只能通过IPC(Inter-Process Communication)进行通信共享
(2)线程
线程是CPU调度的基本单位。如果进程是一个容器,线程就是运行在容器里面的程序,线程是属于进程的,同个进程的多个线程共享进程的内存地址空间
线程间可以直接通过全局变量进行通信,所以相对来说,线程间通信是不太安全的,因此引入各种锁的场景,这里将不阐述
当一个线程奔溃了,会导致整个进程也奔溃,即其它线程也挂了。这一点与进程不一样,一个进程挂了,其他进程照样执行
在多核操作系统中,默认一个进程内只有一个线程,所以对多进程处理就像是一个进程一个核心
(3)同步和异步
同步和异步关注的是消息通信机制,所谓同步,就是在发出一个函数调用时,在没有得到结果之前,该调用不会返回。一旦调用返回,就立即得到调用的返回值,即调用者主动等待调用结果
所谓异步,就是在请求发出去后,这个调用就立即返回,但没有返回结果,通过回调的方式告知该调用的实际结果
同步的请求,需要主动读写数据,并且等待结果;异步的请求,调用者不会立即得到结果。而是在调用发出后,被调用者通过状态、通知来告诉调用者,或通过回调函数处理这个调用
(4)阻塞与非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回
非阻塞调用指在得到不能立即得到结果之前,该调用不会阻塞当前线程。所以,区分的条件在于,进程/线程要访问的数据是否就绪,进程/线程是否需要等待
非阻塞一般通过多路复用实现,多路复用由select、poll、epoll几种实现方式
(5)协程
了解完前面几个概念,再来看看协程的概念
协程是属于线程的,又称微线程,纤程,英文名是coroutine。举个例子,在执行函数A时,我希望能随时终端去执行函数B,然后终端B的执行,切换回来执行函数A。这就是协程的作用,由调用者自有切换。这个切换过程并不等同于函数调用,因为它没有调用语句。执行方式与多线程类似,但是协程只有一个线程执行
协程的优点是执行效率非常高,因为协程的切换是由程序自身控制,不需要切换线程,即没有切换线程的开销。同时,由于只有一个线程,不存在冲突的问题,不需要依赖锁(加锁和释放锁需要很多资源消耗)
协程的主要使用场景在于处理io密集型程序,解决效率问题,不同于CPU密集型程序的处理。然而实际开发中这两种场景非常多,如果要充分发挥CPU的利用率,可以使用多进程+协程的方式,本文后续将讲到结合点
2、协程相关原理
根据wikipedia的定义,协程是一个无优先级的子程序调度组件,允许子程序在特定的地方挂起恢复。所以理论上,只要内存足够,一个线程可以有任意多个协程,但同一时刻只能有一个协程在运行,多个协程分享该线程分配到的计算机资源。协程是为了充分发挥异步调用的优势,异步操作则是为了IO操作阻塞线程
(1)知识准备
在了解原理前,先做一个知识的准备
1)现代主流的操作系统几乎都是分时操作系统,即一台计算机采用时间片轮转的方式为多个用户提供服务,系统资源分配的基本单位是进程,CPU调度的基本单位是线程
2)运行时内存空间氛围变量区、栈区、堆区。内存地址分配上,堆区从低到高,栈区从高到低
3)计算机执行时一条条指令读取执行,执行到当前指令时,下一条指令的指令的地址在指令寄存器的IP中,ESP寄存值只想当前栈顶地址,EBP指向当前活动栈帧的基地址
4)系统发生函数调用时操作为:先将入参从右往左一次压栈,然后把返回地址压栈,最后将当前EBP寄存器的值压栈,修改ESP寄存器的值,在栈区分配当前函数局部变量所需的空间
5)协程的上下文包含属于当前协程的栈区和寄存器里面存放的值
(2)事件循环
在python3.3中通过yield from使用协程,在3.5中,引入了关于协程的语法糖async/await的原理解析。其中,事件循环是一个核心所在,编写过js的同学,会对事件循环Eventloop更加了解,事件循环是一种等待程序分配消息或事件的编程架构。在python中,asyncio.coroutine修饰器用来标记作为协程的函数,这里的协程是和asyncio及其事件循环一起使用的,而在后续的发展中,async/await被使用的越来越广泛
(3)async/await
async/await是使用python协程的关键,从结构上来看,asyncio实质上是一个异步框架,async/await是为异步框架提供API以方便使用者调用,所以使用者要想使用async/await编写协程代码,目前必须基于asyncio或其他异步库
(4)Future
在实际开发编写异步代码时,为了避免太多回调方法导致的回调地狱,但又需要获取异步调用的返回结果,聪明的语言设计者设计了一个叫做Future的对象,封装了与loop的交互行为。其大致执行过程为:程序启动后,通过add_done_callback方法向epoll注册回调函数,当result属性得到返回值后,主动运行之前注册的回调函数,向上传递给coroutine。这个Future对象为asyncio.Future
但是,要想取得返回值,程序必须恢复到工作状态,而由于Future对象本身的生存周期比较短,每一次注册回调、产生事件、触发回调过程后工作可能已经完成,所以用Future向生成器send result并不合适。这里又引入一个新的对象Task,保存在Future对象中,对生成器协程进行状态管理
Python里另一个Future对象是concurrent.futures.Future,与asyncio.Future互不兼容,容易产生混淆。区别点在于,concurrent.futures是线程级的Future对象,当使用concurrent.futures.Executor进行多线程编程时,该对象用于在不同的thread之间传递结果
(5)Task
上文中提到,Task是维护生成器协程状态处理执行逻辑的任务对象,Task中有一个_step方法,负责生成器协程与EventLoop交互过程的状态迁移,整个过程可以理解为:Task向协程send一个值,恢复其工作状态。当协程运行到断点后,得到新的Future对象,再处理future与loop的回调注册过程
(6)Loop
在日常开发中,会有一个误区,认为每一个线程都可以有一个独立的loop。实际运行时,主线程才能通过asyncio.get_event_loop()创建一个新的loop,而在其他线程时,使用get_event_loop()却会抛错。正确的做法为通过asyncio.set_event_loop(),将当前线程与主线程loop显式绑定
3、协程实战
上面介绍完了协程相关的概念和原理,接下来看看如何使用,这里举一个实际场景的例子
场景:
外部接受一些文件,每个文件里有一些数据,其中,这组数据需要通过http的方式,发向第三方平台,并获得结果
分析:
由于同一文件的每一组数据没有前后的处理逻辑,在之前通过requests库发送的网络请求,串行执行,下一组数据的发送需要等待上一组数据的返回,显得整个文件的处理时间长,这种请求方式,完全可以由协程来实现
为了更方便的配合协程发请求,我们使用aiohttp库来代替requests库,关于aiohttp,下面做简单介绍
aiohttp:
aiohttp是asyncio和python的异步HTTP客户端/服务器,由于是异步的,经常用在服务器端接收请求,和客户端爬虫应用,发起异步请求,这里我们主要用来发请求
aiohttp支持客户端和HTTP服务器,可以实现单线程并发IO操作,无需使用Callback Hell即可支持Server WebSockets和Client WebSockets,且具有中间件
4、代码实现
直接上代码吧,talk is cheap,show me the code~
import aiohttp import asyncio from inspect import isfunction import time import logger @logging_utils.exception(logger) def request(pool, data_list): loop = asyncio.get_event_loop() loop.run_until_complete(exec(pool, data_list)) async def exec(pool, data_list): tasks = [] sem = asyncio.Semaphore(pool) for item in data_list: tasks.append( control_sem(sem, item.get("method", "GET"), item.get("url"), item.get("data"), item.get("headers"), item.get("callback"))) await asyncio.wait(tasks) async def control_sem(sem, method, url, data, headers, callback): async with sem: count = 0 flag = False while not flag and count < 4: flag = await fetch(method, url, data, headers, callback) count = count + 1 print("flag:{},count:{}".format(flag, count)) if count == 4 and not flag: raise Exception(‘EAS service not responding after 4 times of retry.‘) async def fetch(method, url, data, headers, callback): async with aiohttp.request(method, url=url, data=data, headers=headers) as resp: try: json = await resp.read() print(json) if resp.status != 200: return False if isfunction(callback): callback(json) return True except Exception as e: print(e)
这里,我们封装了对外发送批量请求的request方法,接收一次性发送的数据多少,和数据综合,在外部使用时,只需要构建好网络请求对象的数据,设定好请求池大小即可,同时,设置了重试功能,进行了4次重试,防治在网络抖动的时候,单个数据的网络请求发送失败
最终效果:
在使用协程重构网络请求模块之后,当数据量在1000的时候,由之前的816s,提升到424s,快了一倍,且请求池大小加大的时候,效果更明显,由于第三方平台同时建立连接的数据限制,我们设定了40的阈值。可以看到,优化的程度很显著
文章来源:阿里技术(博主为了学习这篇文章,将阿里技术中的原文重新码了一遍,放到了自己的博客中,与原文相比,语句的表达方式有些出入,但内容是一致的)
原文地址:https://www.cnblogs.com/xmcwm/p/12082982.html