(OK) Linux epoll模型—socket epoll server client chat—pthread

http://www.cnblogs.com/venow/archive/2012/11/30/2790031.html

http://blog.csdn.net/denkensk/article/details/41978015

定义:

  epoll是Linux内核为处理大批句柄而作改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著的减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。因为它会复用文件描述符集合来传递结果而不是迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一个原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select\poll那种IO事件的电平触发(Level
Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提供应用程序的效率。

工作方式:

  LT(level triggered):水平触发,缺省方式,同时支持block和no-block socket,在这种做法中,内核告诉我们一个文件描述符是否被就绪了,如果就绪了,你就可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错的可能性较小。传统的select\poll都是这种模型的代表。

  ET(edge-triggered):边沿触发,高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪状态时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如:你在发送、接受或者接受请求,或者发送接受的数据少于一定量时导致了一个EWOULDBLOCK错误)。但是请注意,如果一直不对这个fs做IO操作(从而导致它再次变成未就绪状态),内核不会发送更多的通知。

  区别:LT事件不会丢弃,而是只要读buffer里面有数据可以让用户读取,则不断的通知你。而ET则只在事件发生之时通知。

使用方式:

  1、int epoll_create(int size)

创建一个epoll句柄,参数size用来告诉内核监听的数目。

  2、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

epoll事件注册函数,

  参数epfd为epoll的句柄;

  参数op表示动作,用3个宏来表示:EPOLL_CTL_ADD(注册新的fd到epfd),EPOLL_CTL_MOD(修改已经注册的fd的监听事件),EPOLL_CTL_DEL(从epfd删除一个fd);

  参数fd为需要监听的标示符;

  参数event告诉内核需要监听的事件,event的结构如下:

    struct epoll_event {
          __uint32_t events; /* Epoll events */
          epoll_data_t data; /* User data variable */
        };

  其中events可以用以下几个宏的集合:

  EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭)

  EPOLLOUT:表示对应的文件描述符可以写

  EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)

  EPOLLERR:表示对应的文件描述符发生错误

  EPOLLHUP:表示对应的文件描述符被挂断;

  EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的

  EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

3、 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

  等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

应用举例:

  下面,我引用google code中别人写的一个简单程序来进行说明。svn路径:http://sechat.googlecode.com/svn/trunk/

  该程序一个简单的聊天室程序,用Linux C++写的,服务器主要是用epoll模型实现,支持高并发,我测试在有10000个客户端连接服务器的时候,server处理时间不到1秒,当然客户端只是与服务器连接之后,接受服务器的欢迎消息而已,并没有做其他的通信。虽然程序比较简单,但是在我们考虑服务器高并发时也提供了一个思路。在这个程序中,我已经把所有的调试信息和一些与epoll无关的信息干掉,并添加必要的注释,应该很容易理解。

  程序共包含3个头文件和3个cpp文件。其中3个cpp文件中,每一个cpp文件都是一个应用程序,server.cpp:服务器程序,client.cpp:单个客户端程序,tester.cpp:模拟高并发,开启10000个客户端去连服务器。

utils.h头文件,就包含一个设置socket为不阻塞函数,如下:

    #include <fcntl.h>
    int setnonblocking(int sockfd)
    {
        CHK(fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK));
        return 0;
    }

local.h头文件,一些常量的定义和函数的声明,如下:

    #include "head.h"
    #define BUF_SIZE 1024        //默认缓冲区
    #define SERVER_PORT 11111    //监听端口
    #define SERVER_HOST "127.0.0.1"    //服务器IP地址
    #define EPOLL_RUN_TIMEOUT -1    //epoll的超时时间
    #define EPOLL_SIZE 10000    //epoll监听的客户端的最大数目

    #define STR_WELCOME "Welcome to seChat! You ID is : Client #%d"
    #define STR_MESSAGE "Client #%d>> %s"
    #define STR_NOONE_CONNECTED "Noone connected to server expect you!"
    #define CMD_EXIT "EXIT"

    //两个有用的宏定义:检查和赋值并且检测
    #define CHK(eval) if(eval < 0){perror("eval"); exit(-1);}
    #define CHK2(res, eval) if((res = eval) < 0){perror("eval"); exit(-1);}

    int setnoblock(int sockfd);
    //int handle_message(int new_fd);
    void* handle_message(void *arg);

head.h文件,程序中所要用到的头文件,如下:

    //#ifndef __HEAD__H__
    //#define __HEAD__H__
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <time.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>

    #include <pthread.h>

    #include <iostream>
    #include <list>
    #include <iterator>

    //#endif

server.cpp文件,epoll模型就在这里实现,如下:

    #include "local.h"
    #include "utils.h"
    #include "head.h"
    using namespace std;

    // 存放客户端socket描述符的list
    list < int >clients_list;

    int main(int argc, char *argv[])
    {
        int listener;        //监听socket
        struct sockaddr_in addr, their_addr;
        addr.sin_family = PF_INET;
        addr.sin_port = htons(SERVER_PORT);
        addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
        socklen_t socklen;
        socklen = sizeof(struct sockaddr_in);

        static struct epoll_event ev, events[EPOLL_SIZE];
        ev.events = EPOLLIN | EPOLLET;    //对读感兴趣,边沿触发

        char message[BUF_SIZE];

        int epfd;        //epoll描述符
        clock_t tStart;        //计算程序运行时间

        int client, res, epoll_events_count;

        CHK2(listener, socket(PF_INET, SOCK_STREAM, 0));    //初始化监听socket
        setnonblocking(listener);    //设置监听socket为不阻塞

        // 设置套接字选项避免地址使用错误
        int on=1;
        if((setsockopt(listener,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)))<0)
        {
            perror("setsockopt failed");
            exit(EXIT_FAILURE);
        }

        CHK(bind(listener, (struct sockaddr *)&addr, sizeof(addr)));    //绑定监听socket
        CHK(listen(listener, 1));    //设置监听

        CHK2(epfd, epoll_create(EPOLL_SIZE));    //创建一个epoll描述符,并将监听socket加入epoll
        ev.data.fd = listener;
        CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev));

        while (1) {
            CHK2(epoll_events_count, epoll_wait(epfd, events, EPOLL_SIZE, EPOLL_RUN_TIMEOUT));
            tStart = clock();
            for (int i = 0; i < epoll_events_count; i++) {
                if (events[i].data.fd == listener)    //新的连接到来,将连接添加到epoll中,并发送欢迎消息
                {
                    CHK2(client, accept(listener, (struct sockaddr *)&their_addr, &socklen));
                    setnonblocking(client);
                    ev.data.fd = client;
                    CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev));

                    clients_list.push_back(client);    // 添加新的客户端到list
                    bzero(message, BUF_SIZE);
                    res = sprintf(message, STR_WELCOME, client);
                    CHK2(res, send(client, message, BUF_SIZE, 0));

                    printf("client: %d\n", client);

                } else {
                    //CHK2(res, handle_message(events[i].data.fd));    //注意:这里并没有调用epoll_ctl重新设置socket的事件类型,但还是可以继续收到客户端发送过来的信息
                    //printf("-------------client: %d\n", events[i].data.fd);
                    //*
                    pthread_t reader;
                    int arg = events[i].data.fd;
                    int rt = pthread_create(&reader, NULL, handle_message, (void *)&arg);
                    if (-1 == rt) {
                        printf("thread creation error\n");
                        return -1;
                    }
                    //*/
                }
            }
            printf("Statistics: %d events handled at: %.2f second(s)\n", epoll_events_count, (double)(clock() - tStart) / CLOCKS_PER_SEC);

            /*
            Statistics: 1 events handled at: 0.00 second(s)
            ...
            Statistics: 1 events handled at: 0.00 second(s)
            Statistics: 20 events handled at: 0.01 second(s)
            Statistics: 291 events handled at: 0.03 second(s)
            Statistics: 2532 events handled at: 0.23 second(s)
            Statistics: 7156 events handled at: 0.31 second(s)
            //*/
        }

        close(listener);
        close(epfd);
        printf("is it over\n");
        return 0;
    }

    //int handle_message(int client)
    void* handle_message(void *arg)
    {
        int client = *((int*)arg);
        //sleep(5);
        char buf[BUF_SIZE], message[BUF_SIZE];
        bzero(buf, BUF_SIZE);
        bzero(message, BUF_SIZE);

        int len;

        CHK2(len, recv(client, buf, BUF_SIZE, 0));    //接受客户端信息

        if (len == 0)        //客户端关闭或出错,关闭socket,并从list移除socket
        {
            CHK(close(client));
            clients_list.remove(client);
        } else            //向客户端发送信息
        {
            if (clients_list.size() == 1) {
                CHK(send(client, STR_NOONE_CONNECTED, strlen(STR_NOONE_CONNECTED), 0));
                //return len;
                return NULL;
            }

            sprintf(message, STR_MESSAGE, client, buf);
            list < int >::iterator it;
            for (it = clients_list.begin(); it != clients_list.end(); it++) {
                if (*it != client) {
                    CHK(send(*it, message, BUF_SIZE, 0));
                }
            }
        }
        //return len;
        return NULL;
    }

tester.cpp文件,模拟服务器的高并发,开启10000个客户端去连接服务器,如下:

    #include "local.h"
    #include "utils.h"
    #include "head.h"

    #define EPOLL_TEST_SIZE 8000

    using namespace std;

    char message[BUF_SIZE];        //接受服务器信息
    list < int >list_of_clients;    //存放所有客户端
    int res;
    clock_t tStart;

    int main(int argc, char *argv[])
    {
        int sock;
        struct sockaddr_in addr;
        addr.sin_family = PF_INET;
        addr.sin_port = htons(SERVER_PORT);
        addr.sin_addr.s_addr = inet_addr(SERVER_HOST);

        tStart = clock();

        for (int i = 0; i < EPOLL_TEST_SIZE; i++)    //生成EPOLL_TEST_SIZE个客户端,这里是10000个,模拟高并发
        {
            usleep(1000);
            CHK2(sock, socket(PF_INET, SOCK_STREAM, 0));
            CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
            list_of_clients.push_back(sock);

            bzero(message, BUF_SIZE);
            CHK2(res, recv(sock, message, BUF_SIZE, 0));
            printf("%s\n", message);
            //CHK2(res, send(sock, message, BUF_SIZE, 0));
        }

        list < int >::iterator it;    //移除所有客户端
        for (it = list_of_clients.begin(); it != list_of_clients.end(); it++)
            close(*it);

        printf("Test passed at: %.2f second(s)\n", (double)(clock() - tStart) / CLOCKS_PER_SEC);
        printf("Total server connections was: %d\n", EPOLL_TEST_SIZE);

        return 0;
    }

我就不给出程序的执行结果的截图了,不过下面这张截图是代码作者自己测试的,可以看出,并发10000无压力呀

  单个客户端去连接服务器,client.cpp文件,如下:

    #include "local.h"
    #include "utils.h"
    #include "head.h"

    using namespace std;

    char message[BUF_SIZE];

    /*
        流程:
            调用fork产生两个进程,两个进程通过管道进行通信
            子进程:等待客户输入,并将客户输入的信息通过管道写给父进程
            父进程:接受服务器的信息并显示,将从子进程接受到的信息发送给服务器
    */
    int main(int argc, char *argv[])
    {
        int sock, pid, pipe_fd[2], epfd;

        struct sockaddr_in addr;
        addr.sin_family = PF_INET;
        addr.sin_port = htons(SERVER_PORT);
        addr.sin_addr.s_addr = inet_addr(SERVER_HOST);

        static struct epoll_event ev, events[2];
        ev.events = EPOLLIN | EPOLLET;

        //退出标志
        int continue_to_work = 1;

        CHK2(sock, socket(PF_INET, SOCK_STREAM, 0));
        CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);

        CHK(pipe(pipe_fd));

        CHK2(epfd, epoll_create(EPOLL_SIZE));

        ev.data.fd = sock;
        CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev));

        ev.data.fd = pipe_fd[0];
        CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev));

        // 调用fork产生两个进程
        CHK2(pid, fork());
        switch (pid) {
        case 0:        // 子进程
            close(pipe_fd[0]);    // 关闭读端
            printf("Enter 'exit' to exit\n");
            while (continue_to_work) {
                bzero(&message, BUF_SIZE);
                fgets(message, BUF_SIZE, stdin);

                // 当收到exit命令时,退出
                if (strncasecmp(message, CMD_EXIT, strlen(CMD_EXIT)) == 0) {
                    continue_to_work = 0;
                } else {
                    CHK(write(pipe_fd[1], message, strlen(message) - 1));
                }
            }
            break;
        default:        // 父进程
            close(pipe_fd[1]);    // 关闭写端
            int epoll_events_count, res;
            while (continue_to_work) {
                CHK2(epoll_events_count, epoll_wait(epfd, events, 2, EPOLL_RUN_TIMEOUT));

                for (int i = 0; i < epoll_events_count; i++) {
                    bzero(&message, BUF_SIZE);
                    if (events[i].data.fd == sock)    //从服务器接受信息
                    {
                        CHK2(res, recv(sock, message, BUF_SIZE, 0));
                        if (res == 0)    //服务器已关闭
                        {
                            CHK(close(sock));
                            continue_to_work = 0;
                        } else {
                            printf("%s\n", message);
                        }
                    } else    //从子进程接受信息
                    {
                        CHK2(res, read(events[i].data.fd, message, BUF_SIZE));
                        if (res == 0) {
                            continue_to_work = 0;
                        } else {
                            CHK(send(sock, message, BUF_SIZE, 0));
                        }
                    }
                }
            }
        }
        if (pid) {
            close(pipe_fd[0]);
            close(sock);
        } else {
            close(pipe_fd[1]);
        }

        return 0;
    }

源代码下载

linux-epoll-server-client-chat-pthread.tar.gz

[[email protected] test]# g++ server.cpp -o server

[[email protected] test]# g++ tester.cpp -o tester

[[email protected] test]# g++ client.cpp -o client

注意:

linux下输入命令ulimit -a,可以看到open files 一般时1024,所以未经修改时做本实验,最大打开到1023号。所以要修改。

修改方法:ulimit -n <可以同时打开的文件数目> 设置用户可以同时打开的最大文件数

如果本参数设置过小,对于并发访问量大的网站,可能会出现too many open files 的错误。

时间: 2024-10-04 00:05:22

(OK) Linux epoll模型—socket epoll server client chat—pthread的相关文章

(OK) Linux epoll模型—socket epoll server client chat

http://www.cnblogs.com/venow/archive/2012/11/30/2790031.html http://blog.csdn.net/denkensk/article/details/41978015 定义: epoll是Linux内核为处理大批句柄而作改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著的减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率.因为它会复用文件描述符集合来传递结果而不是迫使开发者每次等待事

C Socket Programming server client

用C语言socket编程写简单的server和client程序,目的为了测试一次socket连接中,数据从client端开始发送至server端接受完毕所耗的时间,以及找到原因和提升性能.这里贴出代码,make以后,直接运行server和client即可. sever.c /************************************ * For msmr * server_s.c * tesing the speed of socket * 2015-02-04 * [email 

Linux epoll模型

定义: epoll是Linux内核为处理大批句柄而作改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著的减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率.因为它会复用文件描述符集合来传递结果而不是迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一个原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了.epoll除了提供select\poll那种IO事

2020.04.10 线上性能优化以及Linux下NIO/Epoll模型全解--实战

1.支付宝模拟线上优化实战 2.手写JUC工具与提升tomcat吞吐量 3.网络通信BIO设计与缺陷   -- accept()  和 read()阻塞 4.单线程解决高并发NIO精髓解读 5.OS内核下Epoll与Selete源码解读 第一部分: 性能优化 问题:如何在高并发场景下实现支付宝用户登录页面的信息获取?如用户信息,金额,积分等 浏览器  ---- Spring MVC ---- controller ----- service ---- 用户模块.余额模块.积分模块等 -- 调用多

Linux Epoll模型(1) --理论与实践

引言: 相比于select,Epoll最大的好处在于它不会随着监听fd数目的增长而降低效率.因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多.并且,在linux/posix_types.h头文件有这样的声明: #define __FD_SETSIZE    1024 表示select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎治标不治本. 常用模型的特点 Linux 下设计并发网络程序,有典型的Apache模型(

Linux高并发机制——epoll模型

epoll是一个特别重要的概念,常常用于处理服务端的并发问题.当服务端的在线人数越来越多,会导致系统资源吃紧,I/O效率越来越慢,这时候就应该考虑epoll了.epoll是Linux内核为处理大批句柄而作改进的poll,是Linux特有的I/O函数.其特点如下: 1.epoll是Linux下多路复用IO接口select/poll的增强版本.其实现和使用方式与select/poll有很多不同,epoll通过一组函数来完成有关任务,而不是一个函数. 2.epoll之所以高效,是因为epoll将用户关

基本I/O模型与Epoll简介

5种基本的I/O模型:1)阻塞I/O ;2)非阻塞I/O; 3)I/O复用(select和poll);4)信号驱动I/O(SIGIO);5)异步I/O(POSIX.1的aio_系列函数). 操作系统中一个输入操作一般有两个不同的阶段: 第一:等待数据准备好.第二:从内核到进程拷贝数据.对于一个sockt上的输入操作,第一步一般是等待数据到达网络,当分组到达时,它被拷贝到内核中的某个缓冲区,第二步是将数据从内核缓冲区拷贝到应用程序缓冲区. 一.           阻塞I/O模型 请求无法立即完成

epoll简介 与 UDP server的实现

Abstractepoll是Linux内核为处理大批量句柄而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率. 简介:epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为: 它会复用文件描述符集合来传递结果, 而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因:

epoll实现socket通信

epoll是Linux特有的I/O复用函数,它在实现和使用上与select和poll有很大差异.epoll使用一组函数来完成任务,而不是单个函数.epoll把用户关心的文件描述符上的事件放在内核的一个事件表中,无需像select和poll那样每次调用都要重复传入文件描述符集或事件集,但epoll需要一个额外的文件描述符,来唯一标示内核中的这个事件表,这个文件描述符使用epoll_create函数来创建. epoll是一种高效的管理socket的模型,相对于select和poll来说具有更高的效率