39 - 同步-异步-IO多路复用

目录

  • 1 同步与异步
  • 2 阻塞与非阻塞
  • 3 什么是IO
    • 3.1 内核态用户态
    • 3.2 IO两个阶段
    • 3.3 IO模型
      • 3.3.1 同步阻塞IO
      • 3.3.2 同步非阻塞IO
      • 3.3.3 IO多路复用
      • 3.3.4 异步IO
  • 4 Python中的IO多路复用
    • 4.1 selectors库
    • 4.2 register方法
    • 4.3 利用selectors完成IO多路复用版本的EchoServer
    • 4.4 聊天室

1 同步与异步

同步和异步关注的是程序在执行时的状态:

  • 同步,可以理解为在执行完一个函数或方法之后,一直等待系统返回值或消息,这时程序是出于阻塞的,只有接收到返回的值或消息后才往下执行其他的命令。
  • 异步,执行完函数或方法后,不必阻塞性地等待返回值或消息,只需要向系统委托一个异步过程,那么当系统接收到返回值或消息时,系统会自动触发委托的异步过程,从而完成一个完整的流程。

    例如:

  • 同步如打电话,通信双方不能断(我们是同时进行,同步),你一句我一句,这样的好处是,对方想表达的信息我马上能收到,但是,我在打着电话,我无法做别的事情。
  • 异步如收发收短信,对比打电话,打电话我一定要在电话的旁边听着,保证双方都在线,而收发短信,对方不用保证此刻我一定在手机旁,同时,我也不用时刻留意手机有没有来短信。这样的话,我看着视频,然后来了短信,我就处理短信(也可以不处理),接着再看视频。

????????对于写程序,同步往往会阻塞,没有数据过来,我就等着,异步则不会阻塞,没数据来我干别的事,有数据来去处理这些数据。一句话总结一下就是:函数或方法被调用时,调用者是否得到最终结果的

  • 直接得到最终结果的就是同步调用;
  • 得到中间结果而非最终结果的,就是异步调用

2 阻塞与非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
  • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

一句话总结一下就是:函数或方法被调用时,是否立刻返回

  • 立即返回就是非阻塞调用
  • 不立即返回就是阻塞调用

3 什么是IO

linux系统中,所有的设备读写都可以看做文件的读写来操作,对文件的读写一般要经过内核态和用户态的切换,正因为有切换才导致了IO有同步和异步的说法。

3.1 内核态用户态

在i386之前,CPU工作在实模式下,之后开始支持保护模式,通常用保护环(ring)来描述特权级,分为四个运行级别:Ring0 ~ Ring3.

实模下,软件可以直接访问BIOS例程以及周边硬件,没有任何硬件等级的存储器保护观念或多任务。

CPU 在某个时刻运行在特定的特权级,等级约束了CPU了可以做什么,不可以做什么。x86(现在最流行的PC/Server CPU架构) CPU 只用了两个特权级:0 和 3:

  • Ring 0: 可以执行特权指令,可以访问说有级别数据,可以访问IO设备等(级别最高)
  • Ring 3:只能访问本级别数据(级别最低)

针对于Linux来说,

  • ring0 就是内核态,运行内核代码
  • ring3 就是用户态,运行用户代码

当用户的应用程序想访问某些硬件资源时,就需要通过操作系统提供的 系统调用 ,系统调用可以使用特权指令运行在内核空间,此时进程陷入内核态运行。系统调用完成,进程将回到用户态继续执行用户空间代码。

现代操作系统采用虚拟存储器,对于32位操作系统来说,进程对虚拟内存地址的内存寻址空间为4G(2^32)。操作系统中,内核程序独立且运行在较高的特权级别上,它们驻留在被保护的内存空间上,拥有访问硬件设备的权限,这部分内存称为内核空间(内核态,最高1G)。

3.2 IO两个阶段

通常来讲IO可以分成两种类型:

  • 来自网络的IO
  • 来自文件或者设备的IO

IO过程可以分为两个阶段:

  • 数据准备阶段(内核从IO设备读写数据)
  • 内核空间复制回用户空间进程缓冲区阶段(进程从内核复制数据)

3.3 IO模型

主要分为同步IO和异步IO,而同步IO又可以分为:同步阻塞IO、同步非阻塞IO、IO多路复用。

3.3.1 同步阻塞IO

进程等待(阻塞),直到读写完成。(全程等待)

3.3.2 同步非阻塞IO

进程调用read操作,如果IO设备没准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间

  1. 第一阶段数据没有准备好,就先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的。
  2. 第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。

3.3.3 IO多路复用

所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了立即开始处理,提高了同时处理IO的能力。主要的IO多路复用有:

  • select: 几乎所有操作系统平台都支持,poll是对select的升级
  • epoll:Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。(BSD、Mac平台有kqueue,Windows有iocp)

例如:

  • select:食堂供应很多菜(众多IO),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等。你只好等待大师傅叫,其中一样菜好了,大师傅叫你,说你点的菜有的好了,你得自己遍历找找看哪一样好了。请服务员打给你。
  • epool:是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。

一般情况下,select最多能监听1024个fd(可以修改,但不建议),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下。epoll没有管理的fd的上限,且是回调机制,不需要遍历,效果很高。

3.3.4 异步IO

进程发起异步IO请求,立即返回。内核完成IO的两个阶段,后给进程发信号

Linux的aio的系统调用,内核从2.6版本开始支持。

4 Python中的IO多路复用

Python的select库实现了select、poll系统调用,这个基本上操作系统都支持,部分实现了epoll,它是底层的IO多路复用模块。

  • select维护一个文件描述符数据结构,单个进程使用于上限,通常是1024,线性扫描这个数据结构,效率低。
  • pool和select的区别是内部数据结构使用链表,没有这个最大限制,但是依然是线性遍历才知道哪个设备就绪了。
  • epoll使用事件通知机制,使用回调机制提高效率

    select/pool 还有从内核空间复制消息到用户空间,而epoll通过内核空间和用户空间共享一块内存来减少复制。

4.1 selectors库

Python 3.4 提供 selectors库,高级IO复用库。它的类层次结构如下

BaseSelector
+-- SelectSelector
+-- PollSelector
+-- EpollSelector
+-- DevpollSelector
+-- KqueueSelector

观察模块原码倒数几行我们知道,selecors.DefaultSelector会返回当前平台最有效、性能最高的实现,但是由于没有实现 Windows下的IOCP,所以windows下只能退化为select。

4.2 register方法

selectorsobj.register为当前selectors实例注册一个文件对象,监视它的IO实现,返回一个selectKey对象。它的参数如下:

register(self, fileobj, events, data=None):
  • fileobj: 被监视的文件对象,比如socket对象
  • events: 事件,该文件对象必须等待的事件
  • data:可选的与此文件对象关联的不透明的属性,例如:关联用来存储每个客户端的会话ID,关联方法。通过这个参数在关注的事件产生后让selector干什么事。

常用的Event事件

  • EVENT_READ: 可读0b01,内核已经准备好输入输出设备,可以开始读了。
  • EVENT_WRITE: 可写0b10,内核准备好了,可以往里写了。

返回的selectKey对象对象具有以下方法:

  • fileobj: 注册的文件对象(socket)
  • fd:文件描述符
  • events:等待fd标识的文件对象触发的事件类型
  • data:注册时关联的数据(标识回调函数)

4.3 利用selectors完成IO多路复用版本的EchoServer

下面是代码:

import socket
import selectors
import logging
import threading
import time

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

class EchoServer:

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.sock = socket.socket()
        self.selector = selectors.DefaultSelector()
        self.evnet = threading.Event()

    def start(self):
        self.sock.bind((self.ip, self.port))
        self.sock.listen()
        self.sock.setblocking(False)   # 事件被触发,说明有链接进来,那么不需要阻塞等待
        self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept)  # 注册accept函数
        threading.Thread(target=self.select, name='select', daemon=True).start()   # 启动监视进程

    # 启动selector,用于监视事件的发生
    def select(self):
        while not self.evnet.is_set():
            events = self.selector.select()
            for key, event in events:
                key.data(key.fileobj)

    def accept(self, sock: socket.socket):
        sock, client = sock.accept()
        # 将client的读写也加入到监事列表中
        self.selector.register(sock, selectors.EVENT_READ, self.recv)

    # 当对应的socket有写操作,会直接直接触发执行,所以这里根本不需要死循环
    def recv(self, sock: socket.socket):
        client_ip = sock.getpeername()
        data = sock.recv(1024)
        if data == b'quit' or data == b'':
            self.evnet.set()
            sock.close()
            return
        msg = '{}:{} {}'.format(*client_ip, data.decode()).encode()
        sock.send(msg)

    # 关闭进程时,将selectors中注册的事件取消掉,再关闭监视器
    def stop(self):
        self.evnet.set()
        event_list = set()
        for event in self.selector.get_map():
            event_list.add(event)
        for event in event_list:
            self.selector.unregister(event)
        self.selector.close()

if __name__ == '__main__':
    es = EchoServer('127.0.0.1', 9999)
    es.start()

    while True:
        cmd = input('>>>:').strip()
        if cmd == 'quit':
            es.stop()
            break

4.4 聊天室

这里是每次client进来时,通过记录client的socket来完成的。

import socket
import selectors
import logging
import threading

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

class ChatSocketServer:

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.sock = socket.socket()
        self.selector = selectors.DefaultSelector()
        self.event = threading.Event()
        self.clients = {}   # 用于记录连接的client

    def start(self):
        self.sock.bind((self.ip, self.port))
        self.sock.listen()
        self.sock.setblocking(False)  # 事件被触发,说明有链接进来,那么不需要阻塞等待
        self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept)  # 注册accept函数
        threading.Thread(target=self.select, name='select', daemon=True).start()  # 启动监视进程

    # 启动selector,用于监视事件的发生
    def select(self):
        while not self.event.is_set():
            events = self.selector.select()
            for key, event in events:
                key.data(key.fileobj)

    def accept(self, sock: socket.socket):
        sock, client = sock.accept()

        # 添加已连接客户端列表
        self.clients[client] = sock

        # 将client的读写也加入到监事列表中
        self.selector.register(sock, selectors.EVENT_READ, self.recv)

    # 当对应的socket有写操作,会直接直接触发执行,所以这里根本不需要死循环
    def recv(self, sock: socket.socket):
        client_ip = sock.getpeername()
        data = sock.recv(1024)
        if data == b'quit' or data == b'':
            self.clients.pop(client_ip)   # 退出后弹出client地址
            self.event.set()
            sock.close()
            return
        msg = '{}:{} {}'.format(*client_ip, data.decode())
        logging.info(msg)
        for clients in self.clients.values():
            clients.send(msg.encode())

    # 关闭进程时,将selectors中注册的事件取消掉,再关闭监视器
    def stop(self):
        self.event.set()
        event_list = set()
        for event in self.selector.get_map():
            event_list.add(event)
        for event in event_list:
            self.selector.unregister(event)
        for client in self.clients.values():    # 关闭所以已连接的client的socket
            client.close()
        self.selector.close()

if __name__ == '__main__':
    es = ChatSocketServer('127.0.0.1', 9999)
    es.start()

    while True:
        cmd = input('>>>:').strip()
        if cmd == 'quit':
            es.stop()
            break

当然还可以通过selector来处理,为什么呢?因为每当有请求进来,selector都会监视当前连接的recv,那么我们只需要在selector中的recv拿出来,就知道到底有多少连接了。

import socket
import selectors
import logging
import threading

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

class ChatSocketServer:

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.sock = socket.socket()
        self.selector = selectors.DefaultSelector()
        self.event = threading.Event()

    def start(self):
        self.sock.bind((self.ip, self.port))
        self.sock.listen()
        self.sock.setblocking(False)  # 事件被触发,说明有链接进来,那么不需要阻塞等待
        self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept)  # 注册accept函数
        threading.Thread(target=self.select, name='select', daemon=True).start()  # 启动监视进程

    # 启动selector,用于监视事件的发生
    def select(self):
        while not self.event.is_set():
            events = self.selector.select()
            for key, event in events:
                key.data(key.fileobj)

    def accept(self, sock: socket.socket):
        sock, client = sock.accept()

        # 将client的读写也加入到监事列表中
        self.selector.register(sock, selectors.EVENT_READ, self.recv)

    # 当对应的socket有写操作,会直接直接触发执行,所以这里根本不需要死循环
    def recv(self, sock: socket.socket):
        client_ip = sock.getpeername()
        data = sock.recv(1024)
        if data == b'quit' or data == b'':
            self.selector.unregister(sock)   # 客户端退出,则取消监控当前 socket 事件
            sock.close()
            return
        msg = '{}:{} {}'.format(*client_ip, data.decode())
        logging.info(msg)

        # 群发消息,如果data绑定的是recv(排除accept),那么就通过socket群发消息
        for sock in self.selector.get_map().values():
            if sock.data == self.recv:
                sock.fileobj.send(msg.encode())

    # 关闭进程时,将selectors中注册的事件取消掉,再关闭监视器
    def stop(self):
        self.event.set()
        event_list = set()
        for event in self.selector.get_map():
            event_list.add(event)
        for event in event_list:
            self.selector.unregister(event)
        self.selector.close()

if __name__ == '__main__':
    es = ChatSocketServer('127.0.0.1', 9999)
    es.start()

    while True:
        cmd = input('>>>:').strip()
        if cmd == 'quit':
            es.stop()
            break

原文地址:https://www.cnblogs.com/dachenzi/p/10512810.html

时间: 2024-10-11 06:39:21

39 - 同步-异步-IO多路复用的相关文章

基础入门_Python-网络编程.分分钟掌握阻塞/非阻塞/同步/异步IO模型?

概念梳理: 故事独白: 满满爱喝茶,废话不多说,开始煮开水. 出场人物: 满满, 普通水壶, 高级水壶(水开会响) 1. 满满把水壶放在火上, 站在那里等水开(同步阻塞) 满满觉得自己有点儿傻逼~ 2. 满满把水壶放在火上,去客厅看电视,时不时的去厨房瞅瞅水开木有(同步非阻塞) 满满觉得自己还是有点傻~,于是买了个高级水壶, 水开后会响~ 3. 满满把高级水壶放在火上, 站在那里等水开(异步阻塞) 满满想高级水壶水开会自己叫~为毛不去看个电视哪? 4. 满满把高级水壶放在火上, 去客厅看电视,

python开发IO模型:阻塞&非阻塞&异步IO&多路复用&selectors

一 IO模型介绍 为了更好地了解IO模型,我们需要事先回顾下:同步.异步.阻塞.非阻塞 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non-blocking IO是一个东西.这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同.所以,

Java 中阻塞非阻塞io以及同步异步IO

然后借鉴下<Unix网络编程卷>中的理论: IO操作中涉及的2个主要对象为程序进程.系统内核.以读操作为例,当一个IO读操作发生时,通常经历两个步骤: 1,等待数据准备 2,将数据从系统内核拷贝到操作进程中 例如,在socket上的读操作,步骤1会等到网络数据包到达,到达后会拷贝到系统内核的缓冲区:步骤2会将数据包从内核缓冲区拷贝到程序进程的缓冲区中. 阻塞(blocking)与非阻塞(non-blocking)IO IO的阻塞.非阻塞主要表现在一个IO操作过程中,如果有些操作很慢,比如读操作

理解同步,异步,阻塞,非阻塞,多路复用,事件驱动IO

以下是IO的一个基本过程 先理解一下用户空间和内核空间,系统为了保护内核数据,会将寻址空间分为用户空间和内核空间,32位机器为例,高1G字节作为内核空间,低3G字节作为用户空间.当用户程序读取数据的时候,会经历两个过程:磁盘到内核空间(这块消耗性能,下面简称内核数据准备),内核空间拷贝到用户空间(下面简称用户空间拷贝). 基于这个前提,同步异步IO,阻塞非阻塞IO 这几个概念其实非常类似的,区分的关键点在于被调用者的返回方式. 当我们进行IO操作的时候,如果被调用者将任务全部执行完返回,称为同步

自定义异步IO框架

异步就是回调 异步 = 非阻塞+循环 select只能完成IO多路复用,不能完成异步 IO多路复用--->监听多个socket对象,这个过程是同步的 利用其特性可以开发异步模块 异步IO:非阻塞的socket + IO多路复用 自定义异步框架 import socket import select class HttpRequest(object): def __init__(self, sk, host, callback): self.socket = sk self.host = host

Python之并发、并行、阻塞、非租塞、同步、异步、IO多路复用

一.并发并行 并发:表示执行多个任务的能力 并行:表示同一时刻执行多个任务 二.模拟socket发送http请求 三大步骤:创建连接 要发送的东西 然后数据回来接收    socket默认情况下阻塞 1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import socket 5 6 client = socket.socket() 7 client.setblocking(False) # 这里设置非阻塞 8 # 百度创建连接:阻塞 9

IO多路复用,同步,异步,阻塞和非阻塞 区别(转)

转自:http://www.cnblogs.com/aspirant/p/6877350.html?utm_source=itdadao&utm_medium=referral 一.什么是socket?什么是I/O操作? 我们都知道unix(like)世界里,一切皆文件,而文件是什么呢?文件就是一串二进制流而已,不管socket,还是FIFO.管道.终端,对我们来说,一切都是文件,一切都是流.在信息 交换的过程中,我们都是对这些流进行数据的收发操作,简称为I/O操作(input and outp

IO模型介绍 以及同步异步阻塞非阻塞的区别

阻塞:用户进程访问数据时,如果未完成IO,等待IO操作完成或者进行系统调用来判断IO是否完成非阻塞:用户进程访问数据时,会马上返回一个状态值,无论是否完成 同步:用户进程发起IO(就绪判断)后,轮询内核状态异步:用户进程发起IO后,可以做其他事情,等待内核通知 介绍一下IO模型 网络IO模型和文件IO模型是一样的,上图是IO的5种模型,包括阻塞IO.非阻塞IO.多路复用IO.信号驱动的IO.异步IO. 一次IO包括两个过程,内核数据准备 .把数据从内核空间copy到用户空间. 1.阻塞IO(re

转一贴,今天实在写累了,也看累了--【Python异步非阻塞IO多路复用Select/Poll/Epoll使用】

下面这篇,原理理解了, 再结合 这一周来的心得体会,整个框架就差不多了... http://www.haiyun.me/archives/1056.html 有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的.下面记录下分别基于Select/Poll/Epoll的echo server实现.Python Select Server,可监控事件数量有限制: 1 2 3 4 5 6 7 8 9 10 11 12 13 14