Linux高性能server编程——I/O复用

??

IO复用

I/O复用使得程序能同一时候监听多个文件描写叙述符。通常网络程序在下列情况下须要使用I/O复用技术:

  1. client程序要同一时候处理多个socket
  2. client程序要同一时候处理用户输入和网络连接
  3. TCPserver要同一时候处理监听socket和连接socket,这是I/O复用使用最多的场合
  4. server要同一时候处理TCP请求和UDP请求。比方本章将要讨论的会社server
  5. server要同一时候监听多个port。或者处理多种服务。

I/O复用尽管能同一时候监听多个文件描写叙述符,但它本身是堵塞的。而且当多个文件描写叙述符同一时候就绪时,假设不採用额外措施,程序就仅仅能按顺序依次处理当中的每个文件描写叙述符,这使得server程序看起来像是串行工作。

假设要实现并发,仅仅能使用多进程或多线程等变成手段。

select系统复用

select系统调用的用途是:在一段指定时间内。监听用户感兴趣的文件描写叙述符上的可读可写和异常等事件。

#include <sys/select.h>

int select(int nfds, fd_set *readfds,fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

  1. nfds參数指定被监听的文件描写叙述符的总数。

    通常被设置为select监听的全部文件描写叙述符中的最大值加1,由于文件描写叙述符是从0開始计数的

  2. readfds, writefds和exceptfds參数分别指向可读、可写和异常等事件相应的文件描写叙述符集合。

    fd_set结构体仅包括一个整形数组。高数组的每一个元素的每一位标记一个文件描写叙述符。

    可用例如以下宏来訪问fd_set结构体中的位:

    voidFD_CLR(int fd, fd_set *set);

    int  FD_ISSET(int fd, fd_set *set);

    voidFD_SET(int fd, fd_set *set);

    void FD_ZERO(fd_set*set);

  3. timeout參数用来设置select函数的超时时间。它是一个timeval指针。timeval结构体定义例如以下:

struct timeval {

long   tv_sec;        /*
seconds */

long   tv_usec;       /*
microseconds */

};

假设给timeout传递NULL。则select将一直堵塞,直到某个文件描写叙述符就绪。

select成功时返回就绪文件描写叙述符的总数,假设在超时时间内没有不论什么文件描写叙述符就绪返回0,失败返回-1,并设置errno;假设select在等待期间收到信号,则select马上返回-1,并设置errno为EINTR。

poll系统调用

poll系统调用和select类似,也是在指定时间内伦旭一定数量的文件描写叙述符。以測试当中是否有就绪。

poll原型例如以下:

#include<poll.h>

int poll(structpollfd *fds, nfds_t nfds, int timeout);

1)fds參数是一个pollfd结构类型的数组,它指定所以我们感兴趣的文件描写叙述符上发生的刻度、可写和异常等时间。其结构定义例如以下:

struct pollfd {

int  fd;        /*
file descriptor */

short events;    /*
requested events */

short revents;   /*
returned events */

};

当中fd成员指定文件描写叙述符;events成员告诉poll监听f上的那些时间,它是一系列时间的按位或;revents成员则由内核改动,以通知应用程序fd上实际发生了哪些事件。

2)nfds參数指定被监听事件集合的大小。其类型nfds_t定义例如以下:

typedef unsignedlong int nfds_t;

  1. timeout參数指定poll的超时时间。单位是毫秒。当timeout为-1时。poll调用将永远堵塞,直到某个事件发生;当为0时,poll调用马上返回。

    poll返回值含义与select同样。

epoll系列系统调用

内核事件表

epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll有非常大差异。首先,epoll使用一组函数来完毕任务。而不是单个函数。

其次,epoll把用户关心的文件描写叙述符上的时间放在内核里的一个时间表中。从而无需向select和poll那样每次调用都要反复传入文件描写叙述符集或事件集。但epoll须要使用一个额外的文件描写叙述符,来唯一标识内核中的这个时间表。这个文件描写叙述符使用例如以下epoll_create函数创建:

#include <sys/epoll.h>

int epoll_create(int size);

size參数给内核一个提示,告诉它时间表须要多大。该函数返回的文件描写叙述符将作用其它全部epoll系统调用的第一个參数,以指定要訪问的内核事件表。

以下的函数用来操作epoll的内核事件表:

#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);

fd參数是要操作的文件描写叙述符。op參数则制定操作类型,操作类型有例如以下3种:

EPOLL_CTL_ADD:往事件表中注冊fd上的事件

EPOLL_CTL_MOD:改动fd上的注冊事件

EPOLL_CTL_DEL:删除fd上的注冊事件

event參数指定时间,它是epoll_event结构指针类型。epoll_event的定义例如以下:

struct epoll_event {

uint32_t    events;     /*
Epoll events */

epoll_data_t data;       /*
User data variable */

};

当中events成员描写叙述事件类型。data成员用于存储用户数据。其类型epoll_data的定义例如以下:

typedef union epoll_data {

void       *ptr;

int         fd;

uint32_t    u32;

uint64_t    u64;

} epoll_data_t;

epoll_data_t是一个联合体,当中4个成员中使用最多的是fd,它指定事件所丛书的目标文件描写叙述符。

epoll_ctl成功时返回0,失败时返回-1并设置errno。

epoll_wait函数

epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描写叙述符上的事件,其原型例如以下:

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);

该函数成功时返回就绪的文件描写叙述符的个数,失败是返回-1,并设置errno。

maxevents參数指定最多监听多少时间,必须大于0.

epoll_wait函数假设检測到事件。就将全部就绪的事件从内核事件表中拷贝到它的第二个參数events指向的数组中。这个数组仅仅用于输出epoll_wait检測到的就绪时间,而不像select和poll数组那样即用于传入用户注冊的时间,实用于输出内核检測到的就绪时间。这就极大的提高了应用程序索引就绪文件描写叙述符的效率。

以下的代码体现了这个区别:

/*怎样索引poll返回的就绪文件描写叙述符*/

int ret = poll(fds, MAX_EVENT_NUMBER, -1);

/*必须遍历全部注冊文件描写叙述符并找到当中的就绪着*/

for(int i=0;i<MAX_EVENT_NUMBER; ++i)

{

if(fds[i].revents & POLLIN)

{

int sockfd = fds[i].fd;

/*处理sockfd*/

}

}

/*怎样索引epoll返回的就绪文件描写叙述符*/

int ret =epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1);

/*遍历就绪的ret个文件描写叙述符*/

for( int i=0;i<ret; i++)

{

int socketfd = events[i].data.fd;

/*socket肯定就绪。直接处理*/

}

LT和ET模式

epoll对文件描写叙述符的操作有两种模式:LT模式(Levek
Trigger,电平触发)和ET模式(E多个Trigger,边沿触发)。

LT模式是默认的工作模式,这样的模式下epoll相当于一个效率较高的poll。

当往epoll内核事件表中注冊一个文件描写叙述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描写叙述符。ET模式是epoll的搞笑工作模式。

对于採用LT工作模式的文件描写叙述符。当epoll_wait检測到其上有时间发生并将此事件通知应用程序后,应用程序能够不马上处理该事件。这样。当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。而对于採用ET工作模式的文件描写叙述符,当epoll_wait检測到其上有时间发生并将此时间通知应用程序后。应用程序必须马上处理该事件,由于兴许的epoll_wait调用将不再向用用程序通知这一事件。可见。ET在非常大程度上减少了同一个epoll事件被反复触发的次数,因此效率比LT模式高。

文章最后的程序清单1比較了两种模式:

当在clienttelnet传输“abcdefghijklmnopqrstuvwxyz”字符串时。输出例如以下

ET模式输出:

event trigger once

get 9 bytes of content: abcdefghi

get 9 bytes of content: jklmnopqr

get 9 bytes of content: stuvwxyz

get 1 bytes of content:

LT模式输出:

event trigger once

get 9 bytes of content: abcdefghi

event trigger once

get 9 bytes of content: jklmnopqr

event trigger once

get 9 bytes of content: stuvwxyz

event trigger once

get 1 bytes of content:

能够看到正如我们预期,ET模式下时间仅仅被触发一次,要比LT模式下少非常多。

EPOLLONESHOT事件

即使我们使用ET模式。一个socket上的某个事件还是可能被触发多次。这在并发程序中会引起一个问题。

比方一个县城在读取完某个socket上的数据后開始处理这些数据,二在数据的处理project中该socket上又有新数据可读。此时另外一个县城北唤醒来读取这些新的数据。

于是就出现了两个线程同一时候操作一个socket的局面。这当然不是我们期望的。

我们期望的是一个socket连接在任一时刻都仅仅被一个线程处理。这一点能够使用spoll的EPOLLONESHOT事件实现。

对于注冊了EPOLLONESHOT事件的文件描写叙述符,操作系统最多触发其上注冊的一个可读、可写或者异常事件,并且仅仅触发一次。除非我们使用epoll_ctl函数重置该文件描写叙述符上注冊的EPOLLONESHOT事件ain.zheyang。当一个线程在处理某个socket时。其它线程是不可能有机会操作该socket的。

但反过来思考,注冊了EPOLLONESHOT事件的socket一旦被某个线程处理完成,该线程就应该马上重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其它工作线程有机会处理这个socket。

程序清单2展示了EPOLLONESHOT事件的使用。

三组I/O复用函数的比較


系统调用


select


poll


epoll


事件集合


用户通过3个參数分别传入感兴趣的可读、可写及异常等事件。内核通过对这些參数在线改动来反馈当中的就绪事件。这使得用户每次调用select都要重置这3个參数


统一处理全部事件类型,因此仅仅须要一个事件集參数。用户通过pollfd.events传入感兴趣的事件,内核通过改动pollfd.revents反馈当中就绪的事件


内核通过一个时间表直接管理用户感兴趣的全部事件。因此每次调用epoll_wait时,无需重复传入用户感兴趣的时间。

epoll_wait系统调用的參数events仅用来反馈就绪的事件。


应用程序索引就绪文件描写叙述符的时间复杂度


O(N)


O(N)


O(1)


最大支持文件描写叙述符数


一般有最大值限制


65535


65535


工作模式


LT


LT


支持ET高效模式


内核实现和工作效率


採用轮询方法来检測就绪事件,算法复杂度为O(N)


採用轮询方式检測就绪事件,算法复杂度为O(N)


採用回调方式来检測就绪事件。算法复杂度为O(1)

聊天程序见程序(poll实现)见清单3

同一时候处理TCP和UDP服务的回射server程序(epoll程序)见清单4

程序清单1:
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd, bool enable_et )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if( enable_et )
    {
        event.events |= EPOLLET;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

void lt( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, false );
        }
        else if ( events[i].events & EPOLLIN )
        {
            printf( "event trigger once\n" );
            memset( buf, ‘\0‘, BUFFER_SIZE );
            int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
            if( ret <= 0 )
            {
                close( sockfd );
                continue;
            }
            printf( "get %d bytes of content: %s\n", ret, buf );
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

void et( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, true );
        }
        else if ( events[i].events & EPOLLIN )
        {
            printf( "event trigger once\n" );
            while( 1 )
            {
                memset( buf, ‘\0‘, BUFFER_SIZE );
                int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                if( ret < 0 )
                {
                    if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                    {
                        printf( "read later\n" );
                        break;
                    }
                    close( sockfd );
                    break;
                }
                else if( ret == 0 )
                {
                    close( sockfd );
                }
                else
                {
                    printf( "get %d bytes of content: %s\n", ret, buf );
                }
            }
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, true );

    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }

        lt( events, ret, epollfd, listenfd );
        //et( events, ret, epollfd, listenfd );
    }

    close( listenfd );
    return 0;
}
程序清单2
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
struct fds
{
   int epollfd;
   int sockfd;
};

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd, bool oneshot )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if( oneshot )
    {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

void reset_oneshot( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}

void* worker( void* arg )
{
    int sockfd = ( (fds*)arg )->sockfd;
    int epollfd = ( (fds*)arg )->epollfd;
    printf( "start new thread to receive data on fd: %d\n", sockfd );
    char buf[ BUFFER_SIZE ];
    memset( buf, ‘\0‘, BUFFER_SIZE );
    while( 1 )
    {
        int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
        if( ret == 0 )
        {
            close( sockfd );
            printf( "foreiner closed the connection\n" );
            break;
        }
        else if( ret < 0 )
        {
            if( errno == EAGAIN )
            {
                reset_oneshot( epollfd, sockfd );
                printf( "read later\n" );
                break;
            }
        }
        else
        {
            printf( "get content: %s\n", buf );
            sleep( 5 );
        }
    }
    printf( "end thread receiving data on fd: %d\n", sockfd );
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, false );

    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }

        for ( int i = 0; i < ret; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd, true );
            }
            else if ( events[i].events & EPOLLIN )
            {
                pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
            }
            else
            {
                printf( "something else happened \n" );
            }
        }
    }

    close( listenfd );
    return 0;
}
程序清单3
客户端程序
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <fcntl.h>

#define BUFFER_SIZE 64

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    struct sockaddr_in server_address;
    bzero( &server_address, sizeof( server_address ) );
    server_address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &server_address.sin_addr );
    server_address.sin_port = htons( port );

    int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( sockfd >= 0 );
    if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )
    {
        printf( "connection failed\n" );
        close( sockfd );
        return 1;
    }

    pollfd fds[2];
    fds[0].fd = 0;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = sockfd;
    fds[1].events = POLLIN | POLLRDHUP;
    fds[1].revents = 0;
    char read_buf[BUFFER_SIZE];
    int pipefd[2];
    int ret = pipe( pipefd );
    assert( ret != -1 );

    while( 1 )
    {
        ret = poll( fds, 2, -1 );
        if( ret < 0 )
        {
            printf( "poll failure\n" );
            break;
        }

        if( fds[1].revents & POLLRDHUP )
        {
            printf( "server close the connection\n" );
            break;
        }
        else if( fds[1].revents & POLLIN )
        {
            memset( read_buf, ‘\0‘, BUFFER_SIZE );
            recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 );
            printf( "%s\n", read_buf );
        }

        if( fds[0].revents & POLLIN )
        {
            ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
            ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
        }
    }

    close( sockfd );
    return 0;
}

服务器程序
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <poll.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535

struct client_data
{
    sockaddr_in address;
    char* write_buf;
    char buf[ BUFFER_SIZE ];
};

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    client_data* users = new client_data[FD_LIMIT];
    pollfd fds[USER_LIMIT+1];
    int user_counter = 0;
    for( int i = 1; i <= USER_LIMIT; ++i )
    {
        fds[i].fd = -1;
        fds[i].events = 0;
    }
    fds[0].fd = listenfd;
    fds[0].events = POLLIN | POLLERR;
    fds[0].revents = 0;

    while( 1 )
    {
        ret = poll( fds, user_counter+1, -1 );
        if ( ret < 0 )
        {
            printf( "poll failure\n" );
            break;
        }

        for( int i = 0; i < user_counter+1; ++i )
        {
            if( ( fds[i].fd == listenfd ) && ( fds[i].revents & POLLIN ) )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                if ( connfd < 0 )
                {
                    printf( "errno is: %d\n", errno );
                    continue;
                }
                if( user_counter >= USER_LIMIT )
                {
                    const char* info = "too many users\n";
                    printf( "%s", info );
                    send( connfd, info, strlen( info ), 0 );
                    close( connfd );
                    continue;
                }
                user_counter++;
                users[connfd].address = client_address;
                setnonblocking( connfd );
                fds[user_counter].fd = connfd;
                fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;
                fds[user_counter].revents = 0;
                printf( "comes a new user, now have %d users\n", user_counter );
            }
            else if( fds[i].revents & POLLERR )
            {
                printf( "get an error from %d\n", fds[i].fd );
                char errors[ 100 ];
                memset( errors, ‘\0‘, 100 );
                socklen_t length = sizeof( errors );
                if( getsockopt( fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 )
                {
                    printf( "get socket option failed\n" );
                }
                continue;
            }
            else if( fds[i].revents & POLLRDHUP )
            {
                users[fds[i].fd] = users[fds[user_counter].fd];
                close( fds[i].fd );
                fds[i] = fds[user_counter];
                i--;
                user_counter--;
                printf( "a client left\n" );
            }
            else if( fds[i].revents & POLLIN )
            {
                int connfd = fds[i].fd;
                memset( users[connfd].buf, ‘\0‘, BUFFER_SIZE );
                ret = recv( connfd, users[connfd].buf, BUFFER_SIZE-1, 0 );
                printf( "get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd );
                if( ret < 0 )
                {
                    if( errno != EAGAIN )
                    {
                        close( connfd );
                        users[fds[i].fd] = users[fds[user_counter].fd];
                        fds[i] = fds[user_counter];
                        i--;
                        user_counter--;
                    }
                }
                else if( ret == 0 )
                {
                    printf( "code should not come to here\n" );
                }
                else
                {
                    for( int j = 1; j <= user_counter; ++j )
                    {
                        if( fds[j].fd == connfd )
                        {
                            continue;
                        }

                        fds[j].events |= ~POLLIN;
                        fds[j].events |= POLLOUT;
                        users[fds[j].fd].write_buf = users[connfd].buf;
                    }
                }
            }
            else if( fds[i].revents & POLLOUT )
            {
                int connfd = fds[i].fd;
                if( ! users[connfd].write_buf )
                {
                    continue;
                }
                ret = send( connfd, users[connfd].write_buf, strlen( users[connfd].write_buf ), 0 );
                users[connfd].write_buf = NULL;
                fds[i].events |= ~POLLOUT;
                fds[i].events |= POLLIN;
            }
        }
    }

    delete [] users;
    close( listenfd );
    return 0;
}
程序清单4 回射服务器程序
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define TCP_BUFFER_SIZE 512
#define UDP_BUFFER_SIZE 1024

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    //event.events = EPOLLIN | EPOLLET;
    event.events = EPOLLIN;
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );
    int udpfd = socket( PF_INET, SOCK_DGRAM, 0 );
    assert( udpfd >= 0 );

    ret = bind( udpfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd );
    addfd( epollfd, udpfd );

    while( 1 )
    {
        int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( number < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }

        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd );
            }
            else if ( sockfd == udpfd )
            {
                char buf[ UDP_BUFFER_SIZE ];
                memset( buf, ‘\0‘, UDP_BUFFER_SIZE );
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );

                ret = recvfrom( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, &client_addrlength );
                if( ret > 0 )
                {
                    sendto( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, client_addrlength );
                }
            }
            else if ( events[i].events & EPOLLIN )
            {
                char buf[ TCP_BUFFER_SIZE ];
                while( 1 )
                {
                    memset( buf, ‘\0‘, TCP_BUFFER_SIZE );
                    ret = recv( sockfd, buf, TCP_BUFFER_SIZE-1, 0 );
                    if( ret < 0 )
                    {
                        if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                        {
                            break;
                        }
                        close( sockfd );
                        break;
                    }
                    else if( ret == 0 )
                    {
                        close( sockfd );
                    }
                    else
                    {
                        send( sockfd, buf, ret, 0 );
                    }
                }
            }
            else
            {
                printf( "something else happened \n" );
            }
        }
    }

    close( listenfd );
    return 0;
}
时间: 2024-08-09 04:29:50

Linux高性能server编程——I/O复用的相关文章

Linux高性能服务器编程——I/O复用

 IO复用 I/O复用使得程序能同时监听多个文件描述符,通常网络程序在下列情况下需要使用I/O复用技术: 客户端程序要同时处理多个socket 客户端程序要同时处理用户输入和网络连接 TCP服务器要同时处理监听socket和连接socket,这是I/O复用使用最多的场合 服务器要同时处理TCP请求和UDP请求.比如本章将要讨论的会社服务器 服务器要同时监听多个端口,或者处理多种服务. I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的.并且当多个文件描述符同时就绪时,如果不采用额外措施

Linux高性能server编程——高级I/O函数

 高级I/O函数 pipe函数 pipe函数用于创建一个管道,实现进程间的通信. #include <unistd.h> int pipe(int pipefd[2]); 通过pipe函数创建的文件描写叙述符fd[0]和fd[1]分别构成管道的两端,往fd[1]写入的数据能够从fd[0]读出,不能反过来.管道内部传输的数据时字节流,和TCP字节流概念同样,但有差别,管道本身拥有一个容量限制,它规定假设应用程序不将数据从管道读走的话,该管道最多能被写入多少字节的数据.管道容量阿东小默认是65

Linux高性能server编程——信号及应用

?? 信号 信号是由用户.系统或者进程发送给目标进程的信息.以通知目标进程某个状态的改变或系统异常. Linux信号可由例如以下条件产生: 对于前台进程.用户能够通过输入特殊的终端字符来给它发送信号.比方输入Ctrl+C一般会给进程发送一个终端信号. 2.系统异常 系统状态变化 执行kill命令或调用kill函数 Linux信号概述 发送信号 Linux下,一个进程给其它进程发送信号的API是kill函数.其定义例如以下: #include <sys/types.h> #include <

Linux 高性能server编程——高级I/O函数

重定向dup和dup2函数 #include <unistd.h> int dup(int file_descriptor); int dup2(int file_descriptor_one, int file_descriptor_two); dup创建一个新的文件描写叙述符, 此描写叙述符和原有的file_descriptor指向同样的文件.管道或者网络连接. dup返回的文件描写叙述符总是取系统当前可用的最小整数值. dup2函数通过使用參数file_descriptor_two指定新

Linux高性能服务器编程——定时器

 定时器 服务器程序通常管理着众多定时事件,因此有效组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,对于服务器的性能有着至关重要的影响.位置我们要将每个定时事件封装成定时器,并使用某种容器类型的数据结构,比如链表.排序链表和时间轮将所有定时器串联起来,以实现对定时事件的统一管理. Linux提供三种定时方法: 1.socket选项SO_RECVTIMEO和SO_SNDTIMEO. 2.SIGALRM信号 3.I/O复用系统调用的超时参数 socket选项SO_RCVTI

Linux 高性能服务器编程——高级I/O函数

重定向dup和dup2函数 [cpp] view plaincopyprint? #include <unistd.h> int dup(int file_descriptor); int dup2(int file_descriptor_one, int file_descriptor_two); dup创建一个新的文件描述符, 此描述符和原有的file_descriptor指向相同的文件.管道或者网络连接. dup返回的文件描述符总是取系统当前可用的最小整数值. dup2函数通过使用参数f

Linux高性能服务器编程——信号及应用

 信号 信号是由用户.系统或者进程发送给目标进程的信息,以通知目标进程某个状态的改变或系统异常.Linux信号可由如下条件产生: 对于前台进程,用户可以通过输入特殊的终端字符来给它发送信号.比如输入Ctrl+C通常会给进程发送一个终端信号. 2.系统异常 系统状态变化 运行kill命令或调用kill函数 Linux信号概述 发送信号 Linux下,一个进程给其他进程发送信号的API是kill函数.其定义如下: #include <sys/types.h> #include <sign

linux高性能服务器编程

<Linux高性能服务器编程>:当当网.亚马逊 目录: 第一章:tcp/ip协议族 第二章:ip协议族 第三章:tcp协议详解 第四章:tcp/ip通信案例:访问Internet 第五章:linux网络编程基础API 第六章:高级IO函数 第七章:linux服务器程序规范 第八章:高性能服务器框架 第九章:IO复用 第十章:信号 第十一章:定时器 第十二章:高性能IO框架库libevent 第十三章:多进程编程 第十四章:多线程编程 第十五章:进程池和线程池 第十六章:服务器调制.调试和测试

Linux高性能服务器编程——多进程编程

多进程编程 多进程编程包括如下内容: 复制进程影映像的fork系统调用和替换进程映像的exec系列系统调用. 僵尸进程以及如何避免僵尸进程 进程间通信(Inter-Process Communication,IPC)最简单的方式:管道 3种进程间通信方式:信号量,消息队列和共享内存 fork系统调用 #include<unistd.h> pid_tfork(void); 该函数的每次都用都返回两次,在父进程中返回的是子进程的PID,在子进程中返回的是0.该返回值是后续代码判断当前进程是父进程还