一、I/O复用应用场合
1. 当客户处理多个描述符(既有标准输入,又有网络套接字)时,必须使用IO复用。
2. 一个客户同时处理多个套接字是可能的。
3. 如果一个服务器既要处理监听套接字,又要处理已连接套接字,一般就要使用I/O复用。
4. 如果一个服务器既要处理TCP,又要处理UDP,一般就要I/O复用。
5. 如果一个服务器要处理多个服务或协议,就要用到I/O复用。
其实IO复用就是一个进程/线程处理多个套接字描述符。
二、 I/O模型
Unix提供了5种I/O模型:
1. 阻塞式I/O模型
假如套接字是阻塞的(一般默认的都是阻塞的),其阻塞函数有accept,read,recvfrom等,我们看看recvfrom:
如图所示,应用进程调用recvfrom,则系统就会阻塞进程,切换到内核等待数据的到来。
这里插入:我们说套接字的读入操作,第一步是等待的数据从网络中到达,被复制到内核的缓冲区中。第二步就是内核把数据从缓冲区复制到应用进程的缓冲区。
一般的recvfrom在第一步会被唤醒,然后调用recvfrom执行第二步。
2. 非阻塞I/O模型
假如我们把套接字设为非阻塞的,则recvfrom不会阻塞进程,如果没有数据,则recvfrom会返回错误。所以需要我们写一个循环来一直测试其有无数据到达。
3. I/O复用模型
I/O复用模型,假如select上绑定了多个描述符。进程会阻塞在select上,如果select某个套接字变为可读,则select就会返回。然后进程调用那个套接字的recvfrom来读取数据。
所以I/O复用有了两个系统调用,对于单个描述符的程序,I/O复用会稍显劣势。
4. 信号驱动式I/O模型
我们可以使用信号,让内核在描述符就绪时发送SIGIO信号通知我们。
首先使用sigaction设定SIGIO信号处理函数中包含recvfrom。然后进行继续执行其他代码,当某个时刻数据准备好之后,内核递交SIGIO,则进行就会捕获信号,转而信号处理函数,调用recvfrom。
5. 异步I/O模型
工作机制,我们把描述符,缓冲区一并告诉内核,然后进程不阻塞,继续执行其他代码,而当有数据来临时,内核进行对该描述符读取到缓冲区buff中,完成后发送信号,告知进程读取完成。
信号I/O模型和异步I/O模型的主要区别在于:
信号I/O模型:内核发送信号告知我们,此时应该要启动一个I/O操作了。
异步I/O模型:内核做完I/O操作后,告知我们何时完成了。
但是目前支持异步I/O模型的系统还是较为罕见的。
三、select函数
#include<sys/select.h> #include<sys/time.h> int select (intmaxfdp1, fd_set* readset, fd_set* writeset, fd_set* exceptset, const struct timeval*timeout);
该函数告知内核等待多个事件中的任何一个发生。函数阻塞到任何一个事件发生就返回。
或readset中的描述符其中一个变为可读。
或writeset中的描述符其中一个变为可写。
或exceptset中的描述符其中一个有异常条件等待处理。
或select等待的时间超过timeout的时间,返回超时。
1. 我们先看看最后一个参数timeout
struct timeval { longtv_sec;//seconds 秒 longtv_usec;//microseconds 微秒 };
该参数有三种可能:
(1) 永远等下去,此时该参数为NULL。
(2) 等待一个固有时间,参数为指向一个timaval的结构体的指针
(3) 不等待,立即返回。该参数指针一个timaval结构体的指针,该结构体中的两个值必须都为0.
注意:此时timeout为const,所以select不可修改。如果想获得select经历了多长时间返回,则需要在select取得系统时间,在select返回时获取系统时间,相减。
2. exceptset是对描述符的检测。
目前描述符有异常条件等待处理的触发条件只有:
某个套接字的带外数据到达。以后再详细说这个情况。
3. readset,writeset,exceptset都是对描述符的检测集合。Unix是如何设置这些集合的?
首先描述符一般都是一个小整数。
maxfdp1参数指定select函数检测的最大描述符的值加1.
readset,writeset,exceptset都是包含描述符的三个数组,并且这些数组中最大的描述符的值为maxfdp1-1。
然后如何操作这个描述符的数组?
系统提供了几个宏:
void FD_ZERO(fd_set* fdset);//清空fdset数组内的所有描述符。 void FD_SET(int fd,fd_set* fdset);//往fdset数组中加入fd描述符。 void FD_CLR(int fd,fd_set* fdset);//从fdset数组中删除fd描述符。 int FD_ISSET(int fd, fd_set* fdset);//判断fd描述符是否在fdset数组内。
例如:
fd_setfdset; FD_ZERO(&fdset); FD_SET(2,&fdset); FD_SET(4,&fdset); select(5,&fdset,NULL,NULL,NULL);
4. 如何从select函数返回时,得到哪些描述符已就绪?
由于readset,writeset,exceptset传入的都是地址,所以select会修改这三个参数。select把三个数组中已就绪的描述符保留,把没就绪的描述符删除。
所以我们循环检查之前设置的描述符是否还在数组中即可。使用FD_ISSET宏来实现。
而每次重新调用select时,我们需要重新把关心的描述符均重新设置为1。
其实:select的机制可以看成,select检测0~maxfdp1-1所有描述符,初始化为0,而fdset就是把其中用户关心的描述符设为1,select就会监听这些为1的描述符,
然后select函数会把就绪的描述符保留为1,其他的没就绪的描述符设为0。
5. 如果readset,writeset,exceptset三个参数都是NULL,则select就可以看成一个更为精确的定时器了。因为sleep的最小单位是秒。
6. select返回已就绪描述符的总数,超时返回0,错误返回-1(被信号中断).
7. 描述符就绪条件。
首先我们明白一点:我们给select的描述符都是阻塞在某个函数的,我们为了同时监听这些阻塞函数何时被唤醒,才有了I/O复用,才有了select。
所以说描述符就绪的条件,就是描述符阻塞的函数被唤醒,即变为不阻塞了。这就是就绪的条件。
一个描述符变为可读的:
(1) 如果是一个已连接套接字描述符
--套接字接收缓冲区内数据字节数大于等于该套接字接收缓冲区低水位标记时,该描述符是可读的。read返回大于0的值。一般TCP的低水位标记默认为1。
--套接字接收FIN,read返回0.
--套接字错误,read返回-1.
也就是说当read不阻塞的时候,该套接字描述符就变为可读的。
(2) 如果是一个监听套接字描述符
--当accept接收大于0个连接时,该描述符变为可读。
也就是说accept不阻塞的。
一个描述符变为可写的:
(1) 我们知道当套接字发送缓冲区过小,而write函数发送缓冲区比较大时,write会被阻塞。所以引起了这个条件:
当套接字发送缓冲区可用字节数大于等于套接字发送缓冲区低水位标记时,write返回一个正值。一般TCP、UDP的低水位标记为2048。
也就是说TCP规定,当套接字发送缓冲区可用字节数大于2048字节时,此时该套接字是不可写的,即调用write会被阻塞。
(2) 对已接收到RST的套接字write,会发生SIGPIPE信号.
(3) 使用非阻塞式套接字connect成功,或connect以失败告终。
一个描述符变为异常待处理
套接字的带外数据到达。以后细说。
注意:当某个套接字发生错误,如read返回-1.write返回-1,等它将由select标记为既可读又可写。
四、至此我们修订我们以前的回射客户端程序
这里我们只需要修订我们的str_cli函数
void str_cli(int sockfd, FILE* fd) { char recvmsg[MAXLINE],sendmsg[MAXLINE]; int n; fd_set readfs,allset; FD_ZERO(&allset); FD_SET(sockfd, &allset); FD_SET(fileno(fd), &allset);//fineno函数将标准输入输出文件指针转换为文件描述符 intmaxfdp1=max(fileno(fd),sockfd)+1; while(true) { readfs=allset; if(select(maxfdp1,&readfs,NULL,NULL,NULL)>0) { if(FD_ISSET(sockfd,&readfs))//套接字可读 { if(read(sockfd,recvmsg, MAXLINE)==0)//服务器端已关闭,发送FIN分组。 err_sys("server terminated!"); fputs(recvmsg, stdout); } if(FD_ISSET(fileno(fd),&readfs))//注意这里不能else if,因为可能两个描述符同时可读 { if(fgets(sendmsg,MAXLINE,fd)!=NULL) write(sockfd,sendmsg,strlen(sendmsg)); else return;//程序终止 } } else err_sys("selecterror"); } }
这里使用select解决了前面提到的,当服务器进程崩溃,客户端能够立即知道。
但是这里还是有问题的:
假如我们批量的输入一堆数据(或从某个文件中输入一堆数据),则fgets读到EOF时,str_cli函数就会结束,进程终止,调用close函数来关闭套接字。注意这时可能有数据还正在去往服务器的路上,还可能有数据正在回射往客户端的路上,所以可能造成客户端接收的数据小于输入发送的数据。
因为这时我们已经结束str_cli函数了,所以客户端不会再去read数据了。
EOF只是代表客户端输入结束,并不代表接收数据也结束了。
解决的办法:使用shutdown函数来代替close函数。
1. shutdown函数和close函数的区别
(1) close函数把描述符的引用计数减一,仅在该计数变为0时才关闭套接字。而shutdown函数可以不管引用计数就激发TCP的正常终止序列。
(2) close终止读和写两个方向的数据。即虽然close关闭也存在半开关闭,但我们不能对已经调用close函数的描述符再进行read/write。而shutdown函数可以选择关闭读或者写。
2. shutdown函数
int shutdown(int sockfd, int howto);//成功返回0,出错返回-1
该函数的行为依赖于howto参数的值:
SHUT_RD:关闭读这一半,调用shutdown后,该套接字不可再进行读操作,且套接字接收缓冲区的内容被丢弃,如有来自对端的数据发送给该套接字,则都被确认,然后悄然丢弃。
注意:此时是可以对该套接字进行写操作的。
SHUT_WR:关闭写这一半,调用shutdown后,该套接字不可再进行读操作,且套接字发送缓冲区的内容被丢弃。不管引用技术,关闭都会执行。
注意:此时是可以对该套接字进行读操作的。
SHUT_RDWR:关闭读和写。这和调用close除引用次数规则外,是等效的。
3. 上述的问题的解决办法就是调用shutdown函数,关闭写这一半。注意无论如何调用shutdown函数,其都会给对端发送FIN分组。
可以看到,调用shutdown函数关闭写之后,仍可以对该套接字进行读操作。
所以上述的问题就是读到EOF时,代表着输入结束,调用shutdown函数通知服务器端,然后进行等待read操作。所以str_cli改为如下代码:
if(FD_ISSET(fileno(fd),&readfs))//注意这里不能else if,因为可能两个描述符同时可读 { if(fgets(sendmsg,MAXLINE,fd)!=NULL) write(sockfd,sendmsg,strlen(sendmsg)); else{ shutdown(sockfd, SHUT_WR);// send FIN FD_CLR(fileno(fd),&allset); continue; } }
五、上述我们已经使用select修改了客户端,下面我们来使用select来修改服务器端,使得服务器由以往的多进程改为单进程。
#include <sys/socket.h> #define MAXLINE 1024 #define PORT 13 #define CONMAX 5 void err_sys(const char* s) { fprintf(stderr, "%s\n",s); exit(1); } int main(int argc, char** argv) { int listenfd, connfd; struct sockaddr_in servaddr; listenfd=socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; servaddr.port=htona(PORT); servaddr.sin_addr.s_addr=htonl(INADDR_ANY); bind(listenfd,(struct sockaddr*) &servaddr, sizeof(servaddr)); listen(listenfd, CONMAX); pid_t childpid; int maxfd, ready,i,n; int client[CONMAX]={-1};//数组client是用来记录客户端已连接的描述符的集合。 fd_set fdset, allset; FD_ZERO(&allset); FD_SET(listenfd, &allset); maxfd=listenfd;//maxfd是数组client和listenfd的最大值。 while(true) { fdset=allset; ready=select(maxfd+1, &fdset,NULL,NULL,NULL); if(FD_ISSET(listenfd,&fdset)) { if((connfd=accept(listenfd,(struct sockaddr*) NULL,NULL))<0) { if(errno==EINTR) continue; else err_sys("accept error"); } FD_SET(connfd,&allset); for(i=0;i<CONMAX;i++) if(client[i]<0){ client[i]=confd; break; } if(i==CONMAX) err_sys("too manyclients"); if(connfd>maxfd) maxfd=connfd; if(--ready<=0) //如果只是连接,没有接收数据,直接进入下一个循环 continue; } for(i=0;i<CONMAX;i++) { if(client[i]<0) continue; if(FD_ISSET(client[i],&fdset)) { if((n=read(client[i],buff,MAXLINE))==0) { close(client[i]); FD_CLR(client[i],&allset); client[i]=-1; //注意这里并没有更新maxfd的值,因为我们不需要这么做,因为此时maxfd的值可能大于真实的描述符的最大值,但是一定不会小于它,所以只要不小于它,select是没有问题的。而这里如果更新maxfd,也是可以的,但是比较麻烦而已。需要比较此时数组client的最值及listenfd的最大值。 } else write(client[i],buff,n); if(--ready<0) break; } } } } // socket bind listen connect accept涉及socket的这些函数正常情况下都返回0或者具体描述符,在错误情况下都返回-1,错误被记录在errno。 //这里就不做错误检查了。
主体思想就是使用client数组来记录客户端已连接的描述符集合。如果有新的连接,则把它放入client数组中第一个-1的位置,如果关闭连接,则把它的值置为-1.
我们同时select监听listenfd和client数组集合 的描述符。
六、 pselect函数
int pselect(int maxfdp1, fd_set* readset,fd_set* writefd, fd_set* exceptfd, conststruct timespec* timeout, const sigset_t* sigmask);
pselect相比较于select有2处变化:
1. pselect使用timespec结构:
struct timespec{ time_ttv_sec; long tv_nsec; };
第二个成员是纳秒级,比select的timeval的微妙更为精确。
2. pselect函数增加了第六个参数,一个指向信号掩码的指针,用来允许程序先禁止递交某些信号。具体用法以后详解。
七、poll函数
#include <poll.h> int poll(struct pollfd* fdarray, unsigned long nfds, int timeout);
poll函数提供和select类似的功能,不过在处理流设备时,它能提供额外的信息。
1. 第1个参数是结构体的数组。该结构体为:
struct pollfd{ int fd;//描述符 short events;//描述符的检测状态,如可读/可写 short revents;//经过poll函数后,fd变成了什么状态。如无变化,或变为可读/可写 };
这样的好处:无需像select那样传递指针,每次select函数需要修改fd_set结构体,导致每次重新select时,需要重新设置参数。
而这里,events就是需检测状态,而revents就是检测返回的结果。
2. nfds为所检测描述符的个数,即fdarray数组的大小。
3. timeout和select一样,设定超时条件。注意提供一个毫秒数的正值。如果为INFTIM,则永远等待。为0,立即返回。
4.返回值:
如果发生错误,返回-1.
如果超时,没有任何描述符就绪,则返回0,
如果正常,返回就绪描述符的个数,即revents成员值非0的描述符的个数。
如果某个pollfd的fd为负数,则poll忽略,并返回时它的revents设为0.
5. 一般常用的events和revents的值
普通数据可读:POLLRDNORM
普通数据可写:POLLWRNORM
发生错误:POLLERR
6. 我们检查的返回状态时,一般是检查revents的值为可读或为错误、可写或为错误。然后read/write就可以了。因为如果是POLLERR,我们read/write会返回-1,则我们就可以做出相应的处理。
如果我们只检查可读,则发送错误,我们就无法处理了。
7. 我们使用poll来修改上面的服务器端程序
#include <sys/socket.h> #define MAXLINE 1024 #define PORT 13 #define CONMAX 5 void err_sys(const char* s) { fprintf(stderr, "%s\n",s); exit(1); } int main(int argc, char** argv) { int listenfd, connfd; struct sockaddr_in servaddr; listenfd=socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; servaddr.port=htona(PORT); servaddr.sin_addr.s_addr=htonl(INADDR_ANY); bind(listenfd,(struct sockaddr*) &servaddr, sizeof(servaddr)); listen(listenfd, CONMAX); pid_t childpid; intmaxi, ready,i,n; struct pollfd client[CONMAX+1];//数组client是用来记录客户端已连接的描述符以及listenfd的集合。 client[0].fd=listenfd;//client[0]记录listenfd client[0].events=POLLRDNORM; for(i=1;i<CONMAX;i++) client[i].fd=-1; maxi=0; while(true) { ready=poll(client, maxi+1,INFTIM); if(client[0].revent& POLLRDNORM)//检查可读,一般使用&符号 { if((connfd=accept(listenfd,(struct sockaddr*) NULL,NULL))<0) { if(errno==EINTR) continue; else err_sys("accept error"); } for(i=1;i<CONMAX;i++)//因为client[0]为listenfd,所以从1开始。 if(client[i].fd<0){ client[i].fd=confd; break; } if(i==CONMAX) err_sys("too many clients"); if(i>maxi) maxi=i; client[i].events=POLLRDNORM; if(--ready<=0) continue; } for(i=1;i<=maxi;i++) { if(client[i].fd<0) continue; if(client[i].revents& (POLLRDNORM | POLLERR))//这里检查既要检查是否可读,也要检查是否出错 { if((n=read(client[i],buff,MAXLINE))< 0)//由于上面有出错时的情况,所以这里需要处理。 { if(errno==ECONNERESET)//客户端发送RST分组,即客户端进程崩溃 { close(client[i]); client[i].fd=-1; } else err_sys("read error"); } else if(n==0)//正常关闭连接 { close(client[i]); client[i].fd=-1; } else write(client[i],buff,n); if(--ready<0) break; } } } } // socket bind listen connect accept涉及socket的这些函数正常情况下都返回0或者具体描述符,在错误情况下都返回-1,错误被记录在errno。 //这里就不做错误检查了。
仔细看这段代码,还是很有意思的。
这里的nfds的值,记为maxi+1的值,也就是说,和上面select一样,可以大,因为内核会忽略fd为负的结构i体。
所以这里把nfds的值设为maxi+1.意为数组fdarray中元素的个数。