IO多路复用深入浅出

前言

从零单排高性能问题,这次轮到异步通信了。这个领域入门有点难,需要了解UNIX五种IO模型和 TCP协议,熟练使用三大异步通信框架:Netty、NodeJS、Tornado。目前所有标榜异步的通信框架用的都不是异步IO模型,而是IO多路复 用中的epoll。因为Python提供了对Linux内核API的友好封装,所以我选择Python来学习IO多路复用。

IO多路复用

  1. select

    举一个EchoServer的例子,客户端发送任何内容,服务端会原模原样返回。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    ‘‘‘
    Created on Feb 16, 2016
    
    @author: mountain
    ‘‘‘
    import socket
    import select
    from Queue import Queue
    
    #AF_INET指定使用IPv4协议,如果要用更先进的IPv6,就指定为AF_INET6。
    #SOCK_STREAM指定使用面向流的TCP协议,如果要使用面向数据包的UCP协议,就指定SOCK_DGRAM。
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    #设置监听的ip和port
    server_address = (‘localhost‘, 1234)
    server.bind(server_address)
    #设置backlog为5,client向server发起connect,server accept后建立长连接,
    #backlog指定排队等待server accept的连接数量,超过这个数量,server将拒绝连接。
    server.listen(5)
    #注册在socket上的读事件
    inputs = [server]
    #注册在socket上的写事件
    outputs = []
    #注册在socket上的异常事件
    exceptions = []
    #每个socket有一个发送消息的队列
    msg_queues = {}
    print "server is listening on %s:%s." % server_address
    while inputs:
        #第四个参数是timeout,可选,表示n秒内没有任何事件通知,就执行下面代码
        readable, writable, exceptional = select.select(inputs, outputs, exceptions)
        for sock in readable:
            #client向server发起connect也是读事件,server accept后产生socket加入读队列中
            if sock is server:
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
                msg_queues[conn] = Queue()
                print "server accepts a conn."
            else:
                #读取client发过来的数据,最多读取1k byte。
                data = sock.recv(1024)
                #将收到的数据返回给client
                if data:
                    msg_queues[sock].put(data)
                    if sock not in outputs:
                        #下次select的时候会触发写事件通知,写和读事件不太一样,前者是可写就会触发事件,并不一定要真的去写
                        outputs.append(sock)
                else:
                    #client传过来的消息为空,说明已断开连接
                    print "server closes a conn."
                    if sock in outputs:
                        outputs.remove(sock)
                    inputs.remove(sock)
                    sock.close()
                    del msg_queues[sock]
        for sock in writable:
            if not msg_queues[sock].empty():
                sock.send(msg_queues[sock].get_nowait())
            if msg_queues[sock].empty():
                outputs.remove(sock)
        for sock in exceptional:
            inputs.remove(sock)
            if sock in outputs:
                outputs.remove(sock)
            sock.close()
            del msg_queues[sock]
    [[email protected] ~/workspace/wire]$ telnet localhost 1234
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is ‘^]‘.
    1
    1

    select有3个缺点:

    1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
    2. 每次调用select后,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
      这点从python的例子里看不出来,因为python select api更加友好,直接返回就绪的socket列表。事实上linux内核select api返回的是就绪socket数目:
      int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
    3. fd数量有限,默认1024。
  2. poll

    采用poll重新实现EchoServer,只要搞懂了select,poll也不难,只是api的参数不太一样而已。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    ‘‘‘
    Created on Feb 27, 2016
    
    @author: mountain
    ‘‘‘
    import select
    import socket
    import sys
    import Queue
    
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    server_address = (‘localhost‘, 1234)
    server.bind(server_address)
    server.listen(5)
    print ‘server is listening on %s port %s‘ % server_address
    msg_queues = {}
    timeout = 1000 * 60
    #POLLIN: There is data to read
    #POLLPRI: There is urgent data to read
    #POLLOUT: Ready for output
    #POLLERR: Error condition of some sort
    #POLLHUP: Hung up
    #POLLNVAL: Invalid request: descriptor not open
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
    READ_WRITE = READ_ONLY | select.POLLOUT
    poller = select.poll()
    #注册需要监听的事件
    poller.register(server, READ_ONLY)
    #文件描述符和socket映射
    fd_to_socket = { server.fileno(): server}
    while True:
        events = poller.poll(timeout)
        for fd, flag in events:
            sock = fd_to_socket[fd]
            if flag & (select.POLLIN | select.POLLPRI):
                if sock is server:
                    conn, client_address = sock.accept()
                    conn.setblocking(False)
                    fd_to_socket[conn.fileno()] = conn
                    poller.register(conn, READ_ONLY)
                    msg_queues[conn] = Queue.Queue()
                else:
                    data = sock.recv(1024)
                    if data:
                        msg_queues[sock].put(data)
                        poller.modify(sock, READ_WRITE)
                    else:
                        poller.unregister(sock)
                        sock.close()
                        del msg_queues[sock]
            elif flag & select.POLLHUP:
                poller.unregister(sock)
                sock.close()
                del msg_queues[sock]
            elif flag & select.POLLOUT:
                if not msg_queues[sock].empty():
                    msg = msg_queues[sock].get_nowait()
                    sock.send(msg)
                else:
                    poller.modify(sock, READ_ONLY)
            elif flag & select.POLLERR:
                poller.unregister(sock)
                sock.close()
                del msg_queues[sock]

    poll解决了select的第三个缺点,fd数量不受限制,但是失去了select的跨平台特性,它的linux内核api是这样的:

    int poll (struct pollfd *fds, unsigned int nfds, int timeout);
    struct pollfd {
        int fd; /* file descriptor */
        short events; /* requested events to watch */
        short revents; /* returned events witnessed */
    };
  3. epoll

    用法与poll几乎一样。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    ‘‘‘
    Created on Feb 28, 2016
    
    @author: mountain
    ‘‘‘
    import select
    import socket
    import Queue
    
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    server_address = (‘localhost‘, 1234)
    server.bind(server_address)
    server.listen(5)
    print ‘server is listening on %s port %s‘ % server_address
    msg_queues = {}
    timeout = 60
    READ_ONLY = select.EPOLLIN | select.EPOLLPRI
    READ_WRITE = READ_ONLY | select.EPOLLOUT
    epoll = select.epoll()
    #注册需要监听的事件
    epoll.register(server, READ_ONLY)
    #文件描述符和socket映射
    fd_to_socket = { server.fileno(): server}
    while True:
        events = epoll.poll(timeout)
        for fd, flag in events:
            sock = fd_to_socket[fd]
            if flag & READ_ONLY:
                if sock is server:
                    conn, client_address = sock.accept()
                    conn.setblocking(False)
                    fd_to_socket[conn.fileno()] = conn
                    epoll.register(conn, READ_ONLY)
                    msg_queues[conn] = Queue.Queue()
                else:
                    data = sock.recv(1024)
                    if data:
                        msg_queues[sock].put(data)
                        epoll.modify(sock, READ_WRITE)
                    else:
                        epoll.unregister(sock)
                        sock.close()
                        del msg_queues[sock]
            elif flag & select.EPOLLHUP:
                epoll.unregister(sock)
                sock.close()
                del msg_queues[sock]
            elif flag & select.EPOLLOUT:
                if not msg_queues[sock].empty():
                    msg = msg_queues[sock].get_nowait()
                    sock.send(msg)
                else:
                    epoll.modify(sock, READ_ONLY)
            elif flag & select.EPOLLERR:
                epoll.unregister(sock)
                sock.close()
                del msg_queues[sock]

    epoll解决了select的三个缺点,是目前最好的IO多路复用解决方案。为了更好地理解epoll,我们来看一下linux内核api的用法。

    int epoll_create(int size)//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)//注册事件,每个fd只拷贝一次。
    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)/*等待IO事件,事件发生时,
    内核调用回调函数,把就绪fd放入就绪链表中,并唤醒epoll_wait,epoll_wait只需要遍历就绪链表即可,
    而select和poll都是遍历所有fd,这效率高下立判。*/
时间: 2024-11-04 18:45:19

IO多路复用深入浅出的相关文章

python开发IO模型:阻塞&非阻塞&异步IO&多路复用&selectors

一 IO模型介绍 为了更好地了解IO模型,我们需要事先回顾下:同步.异步.阻塞.非阻塞 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non-blocking IO是一个东西.这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同.所以,

IO模型--阻塞IO,非阻塞IO,IO多路复用,异步IO

IO模型介绍: * blocking IO 阻塞IO * nonblocking IO 非阻塞IO * IO multiplexing IO多路复用 * signal driven IO 信号驱动IO () * asynchronous IO 异步IO IO模型介绍: 为了更好地了解IO模型,我们需要事先回顾下:同步.异步.阻塞.非阻塞 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,

{python之IO多路复用} IO模型介绍 阻塞IO(blocking IO) 非阻塞IO(non-blocking IO) 多路复用IO(IO multiplexing) 异步IO(Asynchronous I/O) IO模型比较分析 selectors模块

阅读目录 一 IO模型介绍 二 阻塞IO(blocking IO) 三 非阻塞IO(non-blocking IO) 四 多路复用IO(IO multiplexing) 五 异步IO(Asynchronous I/O) 六 IO模型比较分析 七 selectors模块 一 IO模型介绍 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能

【IO多路复用】 -- 2019-08-16 19:10:29

原文: http://blog.gqylpy.com/gqy/234 " 目录 一.IO模型介绍 二.阻塞IO(blocking IO) 三.非阻塞IO(non-blocking IO) 四.多路复用IO(IO multiplexing) 五.异步IO(Asynchronous I/O) 六.模型比较分析 七.关于select.poll.epoll 一.IO模型介绍 为了更好地了解IO模型,我们需要事先回顾下:同步.异步.阻塞.非阻塞 同步(synchronous) IO和异步(asynchro

【IO多路复用】 -- 2019-08-16 19:17:42

原文: http://blog.gqylpy.com/gqy/234 " 目录 一.IO模型介绍 二.阻塞IO(blocking IO) 三.非阻塞IO(non-blocking IO) 四.多路复用IO(IO multiplexing) 五.异步IO(Asynchronous I/O) 六.模型比较分析 七.关于select.poll.epoll 一.IO模型介绍 为了更好地了解IO模型,我们需要事先回顾下:同步.异步.阻塞.非阻塞 同步(synchronous) IO和异步(asynchro

【IO多路复用】 -- 2019-08-16 22:00:04

原文: http://blog.gqylpy.com/gqy/234 " 目录 一.IO模型介绍 二.阻塞IO(blocking IO) 三.非阻塞IO(non-blocking IO) 四.多路复用IO(IO multiplexing) 五.异步IO(Asynchronous I/O) 六.模型比较分析 七.关于select.poll.epoll 一.IO模型介绍 为了更好地了解IO模型,我们需要事先回顾下:同步.异步.阻塞.非阻塞 同步(synchronous) IO和异步(asynchro

Unix C语言编写基于IO多路复用的小型并发服务器

背景介绍 如果服务器要同时处理网络上的套接字连接请求和本地的标准输入命令请求,那么如果我们使用accept来接受连接请求,则无法处理标准输入请求;类似地,如果在read中等待一个输入请求,则无法处理网络连接的请求. 所谓I/O多路复用机制,就是说通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作.但 select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而还

自动化运维Python系列之IO多路复用、SocketServer源码分析

IO多路复用 IO多路复用是指:通过一种机制,可以监视多个描述符,一旦某个系统描述符就绪(一般是读就绪或者写就绪)能够通知程序进行相应的读写操作 实例化例子就是在SocketServer模块中,客户端和服务端建立好连接,此时服务端通过监听conn这条链路,一旦客户端发送了数据,conn链路状态就发生变化,服务端就知道有数据要接收... Linux系统中同时存在select.pull.epoll三种IO多路复用机制 windows中只有select机制 1)select select本质上是通过设

IO多路复用——select

IO多路复用 是同步IO的一种,用一个进程一次等待多个IO就绪事件的发生,加大概率,尽可能高效的等. 适用场景 (1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用. (2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现. (3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用. (4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用. (5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O