在做网络服务的时候tcp并发服务端程序的编写必不可少。tcp并发通常有几种固定的设计模式套路,他们各有优点,也各有应用之处。下面就简单的讨论下这几种模式的差异:
单进程,单线程
在accept之后,就开始在这一个连接连接上的数据收接收,收到之后处理,发送,不再接收新的连接,除非这个连接的处理结束。
优点: 简单。
缺点: 因为只为一个客户端服务,所以不存在并发的可能。
应用: 用在只为一个客户端服务的时候。
多进程
accept返回成功时候,就为这一个连接fork一个进程,专门处理这个连接上的数据收发,等这个连接处理结束之后就结束这个进程。
优点: 编程相对简单,不用考虑线程间的数据同步等。
缺点: 资源消耗大。启动一个进程消耗相对比启动一个线程要消耗大很多,同时在处理很多的连接时候需要启动很多的进程多去处理,这时候对系统来说压力就会比较大。另外系统的进程数限制也需要考虑。
应用: 在客户端数据不多的时候使用很方便,比如小于10个客户端。
多线程
类似多进程方式,但是针对一个连接启动一个线程。
优点: 相对多进程方式,会节约一些资源,会更加高效一些。
缺点: 相对多进程方式,增加了编程的复杂度,因为需要考虑数据同步和锁保护。另外一个进程中不能启动太多的线程。在Linux系统下线程在系统内部其实就是进程,线程调度按照进程调度的方式去执行的。
应用: 类似于多进程方式,适用于少量的客户端的时候。
Select+多线程
有一个线程专门用于监听端口,accept返回之后就把这个描述符放入 描述符集合 fd中,一个线程用select去轮训描述符集合,在有数据的连接上接收数据,另外一个线程专门发送数据。当然也可以接收和发送用一个线程。描述符可以设置成非阻塞模式,也可以设置成阻塞模式。通常连接设置成非阻塞模式,发送线程独立出来。
优点:相对前几种模式,这种模式大大提高了并发量。
缺点:系统一般实现描述符集合是采用一个大数组,每次调用select的时候都会轮询这个描述符数组,当连接数很多的时候就会导致效率下降。连接数在1000以上时候效率会下降到不能接受。
应用:目前windows 和一般的Unix上的tcp并发都采用select方式,应该说应用还是很广泛的。
epoll方式
在Linux2.6版本之后,增加了epoll。具体的使用是:一个线程专门进行端口监听,accept接收到连接的时候,把该连接设置成非阻塞模式,把 epoll事件设置成边缘触发方式,加入到epoll管理。接收线程阻塞在epoll的等待事件函数。另外一个线程专门用于数据发送。
优点:由于epoll的实现方式先进,所以这种方式可以大规模的实现并发。我们现在的应用在一个3年前的dell的pc server可以实现2万个连接的并发,性能也是很好的。
缺点:由于涉及了线程和非阻塞,所以会导致编码的复杂度增大一些。这种方式只适用于Linux 2.6内核以后。
注意:
1) 如果把epoll事件设置成水平触发效率就下降到类似采用select的水平。
2) Unix系统下有单个进程打开的描述符数目限制,还有系统内打开的描述符数目限制。系统内打开的描述符数目限制由软硬限制两个。硬限制是根据机器的配置而不同。软限制可以更改,但是必须小于系统的硬限制。在suse Linux下,可以在root用户下,通过ulimit -n 数目去修改这个限制。
应用: Linux下大规模的tcp并发。
配置开发支持高并发TCP连接的Linux应用程序全攻略
修改用户进程可打开文件数限制
在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统为每个TCP连接都要创建一个socket句柄,每个socket句柄同时也是一个文件句柄)。可使用ulimit命令查看系统允许当前用户进程打开的文件数限制:
[[email protected] ~]$ ulimit -n
1024
这表示当前用户的每个进程最多允许同时打开1024个文件,这1024个文件中还得除去每个进程必然打开的标准输入,标准输出,标准错误,服务器监听 socket,进程间通讯的unix域socket等文件,那么剩下的可用于客户端socket连接的文件数就只有大概1024-10=1014个左右。也就是说缺省情况下,基于Linux的通讯程序最多允许同时1014个TCP并发连接。
对于想支持更高数量的TCP并发连接的通讯处理程序,就必须修改Linux对当前用户的进程同时打开的文件数量的软限制(soft limit)和硬限制(hardlimit)。其中软限制是指Linux在当前系统能够承受的范围内进一步限制用户同时打开的文件数;硬限制则是根据系统硬件资源状况(主要是系统内存)计算出来的系统最多可同时打开的文件数量。通常软限制小于或等于硬限制。
修改上述限制的最简单的办法就是使用ulimit命令:
[[email protected] ~]$ ulimit -n <file_num>
上述命令中,在<file_num>中指定要设置的单一进程允许打开的最大文件数。如果系统回显类似于“Operation notpermitted”之类的话,说明上述限制修改失败,实际上是因为在<file_num>中指定的数值超过了Linux系统对该用户打开文件数的软限制或硬限制。因此,就需要修改Linux系统对用户的关于打开文件数的软限制和硬限制。
第一步,修改/etc/security/limits.conf文件,在文件中添加如下行:
speng soft nofile 10240
speng hard nofile 10240
其中speng指定了要修改哪个用户的打开文件数限制,可用‘*‘号表示修改所有用户的限制;soft或hard指定要修改软限制还是硬限制;10240则指定了想要修改的新的限制值,即最大打开文件数(请注意软限制值要小于或等于硬限制)。修改完后保存文件。
第二步,修改/etc/pam.d/login文件,在文件中添加如下行:
session required /lib/security/pam_limits.so
这是告诉Linux在用户完成系统登录后,应该调用pam_limits.so模块来设置系统对该用户可使用的各种资源数量的最大限制(包括用户可打开的最大文件数限制),而pam_limits.so模块就会从/etc/security/limits.conf文件中读取配置来设置这些限制值。修改完后保存此文件。
第三步,查看Linux系统级的最大打开文件数限制,使用如下命令:
[[email protected] ~]$ cat /proc/sys/fs/file-max
12158
这表明这台Linux系统最多允许同时打开(即包含所有用户打开文件数总和)12158个文件,是Linux系统级硬限制,所有用户级的打开文件数限制都不应超过这个数值。通常这个系统级硬限制是Linux系统在启动时根据系统硬件资源状况计算出来的最佳的最大同时打开文件数限制,如果没有特殊需要,不应该修改此限制,除非想为用户级打开文件数限制设置超过此限制的值。修改此硬限制的方法是修改/etc/rc.local脚本,在脚本中添加如下行:
echo 22158 > /proc/sys/fs/file-max
这是让Linux在启动完成后强行将系统级打开文件数硬限制设置为22158。修改完后保存此文件。
完成上述步骤后重启系统,一般情况下就可以将Linux系统对指定用户的单一进程允许同时打开的最大文件数限制设为指定的数值。如果重启后用 ulimit-n命令查看用户可打开文件数限制仍然低于上述步骤中设置的最大值,这可能是因为在用户登录脚本/etc/profile中使用ulimit -n命令已经将用户可同时打开的文件数做了限制。由于通过ulimit-n修改系统对用户可同时打开文件的最大数限制时,新修改的值只能小于或等于上次 ulimit-n设置的值,因此想用此命令增大这个限制值是不可能的。所以,如果有上述问题存在,就只能去打开/etc/profile脚本文件,在文件中查找是否使用了ulimit-n限制了用户可同时打开的最大文件数量,如果找到,则删除这行命令,或者将其设置的值改为合适的值,然后保存文件,用户退出并重新登录系统即可。
通过上述步骤,就为支持高并发TCP连接处理的通讯处理程序解除关于打开文件数量方面的系统限制。
修改网络内核对TCP连接的有关限制
在Linux上编写支持高并发TCP连接的客户端通讯处理程序时,有时会发现尽管已经解除了系统对用户同时打开文件数的限制,但仍会出现并发TCP连接数增加到一定数量时,再也无法成功建立新的TCP连接的现象。出现这种现在的原因有多种。
第一种原因可能是因为Linux网络内核对本地端口号范围有限制。此时,进一步分析为什么无法建立TCP连接,会发现问题出在connect()调用返回失败,查看系统错误提示消息是“Can‘t assign requestedaddress”。同时,如果在此时用tcpdump工具监视网络,会发现根本没有TCP连接时客户端发SYN包的网络流量。这些情况说明问题在于本地Linux系统内核中有限制。其实,问题的根本原因在于Linux内核的TCP/IP协议实现模块对系统中所有的客户端TCP连接对应的本地端口号的范围进行了限制(例如,内核限制本地端口号的范围为1024~32768之间)。当系统中某一时刻同时存在太多的TCP客户端连接时,由于每个TCP客户端连接都要占用一个唯一的本地端口号(此端口号在系统的本地端口号范围限制中),如果现有的TCP客户端连接已将所有的本地端口号占满,则此时就无法为新的TCP客户端连接分配一个本地端口号了,因此系统会在这种情况下在connect()调用中返回失败,并将错误提示消息设为“Can‘t assignrequested address”。有关这些控制逻辑可以查看Linux内核源代码,以linux2.6内核为例,可以查看tcp_ipv4.c文件中如下函数:
static int tcp_v4_hash_connect(struct sock *sk)
请注意上述函数中对变量sysctl_local_port_range的访问控制。变量sysctl_local_port_range的初始化则是在tcp.c文件中的如下函数中设置:
void __init tcp_init(void)
内核编译时默认设置的本地端口号范围可能太小,因此需要修改此本地端口范围限制。
第一步,修改/etc/sysctl.conf文件,在文件中添加如下行:
net.ipv4.ip_local_port_range = 1024 65000
这表明将系统对本地端口范围限制设置为1024~65000之间。请注意,本地端口范围的最小值必须大于或等于1024;而端口范围的最大值则应小于或等于65535。修改完后保存此文件。
第二步,执行sysctl命令:
[[email protected] ~]$ sysctl -p
如果系统没有错误提示,就表明新的本地端口范围设置成功。如果按上述端口范围进行设置,则理论上单独一个进程最多可以同时建立60000多个TCP客户端连接。
第二种无法建立TCP连接的原因可能是因为Linux网络内核的IP_TABLE防火墙对最大跟踪的TCP连接数有限制。此时程序会表现为在 connect()调用中阻塞,如同死机,如果用tcpdump工具监视网络,也会发现根本没有TCP连接时客户端发SYN包的网络流量。由于 IP_TABLE防火墙在内核中会对每个TCP连接的状态进行跟踪,跟踪信息将会放在位于内核内存中的conntrackdatabase中,这个数据库的大小有限,当系统中存在过多的TCP连接时,数据库容量不足,IP_TABLE无法为新的TCP连接建立跟踪信息,于是表现为在connect()调用中阻塞。此时就必须修改内核对最大跟踪的TCP连接数的限制,方法同修改内核对本地端口号范围的限制是类似的:
第一步,修改/etc/sysctl.conf文件,在文件中添加如下行:
net.ipv4.ip_conntrack_max = 10240
这表明将系统对最大跟踪的TCP连接数限制设置为10240。请注意,此限制值要尽量小,以节省对内核内存的占用。
第二步,执行sysctl命令:
[[email protected] ~]$ sysctl -p
如果系统没有错误提示,就表明系统对新的最大跟踪的TCP连接数限制修改成功。如果按上述参数进行设置,则理论上单独一个进程最多可以同时建立10000多个TCP客户端连接。
使用支持高并发网络I/O的编程技术
在Linux上编写高并发TCP连接应用程序时,必须使用合适的网络I/O技术和I/O事件分派机制。
可用的I/O技术有同步I/O,非阻塞式同步I/O(也称反应式I/O),以及异步I/O。在高TCP并发的情形下,如果使用同步I/O,这会严重阻塞程序的运转,除非为每个TCP连接的I/O创建一个线程。但是,过多的线程又会因系统对线程的调度造成巨大开销。因此,在高TCP并发的情形下使用同步 I/O是不可取的,这时可以考虑使用非阻塞式同步I/O或异步I/O。非阻塞式同步I/O的技术包括使用select(),poll(),epoll等机制。异步I/O的技术就是使用AIO。
从I/O事件分派机制来看,使用select()是不合适的,因为它所支持的并发连接数有限(通常在1024个以内)。如果考虑性能,poll()也是不合适的,尽管它可以支持的较高的TCP并发数,但是由于其采用“轮询”机制,当并发数较高时,其运行效率相当低,并可能存在I/O事件分派不均,导致部分 TCP连接上的I/O出现“饥饿”现象。而如果使用epoll或AIO,则没有上述问题(早期Linux内核的AIO技术实现是通过在内核中为每个 I/O请求创建一个线程来实现的,这种实现机制在高并发TCP连接的情形下使用其实也有严重的性能问题。但在最新的Linux内核中,AIO的实现已经得到改进)。
综上所述,在开发支持高并发TCP连接的Linux应用程序时,应尽量使用epoll或AIO技术来实现并发的TCP连接上的I/O控制,这将为提升程序对高并发TCP连接的支持提供有效的I/O保证。
网络编程中并发服务器的设计模式
并发服务器有三种设计模式
1) 多进程。每个进程服务一个客户端。优势是有各自独立的地址空间,可靠性高,但进程调度开销大,无法资源共享,进程间通信机制复杂。
2) 多线程。每个线程服务一个客户端。优势是开销小,通信机制简单,可共享内存。但共享地址空间,可靠性低,一个服务器出现问题时可能导致系统崩溃,同时全局共享可能带来竞争,共享资源需要互斥,对编程要求高。
3) 单进程:占有的进程及线程资源少,通信机制简单。但监听服务器及各个子服务器揉和在一起,程序结构复杂不清晰,编程麻烦。
例子:多进程并发服务器
/* 多进程并发服务器。该程序等候客户连接,一旦连接则显示客户的地址,
接着接收该客户的名字并显示。然后接收来自该客户的信息(字符串)。
每当收到一个字符串,则显示该字符串,并将字符串反转,再将反转的字
符发回客户。之后,继续等待接收该客户的信息直至该客户关闭连接。服
务器具有同时处理多客户的能力。
*/
#include <stdio.h> /* These are the usual header files */
#include <strings.h> /* for bzero() */
#include <unistd.h> /* for close() */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define PORT 1234 /* Port that will be opened */
#define BACKLOG 2 /* Number of allowed connections */
#define MAXDATASIZE 1000
void process_cli(int connectfd, struct sockaddr_in client);
main()
{
int listenfd, connectfd; /* socket descriptors */
pid_t pid;
struct sockaddr_in server; /* server‘s address information */
struct sockaddr_in client; /* client‘s address information */
int sin_size;
/* Create TCP socket */
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {
/* handle exception */
perror("Bind error.");
exit(1);
}
if(listen(listenfd,BACKLOG) == -1){ /* calls listen() */
perror("listen() error\n");
exit(1);
}
sin_size=sizeof(struct sockaddr_in);
while(1)
{
/*accept connection.what causes the acceptance? */
if ((connectfd = accept(listenfd,(struct sockaddr *)&client,&sin_size))==-1) {
perror("accept() error\n");
exit(1);
}
/* Create child process to service client */
if ((pid=fork())>0) {
/* parent process */
close(connectfd);
continue;
}
else if (pid==0) {
/*child process*/
close(listenfd);
process_cli(connectfd, client);
exit(0);
}
else {
printf("fork error\n");
exit(0);
}
}
close(listenfd); /* close listenfd */
}
void process_cli(int connectfd, struct sockaddr_in client)
{
int num;
char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];
printf("You got a connection from %s. ",inet_ntoa(client.sin_addr) ); /* prints client‘s IP */
/* Get client‘s name from client */
num = recv(connectfd, cli_name, MAXDATASIZE,0);
if (num == 0) {
close(connectfd);
printf("Client disconnected.\n");
return;
}
cli_name[num - 1] = ‘\0‘;
printf("Client‘s name is %s.\n",cli_name);
while (num = recv(connectfd, recvbuf, MAXDATASIZE,0))
{
int i = 0;
recvbuf[num] = ‘\0‘;
printf("Received client( %s ) message: %s",cli_name, recvbuf);
for (i = 0; i < num - 1; i++) {
sendbuf[i] = recvbuf[num - i -2];
}
sendbuf[num - 1] = ‘\0‘;
send(connectfd,sendbuf,strlen(sendbuf),0); /* send to the client welcome message */
}
close(connectfd); /* close connectfd */
}
例子:多线程并发服务器
#include <stdio.h> /* These are the usual header files */
#include <string.h> /* for bzero() */
#include <unistd.h> /* for close() */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <stdlib.h> /* for exit in c++(.C/.cc) ; no need for c ??*/
#define PORT 1234 /* Port that will be opened */
#define BACKLOG 5 /* Number of allowed connections */
#define MAXDATASIZE 1000
//void process_cli(int connectfd, sockaddr_in client); // c only supports struct sockaddr_in, but c++ support sockaddr_in
void process_cli(int connectfd, struct sockaddr_in client);
/* function to be executed by the new thread */
void* start_routine(void* arg);
typedef struct _ARG {
int connfd;
struct sockaddr_in client;
}ARG;
// it‘s better to use typedef struct
main()
{
int listenfd, connectfd; /* socket descriptors */
pthread_t thread;
//struct ARG *arg; // when no typedef,there should be struct for c code; no need for c++
ARG *arg;
struct sockaddr_in server; /* server‘s address information */
struct sockaddr_in client; /* client‘s address information */
int sin_size;
/* Create TCP socket */
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1) {
/* handle exception */
perror("Bind error.");
exit(1);
}
if(listen(listenfd,BACKLOG) == -1){ /* calls listen() */
perror("listen() error\n");
exit(1);
}
sin_size=sizeof(struct sockaddr_in);
while(1)
{
/* Accept connection */
// if ((connectfd = accept(listenfd,(struct sockaddr *)&client,&sin_size))==-1) {// no problem for c
if ((connectfd = accept(listenfd,(struct sockaddr *)&client,(socklen_t *)&sin_size))==-1) {
perror("accept() error\n");
exit(1);
}
/* Create thread*/
arg = new ARG;
arg->connfd = connectfd;
//memcpy((void *)&arg->client, &client, sizeof(client)); // both ok!
memcpy(&arg->client, &client, sizeof(client));
if (pthread_create(&thread, NULL, start_routine, (void*)arg)) {
/* handle exception */
perror("Pthread_create() error");
exit(1);
}
}
close(listenfd); /* close listenfd */
}
void process_cli(int connectfd, sockaddr_in client)
{
int num;
char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];
printf("You got a connection from %s. ",inet_ntoa(client.sin_addr) );
/* Get client‘s name from client */
num = recv(connectfd, cli_name, MAXDATASIZE,0);
if (num == 0) {
close(connectfd);
printf("Client disconnected.\n");
return;
}
cli_name[num - 1] = ‘\0‘;
printf("Client‘s name is %s.\n",cli_name);
while (num = recv(connectfd, recvbuf, MAXDATASIZE,0)) {
recvbuf[num] = ‘\0‘;
printf("Received client( %s ) message: %s",cli_name, recvbuf);
for (int i = 0; i < num - 1; i++) {
sendbuf[i] = recvbuf[num - i -2];
}
sendbuf[num - 1] = ‘\0‘;
send(connectfd,sendbuf,strlen(sendbuf),0);
}
close(connectfd); /* close connectfd */
}
void* start_routine(void* arg)
{
ARG *info;
info = (ARG *)arg;
/* handle client‘s requirement */
process_cli(info->connfd, info->client);
////delete arg will cause warning!the type for deleting should be the same as new allocated
delete info;
pthread_exit(NULL);
}
例子:单线程并发服务器
#include <stdio.h> /* These are the usual header files */
#include <string.h> /* for bzero() */
#include <unistd.h> /* for close() */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <stdlib.h>
#define PORT 6888 /* Port that will be opened */
#define BACKLOG 5 /* Number of allowed connections simutaniously*/
#define MAXDATASIZE 1000
typedef struct _CLIENT{
int fd;
struct sockaddr_in addr; /* client‘s address information */
char data[1024];
}CLIENT;
void process_cli(CLIENT *client, char* recvbuf, int len);
void savedata(char* recvbuf, int len, char* data);
main()
{
int i, maxi, maxfd, sockfd;
int nready;
size_t n;
fd_set rset, allset;
int listenfd, connectfd; /* socket descriptors */
struct sockaddr_in server; /* server‘s address information */
/* client‘s information */
CLIENT client[FD_SETSIZE];
char recvbuf[MAXDATASIZE];
int sin_size;
/* Create TCP socket */
if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1 ){
/* handle exception */
perror("Creating socket failed.");
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(PORT);
server.sin_addr.s_addr = htonl (INADDR_ANY);
if( bind(listenfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) == -1 ){
/* handle exception */
perror("Bind error.");
exit(1);
}
if(listen(listenfd,BACKLOG) == -1){ /* calls listen() */
perror("listen() error\n");
exit(1);
}
sin_size = sizeof(struct sockaddr_in);
/*initialize for select */
maxfd = listenfd;
maxi = -1;
for( i = 0; i<FD_SETSIZE; i++ ){
client[i].fd = -1;
}
FD_ZERO( &allset );
FD_SET( listenfd, &allset );
while(1){
struct sockaddr_in addr;
rset = allset;
nready = select( maxfd+1, &rset, NULL, NULL, NULL );
printf("select saw rset actions and the readfset num is %d. \n",nready );
if( FD_ISSET(listenfd, &rset) ){
/* new client connection */
/* Accept connection */
printf("accept a connection.\n");
if(( connectfd = accept(listenfd,(struct sockaddr *)&addr,(socklen_t *)&sin_size))==-1 ){
perror("accept() error\n");
continue;
}
/* Put new fd to client */
for( i = 0; i < FD_SETSIZE; i++ ){
if (client[i].fd < 0) {
client[i].fd = connectfd; /* save descriptor */
client[i].addr = addr;
client[i].data[0] = ‘\0‘;
printf("You got a connection from %s. ",inet_ntoa(client[i].addr.sin_addr) );
break;
}
}
printf("add new connect fd.\n");
if(i == FD_SETSIZE ) printf("too many clients\n");
FD_SET( connectfd, &allset ); /* add new descriptor to set */
if( connectfd > maxfd ) maxfd = connectfd;
if( i > maxi ) maxi = i;
if( --nready <= 0 ) continue; /* no more readable descriptors */
}
for( i = 0; i <= maxi; i++ ){
/* check all clients for data */
if( (sockfd = client[i].fd) < 0 ) continue; /* no more connected clients*/
if( FD_ISSET(sockfd, &rset) ){
printf( "recv occured for connect fd[%d].\n",i );
if( (n = recv(sockfd, recvbuf, MAXDATASIZE,0) ) == 0 ){
/*connection closed by client */
close(sockfd);
printf("Client( %d ) closed connection.\n",client[i].fd );
FD_CLR(sockfd, &allset);
client[i].fd = -1;
}
else{
process_cli( &client[i], recvbuf, n );
}
if (--nready <= 0) break; /* no more readable descriptors */
}
}
}
close(listenfd); /* close listenfd */
}
void process_cli( CLIENT *client, char* recvbuf, int len )
{
send( client->fd, recvbuf, len, 0 );
}
例子:epoll使用
#include <unistd.h>
#include <sys/types.h> /* basic system data types */
#include <sys/socket.h> /* basic socket definitions */
#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
#include <arpa/inet.h> /* inet(3) functions */
#include <sys/epoll.h> /* epoll function */
#include <fcntl.h> /* nonblocking */
#include <sys/resource.h> /*setrlimit */
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#define MAXEPOLLSIZE 10000
#define MAXLINE 10240
int handle(int connfd);
int setnonblocking( int sockfd )
{
if( fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1) {
return -1;
}
return 0;
}
int main(int argc, char **argv)
{
int servPort = 6888;
int listenq = 1024;
int listenfd, connfd, kdpfd, nfds, n, nread, curfds,acceptCount = 0;
struct sockaddr_in servaddr, cliaddr;
socklen_t socklen = sizeof(struct sockaddr_in);
struct epoll_event ev;
struct epoll_event events[MAXEPOLLSIZE];
struct rlimit rt;
char buf[MAXLINE];
/*设置每个进程允许打开的最大文件数
rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
if( setrlimit( RLIMIT_NOFILE, &rt ) == -1 ){
perror("setrlimit error");
return -1;
} */
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(servPort);
listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1) {
perror("can‘t create socket file");
return -1;
}
int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if( setnonblocking(listenfd) < 0 ){
perror("setnonblock error");
}
if( bind(listenfd, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)) == -1 ){
perror("bind error");
return -1;
}
if( listen(listenfd, listenq) == -1 ){
perror("listen error");
return -1;
}
/*创建epoll句柄,把监听 socket 加入到 epoll 集合里 */
kdpfd = epoll_create( MAXEPOLLSIZE );
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = listenfd;
if( epoll_ctl( kdpfd, EPOLL_CTL_ADD, listenfd, &ev ) < 0){
fprintf(stderr, "epoll set insertion error: fd=%d\n", listenfd );
return -1;
}
curfds = 1;
printf( "epollserver startup, port %d, max connection is %d, backlog is %d\n", servPort, MAXEPOLLSIZE, listenq );
for (;;) {
/* 等待有事件发生 */
nfds = epoll_wait( kdpfd, events, curfds, -1 );
if (nfds == -1){
perror("epoll_wait");
continue;
}
printf( "events happen %d\n", nfds );
/* 处理所有事件 */
for( n = 0; n < nfds; ++n ){
if( events[n].data.fd == listenfd ){
connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &socklen );
if (connfd < 0){
perror("accept error");
continue;
}
sprintf(buf, "accept form %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
printf("%d:%s", ++acceptCount, buf);
if( curfds >= MAXEPOLLSIZE ){
fprintf(stderr, "too many connection, more than %d\n", MAXEPOLLSIZE);
close(connfd);
continue;
}
if( setnonblocking(connfd) < 0 ){
perror("setnonblocking error");
}
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
if( epoll_ctl(kdpfd, EPOLL_CTL_ADD, connfd, &ev ) < 0 ){
fprintf(stderr, "add socket ‘%d‘ to epoll failed: %s\n", connfd, strerror(errno));
return -1;
}
curfds++;
continue;
}
// 处理客户端请求
if( handle( events[n].data.fd ) < 0 ){
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd, &ev );
curfds--;
}
}
}
close( listenfd );
return 0;
}
int handle( int connfd ) {
int nread;
char buf[MAXLINE];
nread = read(connfd, buf, MAXLINE); //读取客户端socket流
if( nread == 0 ){
printf("client close the connection\n");
close(connfd);
return -1;
}
if( nread < 0 ){
perror("read error");
close(connfd);
return -1;
}
write( connfd, buf, nread ); //响应客户端
return 0;
}