Linux非阻塞IO(六)使用poll实现非阻塞的服务器端

关于poll模型监听的事件以及返回事件,我们定义宏如下:

#define kReadEvent (POLLIN | POLLPRI)
#define kWriteEvent (POLLOUT | POLLWRBAND)
#define kReadREvent (POLLIN | POLLPRI | POLLRDHUP)
#define kWriteREvent (POLLOUT)

前面我们说明了,为什么非阻塞IO必须具备缓冲区。事实上,对于server而言,每条TCP连接应该具有两个缓冲区,一个用于输入,一个用于输出。

sockfd –>  InputBuffer –> 用户空间 –> 处理数据 –> 得到结果 –> OutputBuffer –> sockfd

但是本例仅仅是简单的回射服务,收到数据立刻发出,所以我们只使用一个缓冲区。

对于每个TCP连接,我们对应这样的一个数据结构:

typedef struct{
    buffer_t buffer_;
} tcp_connection_t; //表示一条TCP连接

那么我们需要建立一个从fd到对应的tcp_connection_t的对应关系。我使用以下的数组:

tcp_connection_t *connsets[EVENTS_SIZE]; //提供从fd到TCP连接的映射

这个数组初始化为NULL,然后我使用fd作为下标,也就是说,当我需要处理某fd的时候,以该fd为下标就可以找到它相应的tcp_connection_t指针。这本质上是一种哈希表的思想

使用tcp_connection_t的逻辑是:

每当accept一个新的连接,就动态创建一个新的tcp_connection_t,并且将其指针保存至以peerfd为下标的位置中

每当连接关闭,需要释放内存,同时重置指针为NULL。

我将所有的代码全部写入main函数中,因为前面封装了buffer,代码的可读性已经提高,再加上我们的业务逻辑简单,进一步封装,反而降低可读性。

这里跟client有几处相同点:

仅当缓冲区有空闲空间时才监听read事件

仅当缓冲区有数据时,才监听write事件

所以我们每次进行poll调用前都需要将fd的监听事件重新装填。

server的代码整体框架如下:

//初始化poll

while(1)
{
    //重新装填fd数组

    //poll系统调用

    //依次处理每个fd的读写事件
}

所以完整的代码如下:

#define _GNU_SOURCE             /* See feature_test_macros(7) */
#include <sys/socket.h>
#include "sysutil.h"
#include "buffer.h"
#include <assert.h>

#define EVENTS_SIZE 1024

typedef struct{
    buffer_t buffer_;
} tcp_connection_t; //表示一条TCP连接

tcp_connection_t *connsets[EVENTS_SIZE]; //提供从fd到TCP连接的映射

int main(int argc, char const *argv[])
{
    //获取监听fd
    int listenfd = tcp_server("localhost", 9981);
    //将监听fd设置为非阻塞
    activate_nonblock(listenfd);

    //初始化connsets
    int i;
    for(i = 0; i < EVENTS_SIZE; ++i)
    {
        connsets[i] = NULL;
    }

    //初始化poll
    struct pollfd events[EVENTS_SIZE];
    for(i = 0; i < EVENTS_SIZE; ++i)
        events[i].fd = -1;
    events[0].fd = listenfd;
    events[0].events = kReadEvent;
    int maxi = 0;

    while(1)
    {
        //重新装填events数组
        int i;
        for(i = 1; i < EVENTS_SIZE; ++i)
        {
            int fd = events[i].fd;
            events[i].events = 0; //重置events
            if(fd == -1)
                continue;
            assert(connsets[fd] != NULL);

            //当Buffer中有数据可读时,才监听write事件
            if(buffer_is_readable(&connsets[fd]->buffer_))
            {
                events[i].events |= kWriteEvent;
            }

            //当Buffer中有空闲空间时,才监听read事件
            if(buffer_is_writeable(&connsets[fd]->buffer_))
            {
                events[i].events |= kReadEvent;
            }

        }

        //poll调用
        int nready = poll(events, maxi + 1, 5000);
        if(nready == -1)
            ERR_EXIT("poll");
        else if(nready == 0)
        {
            printf("poll timeout.\n");
            continue;
        }

        //处理listenfd
        if(events[0].revents & kReadEvent)
        {
            //接受一个新的客户fd
            int peerfd = accept4(listenfd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC);
            if(peerfd == -1)
                ERR_EXIT("accept4");
            //新建tcp连接
            tcp_connection_t *pt = (tcp_connection_t*)malloc(sizeof(tcp_connection_t));
            buffer_init(&pt->buffer_);
            //将该tcp连接放入connsets
            connsets[peerfd] = pt;
            //放入events数组
            int i;
            for(i = 0; i < EVENTS_SIZE; ++i)
            {
                if(events[i].fd == -1)
                {
                    events[i].fd = peerfd; //这里不必监听fd
                    if(i > maxi)
                        maxi = i; //更新maxi
                    break;
                }
            }
            if(i == EVENTS_SIZE)
            {
                fprintf(stderr, "too many clients\n");
                exit(EXIT_FAILURE);
            }
        }

        //处理客户fd
        //int i;
        for(i = 1; i <= maxi; ++i)
        {
            int sockfd = events[i].fd;
            if(sockfd == -1)
                continue;
            //取出指针
            tcp_connection_t *pt = connsets[sockfd];
            assert(pt != NULL);
            if(events[i].revents & kReadREvent) //读取数据
            {
                if(buffer_read(&pt->buffer_, sockfd) == 0)
                {
                    //close
                    events[i].fd = -1;
                    close(sockfd);
                    free(pt);
                    connsets[sockfd] = NULL;
                    continue; //继续下一次循环
                }
            }

            if(events[i].revents & kWriteREvent) //可以发送数据
            {
                buffer_write(&pt->buffer_, sockfd);
            }
        }
    }

    close(listenfd);

    return 0;
}

需要注意的是,关于accept,我使用的是accpet4,这是Linux新增的系统调用:

 

int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);

与accept的区别就是accept4可以指定非阻塞标志位。

 

下文使用epoll。

时间: 2024-10-06 20:05:10

Linux非阻塞IO(六)使用poll实现非阻塞的服务器端的相关文章

转一贴,今天实在写累了,也看累了--【Python异步非阻塞IO多路复用Select/Poll/Epoll使用】

下面这篇,原理理解了, 再结合 这一周来的心得体会,整个框架就差不多了... http://www.haiyun.me/archives/1056.html 有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的.下面记录下分别基于Select/Poll/Epoll的echo server实现.Python Select Server,可监控事件数量有限制: 1 2 3 4 5 6 7 8 9 10 11 12 13 14

Linux下的非阻塞IO(一)

非阻塞IO是相对于传统的阻塞IO而言的. 我们首先需要搞清楚,什么是阻塞IO.APUE指出,系统调用分为两类,低速系统调用和其他,其中低速系统调用是可能会使进程永远阻塞的一类系统调用.但是与磁盘IO有关的系统调用是个例外. 我们以read和write为例,read函数读取stdin,如果是阻塞IO,那么: 如果我们不输入数据,那么read函数会一直阻塞,一直到我们输入数据为止. 如果是非阻塞IO,那么: 如果存在数据,读取然后返回,如果没有输入,那么直接返回-1,errno置为EAGAIN 我们

{python之IO多路复用} IO模型介绍 阻塞IO(blocking IO) 非阻塞IO(non-blocking IO) 多路复用IO(IO multiplexing) 异步IO(Asynchronous I/O) IO模型比较分析 selectors模块

阅读目录 一 IO模型介绍 二 阻塞IO(blocking IO) 三 非阻塞IO(non-blocking IO) 四 多路复用IO(IO multiplexing) 五 异步IO(Asynchronous I/O) 六 IO模型比较分析 七 selectors模块 一 IO模型介绍 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能

实例浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO

一.基本概念                                                          我们通俗一点讲: Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写.如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就

从操作系统内核看Java非阻塞IO事件检测

非阻塞服务器模型最重要的一个特点是,在调用读取或写入接口后立即返回,而不会进入阻塞状态.在探讨单线程非阻塞IO模型前必须要先了解非阻塞情况下Socket事件的检测机制,因为对于非阻塞模式最重要的事情是检测哪些连接有感兴趣的事件发生,一般会有如下三种检测方式. 应用程序遍历socket检测 如图所示,当多个客户端向服务器请求时,服务器端会保存一个socket连接列表,应用层线程对socket列表进行轮询尝试读取或写入.对于读取操作,如果成功读取到若干数据则对读取到的数据进行处理,读取失败则下个循环

非阻塞IO模式原理

与阻塞模式对应的另一种模式叫非阻塞IO模式,在整个通信过程中读和写操作不会阻塞,当前处理线程不存在阻塞情况.从A机器到B机器它的通信过程是:A机器一条线程将通道设置为写事件后往下执行,而另外一条线程遍历到此通道有字节要写并往socket写数据,B机器一条线程遍历到此通道有字节要读,交给另外一条线程对socket读数据,处理完又把通道设置为写事件,遍历线程遍历到此通道有字节要写,又往socket写数据传往A机器,不断往下循环此操作直到完成通信.这个过程每台机器都有两类主要线程,一类是负责逻辑处理且

为何 epoll 的 ET 模式一定要设置为非阻塞IO

ET模式下每次write或read需要循环write或read直到返回EAGAIN错误.以读操作为例,这是因为ET模式只在socket描述符状态发生变化时才触发事件,如果不一次把socket内核缓冲区的数据读完,会导致socket内核缓冲区中即使还有一部分数据,该socket的可读事件也不会被触发根据上面的讨论,若ET模式下使用阻塞IO,则程序一定会阻塞在最后一次write或read操作,因此说ET模式下一定要使用非阻塞IO 原文地址:https://www.cnblogs.com/develo

IO中同步与异步,阻塞与非阻塞区别(转)

同步和异步关注的是消息通信机制 (synchronous communication/asynchronous communication) 同步请求,A调用B,B的处理是同步的,在处理完之前他不会通知A,只有处理完之后才会明确的通知A. 异步请求,A调用B,B的处理是异步的,B在接到请求后先告诉A我已经接到请求了,然后异步去处理,处理完之后通过回调等方式再通知A. 同步和异步最大的区别就是被调用方的执行方式和返回时机.同步指的是被调用方做完事情之后再返回,异步指的是被调用方先返回,然后再做事情

一起来写web server 06 -- 单线程非阻塞IO版本

阻塞IO的效率是在是低下,你如果要写高性能的web server的话,你必须使用非阻塞IO. 非阻塞IO的读写 在谈到非阻塞IO之前,必须先谈一谈阻塞IO,在网络编程中,我们假设有一个监听套接字的sockfd,这个sockfd是你调用了socket,listen, bind这一大票的函数得到的一个文件描述符.其实它默认就是阻塞的,具体的表现是: 使用accept函数监听sockfd时,如果没有连接到来,这个函数会一直阻塞在那里. 对sockfd调用recv函数的时候,如果对方还没有发送数据过来,