协程、异步IO

协程,又称微线程,纤程。英文名Coroutine,协程是一种用户态的轻量级线程

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
    •   "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

from greenlet import greenlet

def t1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def t2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(t1)
gr2 = greenlet(t2)
gr1.switch()

吃包子

from greenlet import greenlet

def consumer(n):
    print("--->starting eating baozi...")
    while True:
        print("eating baozi %s" % n)
        n = gr2.switch()

def producer():
    n = 0
    while n < 5:
        n += 1
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
        gr1.switch(n)

if __name__ == ‘__main__‘:
    gr1 = greenlet(consumer)
    gr2 = greenlet(producer)
    gr2.switch()

Gevent

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

from gevent import monkey
monkey.patch_all()
import gevent
from urllib.request import urlopen

# urllib.request gevent默认是不支持的,因为不认识这里面的IO操作,就不能切换,如果想让gevent支持,需要打一个补丁
# from gevent import monkey; monkey.patch_all()注意1点,一定要在模块导入之前加载这个补丁
def pa_web_page(url):
    print("GET url", url)
    req = urlopen(url)
    data = req.read()
    print(data)
    print(‘%d bytes received from %s.‘ % (len(data), url))

gevent.joinall([
    gevent.spawn(pa_web_page, "https://www.xxx1.org"),
    gevent.spawn(pa_web_page, "http://www.xxx.com.cn")
])

IO多路复用

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:

1. 保存处理机上下文,包括程序计数器和其他寄存器。

2. 更新PCB信息。

3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。

4. 选择另一个进程执行,并更新其PCB。

5. 更新内存管理的数据结构。

6. 恢复处理机上下文。

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

I/O模式

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

阻塞 I/O(blocking IO)

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝

到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

异步 I/O(asynchronous IO)

Linux下的asynchronous IO其实用得很少。先看一下它的流程:

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

I/O多路复用,sellect、poll、epoll三者的区别

select 
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll没有监视的文件描述符的限制

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

select例子,server端:

import select
import socket
import queue
import time

server = socket.socket()
server.bind((‘localhost‘, 8888))
server.listen(5)
server.setblocking(False)  # False or 0 代表非阻塞

inputs = [server]
msg_queues = {}
outputs = []

while True:
    # select()方法接收并监控3个通信列表
    # rlist:让select帮你监测,
    # wlist:所有需要返回的放到这里
    # xlist:监测哪些socket有没有出问题
    r_list, w_list, exception_list = select.select(inputs, outputs, [])  # r_list里面只要有返回,肯定有数据
    # print(‘r_list‘, r_list)
    # print(‘w_list‘, w_list)
    # print(‘e_list‘, exception_list)
    for s in r_list:  # s相当于server的对象
        if s is server:  # 代表new conn进来了
            conn, addr = s.accept()
            print(‘got new conn‘, conn, addr)
            # conn.receive(1024)  # 这里不能这么做,当没有数据过来的时候会报错
            inputs.append(conn)  # 思路是需要委托select来进行监听
            msg_queues[conn] = queue.Queue()  # conn为key,初始化一个队列
        else:
            try:
                data = s.recv(1024)
                print(‘recv data from [%s]:[%s]:‘ % (s.getpeername(), data.decode()))
                msg_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            except ConnectionResetError as e:
                print("conn closed")
                inputs.remove(s)
                if s in outputs:
                    outputs.remove(s)
                del msg_queues[s]

    for s in w_list:
        try:
            data = msg_queues[s].get_nowait()
            s.send(data.upper())
        except queue.Empty as e:
            outputs.remove(s)

client

import socket

HOST = ‘localhost‘  # The remote host
PORT = 8888  # The same port as used by the server
s = socket.socket()
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)
    print(‘Received‘, repr(data))
s.close()
时间: 2024-10-10 07:54:54

协程、异步IO的相关文章

python 协程, 异步IO Select 和 selectors 模块 多并发演示

主要内容 Gevent协程 Select\Poll\Epoll异步IO与事件驱动 selectors 模块 多并发演示 协程 协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是线程:协程是一种用户态的轻量级线程. 协程拥有自己的寄存器上下文和栈.协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈.因此: 协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开

Python 协程/异步IO/Select\Poll\Epoll异步IO与事件驱动

1 Gevent 协程 协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是线程:协程是一种用户态的轻量级线程. 协程拥有自己的寄存器上下文和栈.协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈.因此: 协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置. 协程的好处: 无需线程上下文切换的开销 无需原子操作锁定及同步的开销 方便切换控

Python全栈开发-Day10-进程/协程/异步IO/IO多路复用

本节内容 多进程multiprocessing 进程间的通讯 协程 论事件驱动与异步IO Select\Poll\Epoll--IO多路复用   1.多进程multiprocessing Python的线程用的是操作系统的原生线程,同样python的进程用的是操作系统的原生进程. 多进程之间没有锁的概念,多进程之间数据不能互相访问,所以不存在互斥锁.GIL问题又是仅仅出现在多线程中. 所以如果我们启动8个进程,每个进程有一个主线程,即8个线程,分别运行在8个CPU上,就可以充分利用多核的优势了.

python之协程与IO操作

协程 协程,又称微线程,纤程.英文名Coroutine. 协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用. 子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕. 所以子程序调用是通过栈实现的,一个线程就是执行一个子程序. 子程序调用总是一个入口,一次返回,调用顺序是明确的.而协程的调用和子程序不同. 协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,

10-线程,进程,协程,IO多路复用

- 线程进程介绍 1. 工作最小单元是线程 2. 应用程序 -> 至少有一个进程 -> 至少有一个线程 3. 应用场景: IO密集型:线程 计算密集型:进程 4. GIL,全局解释器锁. - 保证同一个进程中只有一个线程同时被调度- 线程 1. 基本使用 def task(arg): time.sleep(arg) print(arg) for i in range(5): t = threading.Thread(target=task,args=[i,]) # t.setDaemon(Tr

线程,协程,IO模型

理论: 1.每创造一个进程,默认里面就有一个线程 2.进程是一个资源单位,而进程里面的线程才是CPU上的一个调度单位 3.一个进程里面的多个线程,是共享这个进程里面的资源的 4.线程创建的开销比进程要小,不用申请空间,只要基于现在的进程所在的空间,开一条流水线 就可以,所以创造线程的速度比创造进程的速度快 5.进程之间更多的是竞争关系,因为他们彼此之间是互相隔离的,而同一个进程的线程之间是合作关系 线程与进程的区别 1.同一进程内的线程共享创建它们的进程的地址空间,也就是同一进程内的多个线程共享

网络编程进阶:并发编程之协程、IO模型

协程: 基于单线程实现并发,即只用一个主线程(此时可利用的CPU只有一个)情况下实现并发: 并发的本质:切换+保存状态 CPU正在运行一个任务,会在两种情况下切走去执行其他任务(切换有操作系统强制控制),一种情况是该任务发生了阻塞,另一种是该任务计算的时间过长或有一个优先级更高的程序替代了它 在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态 如果多个任务都是纯计算的,上图的情况2并不能提升效率,因为只是让CPU来回切,这样看起来所有任务都被"同时

协程、IO模型

一.协程 1.定义: 单线程实现并发,可以再应用程序当中控制多个任务的切换+保存状态. 优点:在应用程序级别的速度要远远高于操作系统的切换 缺点:多个任务一旦有一个任务阻塞住了,没有及时切换,整个线程都将阻塞在原地,该线程内的其他任务都不能继续执行了. 所以,在引入协程之后,就需要检测单线程下所有的IO行为,必须实现一旦遇到IO就立即切换,少一个都不行,因为一旦遇到一个任务阻塞住了,其它的任务都将阻塞住,及时其余的线程都是可以计算的,它们也是无法继续执行了. 2.协程的目的 程序想要能够在单线程

Cpython的进程,线程,协程,io模型

一.进程: 一  基础概念: 1.进程的定义: 进程本身是一个抽象的概念,进程是操作系统资源分配的基本单位,是正在进行的一个过程或者说一个任务,负责执行任务的是cpu. 2.并行与并发: 假设计算机只有一个cpu,由于一个cpu在同一时间只能执行一个任务,那么当有多个任务想同时运行按道理那么需要多个cpu执行 这个时候如果有多个cpu同时运行这些任务就是并行,如果是一个cpu运行多个任务(其实就是cpu在多个任务之间来回切换则为并发) 那么由此定义如下: 并行:多个任务同时运行,只有具备多个cp

5,线程池,进程池,协程,IO模型

今日内容: 1,线程池 2,进程池 3,协程 4,IO 模型 服务端要满足这三个条件: 1,24小时不间断的提供服务 2,能够支持高并发 3,要有固定的IP地址和端口在服务端这个地方会出现阻塞态情况: 阻塞IO 操作有: 1,链接循环 2,通信循环单线程实现高并发思路: 为了更好的提高程序的运行效率,即实现高并发,让服务端同时能够接受多个客户端的消息 所以一般在服务端会把,连接循环和通信循环封装为两个不同的函数方法, 这样当一个客户端与服务端进行通信时,服务端的连接循环可以和其他客户端进行连接,