第6章 I/O多路复用

前一章节客户端同时处理两个输入:标准输入和TCP套接字,然而问题在于客户端阻塞于fgets调用期,服务器进程被杀死后,服务器tcp虽然可以正确发送一个fin,但进程正阻塞于标准输入,它无法看到eof,直到从套接字读为止.这样的进程需要一种预先告诉内核的能力,一旦内核发现进程指定的一个或者多个I/O就绪,它就通知进程,这就叫做I/O复用

select函数

  1. #include <sys/select.h>
  2. #include<sys/time.h>
  3. int select(int nfds, fd_set *readfds, fd_set *writefds,
  4. fd_set *exceptfds, struct timeval *timeout);

timeout参数:告知内核等待所指定的描述符集合的任意一个可以花多少时间

  1. struct timeval{
  2. long tv_sec;
  3. long tv_usec;
  4. };

参数有以下可能:(1) 永远等下去.有一个以上描述符准备好I/O才返回

(2)等待固定一段时间,如果这段时间没I/O准备好.那就直接返回

(3)根本不等待,检查描述符后立刻返回,这称为轮询(polling)

中间三个参数readset,writeset,exceptset指定我们要让内核测试读,写,异常的描述符

异常的一种情况只是某个套接字的带外数据的到达

select使用一个整数数组,其中每个整数的每一位对应一个描述符...其实现隐藏在fd_set跟四个宏

  1. void FD_ZERO(fd_set*fdset);
  2. void FD_SET(int fd,fd_set*fdset);
  3. void FD_CLR(int fd,fd_set *fdset);
  4. int FD_ISSET(int fd,fd_set *fdset);
  1. fd_set rset;
  2. FD_ZERO(&rset);
  3. FD_SET(1,&rset);
  4. FD_SET(4,&rset);
  5. FD_SET(5,&rset);

select中间三个参数.如果对某一个条件不感兴趣那就设置为null.nfds的值等于最大描述符值+1,描述符0一直到nfds-1都被测试,该函数返回时,结果将指示哪些描述符就绪,我们使用d_isset来表示该描述符是否就绪,描述符集内任何与未就绪描述符对应的位返回时均会被置为0,因此,每次调用select函数.我们都要把所关心的描述符集的位设置为1,此函数返回时已就绪总数.如果定时器到时了,那么返回0,出错返回-1(比如被中断打断)

描述符就绪条件:

(1)如果套接字接收缓冲区的数据大于等于套接字接收缓冲区低水位标记的大小(就是最低值,可用SO_REVLOWAT改变),这样对套接字进行读操作不会阻塞并且将会返回一个大于0的值(返回准备好读入的数据)

(2)该连接的读半部关闭(接收FIN的TCP).对这样的套接字的读操作不会阻塞并且返回0(返回EOF)

(3)该套接字是监听套接字并且已完成队列不为空,

(4)一个套接字有错误待处理.对这样的套接字的读操作将不会阻塞返回-1.同时errno将设置为错误条件

套接字可写条件

(1)该套接字发送缓冲区的可用空间字节大于等于套接字发送缓冲区低水位的当前大小

(2)该连接写操作关闭,继续写将产生SIGPIPE信号

(3)使用非阻塞的connect套接字已连接.或者失败

(4)其上有一个套接字错误待处理.对这样的套接字的写操作将不阻塞返回-1.

(3)一个套接字存在带外数据,有异常条件待处理

如果键入eof后直接return那么可能在发送的服务器的数据还在路上或者返回给客户端的数据还在路上,标准eof不意味着完成了套接字的读入

当客户端使用eof关闭读时,将使用shutdown半关闭,接着继续select阻塞.直到服务端也发送fin.客户端套接字触发可读.读到0.表示服务端也开始关闭了.

客户端代码:

  1. #include "unp.h"
  2. void
  3. str_cli(FILE *fp, int sockfd)
  4. {
  5. int maxfdp1, stdineof;
  6. fd_set rset;
  7. char buf[MAXLINE];
  8. int n;
  9. stdineof = 0;
  10. FD_ZERO(&rset);
  11. for ( ; ; ) {
  12. if (stdineof == 0)
  13. FD_SET(fileno(fp), &rset);
  14. FD_SET(sockfd, &rset);
  15. maxfdp1 = max(fileno(fp), sockfd) + 1;
  16. Select(maxfdp1, &rset, NULL, NULL, NULL);
  17. if (FD_ISSET(sockfd, &rset)) { /* socket is readable */
  18. if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {
  19. if (stdineof == 1)
  20. return; /* normal termination */
  21. else
  22. err_quit("str_cli: server terminated prematurely");
  23. }
  24. Write(fileno(stdout), buf, n);
  25. }
  26. if (FD_ISSET(fileno(fp), &rset)) {
  27. if ( (n = Read(fileno(fp), buf, MAXLINE)) == 0) {
  28. stdineof = 1;
  29. Shutdown(sockfd, SHUT_WR);
  30. FD_CLR(fileno(fp), &rset);
  31. continue;
  32. }
  33.     Writen(sockfd, buf, n);
  34. }
  35. }
  36. }
  37. int main(int argc,char *argv[])
  38. {
  39. if(argc<3)
  40. err_quit("please input IP Address : Port");
  41. int connfd=Socket(AF_INET,SOCK_STREAM,0);
  42. struct sockaddr_in servaddr;
  43. servaddr.sin_family=AF_INET;
  44. servaddr.sin_port=htons(atoi(argv[2]));
  45. Inet_pton(AF_INET,argv[1],&servaddr.sin_addr);
  46. Connect(connfd,(SA*)&servaddr,sizeof(servaddr));
  47. str_cli(stdin,connfd);
  48. return 0;
  49. }

服务端代码:

  1. #include "unp.h"
  2. static int create_sock(char *ip)
  3. {
  4. int listenfd;
  5. listenfd=Socket(AF_INET,SOCK_STREAM,0);
  6. struct sockaddr_in servaddr;
  7. servaddr.sin_family=AF_INET;
  8. servaddr.sin_port=htons(atoi(ip));
  9. Inet_pton(AF_INET,"0.0.0.0",&servaddr.sin_addr);
  10. Bind(listenfd,(SA*)&servaddr,sizeof(servaddr));
  11. Listen(listenfd,20);
  12. return listenfd;
  13. }
  14. static void deal_cli(int listenfd)
  15. {
  16. int connfd,i;
  17. fd_set allset,rset;//allset保存的整个描述符集。但并不会直接select.每次select都会使得清0.很不方便。于是当要select的时候把它付给rset
  18. FD_ZERO(&allset);
  19. FD_ZERO(&rset);
  20. char buf[1024];//从客户端读取的缓冲区
  21. int ret=0;
  22. int maxfd=listenfd;//select第一个参数
  23. int allfd[MAXLINE];//连接数组
  24. for(i=0;i<MAXLINE;++i){
  25. allfd[i]=-1;
  26. }
  27. int nready; //select I/O已经就绪
  28. int max=0;//表示已完成连接数组中最大的队列
  29. FD_SET(listenfd,&allset);//将监听连接加入allset集合中
  30. for(;;){
  31. rset=allset;//更新
  32. nready=Select(maxfd+1,&rset,NULL,NULL,NULL);
  33. if(FD_ISSET(listenfd,&rset)){
  34. connfd=Accept(listenfd,NULL,NULL); //从已连接套接字拿取一个出来
  35. for(i=0;i<MAXLINE;++i){ //遍历整个已连接数组。找到alifd[i]等于-1(表示该位可用)
  36. if(allfd[i]<0){
  37. allfd[i]=connfd;
  38. break;
  39. }
  40. }
  41. FD_SET(connfd,&allset);
  42. if(maxfd<connfd) //每次都需要比较maxfd跟新来的描述符值大小
  43. maxfd=connfd;
  44. if(max<i)//还要更新最后可连接数组最后的那个已连接索引
  45. max=i;
  46. if(--nready<=0) //如果该连接已经处理完成。那就
  47. continue;
  48. }
  49. for(i=0;i<max+1;++i){ //遍历以此处理所有已完成连接
  50. if(allfd[i]!=-1)//跳过
  51. if(FD_ISSET(allfd[i],&rset)){
  52. ret=read(allfd[i],buf,1024);
  53. if(ret==0){
  54. close(allfd[i]);
  55. FD_CLR(allfd[i],&allset);
  56. allfd[i]=-1;
  57. }else if(ret<0)
  58. {
  59. perror("error");
  60. }else
  61. Writen(allfd[i],buf,ret);
  62. }
  63. if(--nready<=0) //如果处理最后一个连接。继续返回到select中去
  64. continue;
  65. }
  66. }
  67. }
  68. int main(int argc,char *argv[])
  69. {
  70. if(argc<2){
  71. err_quit("please input ip aiddress\n");
  72. }
  73. int listenfd=create_sock(argv[1]);//创建一个套接字
  74. deal_cli(listenfd);//开始处理细节
  75. return 0;
  76. }

poll函数

  1. #include <poll.h>
  2. int poll(struct pollfd *fds, nfds_t nfds, int timeout);

第一个参数是指向一个结构数组第一个元素的指针,每个数组元素都是一个pollfd结构图,用来指定厕所某个给定描述符fd的条件

  1. struct pollfd {
  2. int fd; /* file descriptor */
  3. short events; /* requested events */
  4. short revents; /* returned events */
  5. };

要测试的条件由events成员指定,函数在相应的revents成员返回该描述符状态



 poll识别三类数据:普通,优先级带和高优先级

相对于TCP和UDP套接字而言,以下条件引起poll返回特定的revent

(1)所有正规的TCP数据和所有UDP数据都被认为是普通数据

(2)TCP的带外数据被认为是优先级带数据

(3)当TCP连接的读半部关闭时,也被认为是普通数据,随后的读操作返回0

(4)TCP连接存在错误可以被认为是错误也可以认为是普通数据,无论哪种情况随后的读操作都将返回-1,并且把error设置为合适的值。可用来接收到RST或者发生超时等条件。

(5) 在监听套接字上有新的连接可用,被视为普通数据,也可以认为是优先级数据,大多数被视为普通数据

(6)非阻塞式connect的完成被认为使得套接字可写

结构数组中元素的个数是由nfds参数所指定

timeout参数指定poll函数返回前等待多长时间,它是一个指定应等待毫秒数的正值

INFTIM 永远等待    0 立即返回     >0 等待指定数据的毫秒数

发生错误时,poll的返回值为-1,若定时器到时之前没有任何描述符就绪,则返回0,否则返回就绪描述符个数,也就是revents成员值为非0的个数

如果我们不再关心某个描述符的属性,那就把对应的pollfd的fd设置为一个负数

  1. /* include fig01 */
  2. #include "unp.h"
  3. //#include <limits.h> /* for OPEN_MAX */
  4. int
  5. main(int argc, char **argv)
  6. {
  7. int i, maxi, listenfd, connfd, sockfd;
  8. int nready;
  9. ssize_t n;
  10. char buf[MAXLINE];
  11. socklen_t clilen;
  12. long OPEN_MAX = sysconf(_SC_OPEN_MAX);
  13. struct pollfd client[OPEN_MAX];
  14. struct sockaddr_in cliaddr, servaddr;
  15. listenfd = Socket(AF_INET, SOCK_STREAM, 0);
  16. bzero(&servaddr, sizeof(servaddr));
  17. servaddr.sin_family = AF_INET;
  18. servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  19. servaddr.sin_port = htons(SERV_PORT);
  20. Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
  21. Listen(listenfd, LISTENQ);
  22. client[0].fd = listenfd;
  23. client[0].events = POLLRDNORM;
  24. for (i = 1; i < OPEN_MAX; i++)
  25. client[i].fd = -1; /* -1 indicates available entry */
  26. maxi = 0; /* max index into client[] array */
  27. /* end fig01 */
  28. /* include fig02 */
  29. for ( ; ; ) {
  30. nready = Poll(client, maxi+1, INFTIM);
  31. if (client[0].revents & POLLRDNORM) { /* new client connection */
  32. clilen = sizeof(cliaddr);
  33. connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);
  34. #ifdef NOTDEF
  35. printf("new client: %s\n", Sock_ntop((SA *) &cliaddr, clilen));
  36. #endif
  37. for (i = 1; i < OPEN_MAX; i++)
  38. if (client[i].fd < 0) {
  39. client[i].fd = connfd; /* save descriptor */
  40. break;
  41. }
  42. if (i == OPEN_MAX)
  43. err_quit("too many clients");
  44. client[i].events = POLLRDNORM;
  45. if (i > maxi)
  46. maxi = i; /* max index in client[] array */
  47. if (--nready <= 0)
  48. continue; /* no more readable descriptors */
  49. }
  50. for (i = 1; i <= maxi; i++) { /* check all clients for data */
  51. if ( (sockfd = client[i].fd) < 0)
  52. continue;
  53. if (client[i].revents & (POLLRDNORM | POLLERR)) {
  54. if ( (n = read(sockfd, buf, MAXLINE)) < 0) {
  55. if (errno == ECONNRESET) {
  56. /*4connection reset by client */
  57. #ifdef NOTDEF
  58. printf("client[%d] aborted connection\n", i);
  59. #endif
  60. Close(sockfd);
  61. client[i].fd = -1;
  62. } else
  63. err_sys("read error");
  64. } else if (n == 0) {
  65. /*4connection closed by client */
  66. #ifdef NOTDEF
  67. printf("client[%d] closed connection\n", i);
  68. #endif
  69. Close(sockfd);
  70. client[i].fd = -1;
  71. } else
  72. Writen(sockfd, buf, n);
  73. if (--nready <= 0)
  74. break; /* no more readable descriptors */
  75. }
  76. }
  77. }
  78. }
  79. /* end fig02 */

来自为知笔记(Wiz)

时间: 2024-10-27 19:11:45

第6章 I/O多路复用的相关文章

第15章 高并发服务器编程(2)_I/O多路复用

3. I/O多路复用:select函数 3.1 I/O多路复用简介 (1)通信领域的时分多路复用 (2)I/O多路复用(I/O multiplexing) ①同一线程,通过"拨开关"方式,来同时处理多个I/O流,哪个IO准备就绪就把开关拨向它.(I/O多路复用类似于通信领域中的"时分复用") ②通过select/poll函数可以实现IO多路复用,他们采用轮询的方式来监视I/O.而epoll是对select/poll的加强,它是基于事件驱动,epoll_ctl注册事件

Python(七)Socket编程、IO多路复用、SocketServer

本章内容: Socket IO多路复用(select) SocketServer 模块(ThreadingTCPServer源码剖析) Socket socket通常也称作"套接字",用于描述IP地址和端口,是一个通信链的句柄,应用程序通常通过"套接字"向网络发出请求或者应答网络请求. 功能: sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0) 参数一:地址簇 socket.AF_INET IPv4(默认)

[CSAPP笔记][第十二章并发编程]

第十二章 并发编程 如果逻辑控制流在时间上是重叠,那么它们就是并发的(concurrent).这种常见的现象称为并发(concurrency). 硬件异常处理程序,进程和Unix信号处理程序都是大家熟悉的例子. 我们主要将并发看做是一种操作系统内核用来运行多个应用程序的机制. 但是,并发不仅仅局限于内核.它也可以在应用程序中扮演重要的角色. 例如 Unix信号处理程序如何允许应用响应异步事件 例如:用户键入ctrl-c 程序访问虚拟存储器的一个未定义的区域 其他情况 访问慢速I/O设备 当一个应

第十二章 并发编程 学习笔记

第十二章 并发编程 进程是程序级并发,线程是函数级并发. 三种基本的构造并发程序的方法: 进程:每个逻辑控制流是个一个进程,由内核进行调度和维护. I/O多路复用:应用程序在一个进程的上下文中显式地调度他们自己的逻辑流. 线程:运行在单一进程上下文中的逻辑流,由内核进行调度. 12.1 基于进程的并发编程 构造并发程序最简单的方法就是用进程. 使用大家都很熟悉的函数例如: fork exec waitpid 关于在父.子进程间共享状态信息:共享文件表,但不共享用户地址空间. 进程又独立的地址空间

第十二章 并发编程

第十二章 并发编程 三种基本的构造并发程序 进程:每个逻辑控制流是一个进程,由内核进行调度,进程有独立的虚拟地址空间 I/O多路复用:逻辑流被模型化为状态机,所有流共享同一个地址空间 线程:运行在单一进程上下文中的逻辑流,由内核进行调度,共享同一个虚拟地址空间 常用函数: fork exec waitpid 基于I/O多路复用的并发事件驱动服务器 事件驱动程序:将逻辑流模型化为状态机. 状态机: 状态 输入事件 转移 对于状态机的理解,参考EDA课程中学习的状态转换图的画法和状态机. 整体的流程

RabbitMQ学习之:(十一)AMQP.0-10规范,中文翻译1,2,3章 (转载)

From:http://blog.sina.com.cn/s/blog_4aba0c8b0100p6ho.html From: http://blog.sina.com.cn/s/blog_4aba0c8b0100p6hb.html AMQP.0-10中文版——概述 第1章 概述 1.1.  本文档的目标 这份文档定义了高级消息队列协议,这个协议使得遵从该协议的客户端应用和消息中间件服务器之间能够互相通信.为了完全实现互操作性,我们还定义了消息中间件服务的标准行为. 我们面对这个领域有经验的技术

Python----Socket编程、IO多路复用、SocketServer

本章内容: Socket IO多路复用(select) SocketServer 模块(ThreadingTCPServer源码剖析) Socket socket通常也称作"套接字",用于描述IP地址和端口,是一个通信链的句柄,应用程序通常通过"套接字"向网络发出请求或者应答网络请求. 功能: sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0) 参数一:地址簇 socket.AF_INET IPv4(默认)

第8章 云计算

8.1云计算是物联网发展的基石 8.1.1云计算是物联网的基石   2011年1月18日,“打造中国云—云计算促进产业转型升级” 研讨会在南京雨花软件园管委会多功能厅举行.中国工程院院士李德毅,南京市副市长.雨花台区委书记李侃桢,南京邮电大学副校长朱洪波,雨花台区副区长薛国安,以及来自赛迪集团.微软中国.中兴通讯.区科技局.区商务局.雨花软件园等企事业单位的领导.专家六十余人参加了本次会议,就云计算产业发展及促进产 业转型升级展开了研讨. 在研讨会上中国工程院院士李德毅的演讲将大家带入了云计算的

[深入理解Android卷一全文-第四章]深入理解zygote

由于<深入理解Android 卷一>和<深入理解Android卷二>不再出版,而知识的传播不应该由于纸质媒介的问题而中断,所以我将在CSDN博客中全文转发这两本书的所有内容. 第4章  深入理解Zygote 本章主要内容 ·  深入分析zygote,并介绍system_server进程的初始化工作. 本章涉及的源代码文件名称及位置 以下是我们本章分析的源代码文件名称及其位置. ·  App_main.cpp framework/base/cmds/app_process/App_m