poll案例

  1 #include <iostream>
  2 #include <sstream>
  3 #include <map>
  4
  5 #include <stdio.h>
  6 #include <errno.h>
  7 #include <string.h>
  8
  9 #include <sys/socket.h>
 10 #include <netinet/in.h>
 11 #include <arpa/inet.h>
 12
 13 #include <unistd.h>
 14 #include <fcntl.h>
 15 #include <poll.h>
 16
 17 #define LOCAL_AF            AF_INET             // 本地地址类型
 18 #define LOCAL_IP_ADDR       "172.20.0.115"
 19 #define LOCAL_PORT          8090
 20
 21 #define BACKLOG             10                  // 监听等待队列个数
 22
 23 #define MAX_FD              1024                // 最大打开文件描述符数目
 24 #define MAX_USER            3                   // 最大用户数
 25
 26 #define FAILED_TEST_CNT     10                  // socket调用失败, 连续测试次数
 27
 28 // 统计连接socket和对应的socket连续调用失败次数, 失败 FAILED_TEST_CNT 次,关闭移除socket
 29 typedef std::map<int, int> Clients;
 30
 31 using namespace std;
 32
 33 static void showErr()
 34 {
 35     printf("errno: %d, err_msg: %s\n", errno, strerror(errno));
 36 }
 37
 38 // 设置socket为非阻塞
 39 static bool setSocketNonBlock(int socketFd)
 40 {
 41     int flags = fcntl(socketFd, F_GETFL);
 42     if (-1 == flags)
 43     {
 44         cout << "fcntl F_GETFL error" << endl;
 45         return false;
 46     }
 47
 48     int ret = fcntl(socketFd, F_SETFL, O_NONBLOCK | flags);
 49     if (-1 == ret)
 50     {
 51         cout << "fcntl F_SETFL error" << endl;
 52         return false;
 53     }
 54     return true;
 55 }
 56
 57 // 设置socket关闭时不等待缓存区数据发送完毕(等待可能导致缓冲区数据无法发送导致一直占用端口: 连接一直处于FIN_WAIT1状态)
 58 static int disableLinger(int socketFd)
 59 {
 60     struct linger lingerVal;
 61     lingerVal.l_onoff = 1;
 62     lingerVal.l_linger = 0;
 63     int ret = setsockopt(socketFd, SOL_SOCKET, SO_LINGER, &lingerVal, sizeof(lingerVal));
 64     if (0 != ret)
 65     {
 66         cout << "setsockopt linger error" << endl;
 67         return false;
 68     }
 69     return true;
 70 }
 71
 72 int main(int argc, char **argv)
 73 {
 74     int ret(0);
 75
 76     // 生成监听socket
 77     int listenSocket(-1);
 78     listenSocket = socket(LOCAL_AF, SOCK_STREAM, IPPROTO_TCP);
 79     if (-1 == listenSocket)
 80     {
 81         cout << "socket error" << endl;
 82         return 1;
 83     }
 84
 85     if (!setSocketNonBlock(listenSocket))
 86     {
 87         return 1;
 88     }
 89
 90     if (!disableLinger(listenSocket))
 91     {
 92         return 1;
 93     }
 94
 95     // 绑定监听socket到指定网卡和端口
 96     in_addr localAddr;
 97     inet_pton(LOCAL_AF, LOCAL_IP_ADDR, &localAddr);
 98     sockaddr_in localSockAddr;
 99     localSockAddr.sin_family = LOCAL_AF;
100     localSockAddr.sin_addr = localAddr;
101  // localSockAddr.sin_addr.s_addr = htonl(INADDR_ANY);      // 绑定到所有网卡的指定端口(多网卡自适应)
102     localSockAddr.sin_port = htons(LOCAL_PORT);
103     ret = bind(listenSocket, (sockaddr *)&localSockAddr, sizeof(localSockAddr));
104     if (0 != ret)
105     {
106         cout << "bind error" << endl;
107         return 1;
108     }
109
110     ret = listen(listenSocket, BACKLOG);
111     if (0 != ret)
112     {
113         cout << "listen error" << endl;
114         return 1;
115     }
116
117     // 创建pollfd结构体数组, 使用poll函数管理监听socket和所有的连接socket
118     pollfd pollFds[MAX_FD];
119     memset(pollFds, 0, sizeof(pollFds));    // memset初始化内存只适用于POD类型
120     int usedFdIdx = -1;     // 已用到的pollfd数组索引
121
122     // 将监听socket纳入pollfd数组第一个元素
123     ++usedFdIdx;
124     pollFds[usedFdIdx].fd = listenSocket;
125     pollFds[usedFdIdx].events = POLLIN;     // 客户端请求到达事件(connect等)
126
127     Clients clients;
128
129     bool done(false);
130     while (!done)
131     {
132         ret = poll(pollFds, usedFdIdx + 1, 1000);   // 等待1秒
133         switch(ret)
134         {
135         case -1:
136             showErr();
137             return 1;
138         case 0:
139             cout << "timeout" << endl;
140             break;
141         default:
142             // 遍历客户端连接socket
143             for (int idx = 1; idx <= usedFdIdx; ++idx)
144             {
145                 Clients::iterator iter = clients.find(pollFds[idx].fd);
146
147                 // 客户端可写
148                 if (pollFds[idx].revents & POLLOUT)
149                 {
150                     string msg = "i‘m client\r\n";
151                     ret = send(pollFds[idx].fd, msg.c_str(), msg.length(), 0);
152                     // send failed
153                     if (-1 == ret && EAGAIN != errno)    // EAGAIN代表缓冲区满了, 不是错误需要排除
154                     {
155                         iter->second++;
156
157                         // 连续
158                         if (iter->second >= 10)
159                         {
160                             ret = close(pollFds[idx].fd);
161                             if (0 == ret)
162                             {
163                                 cout << "close socket :" << pollFds[idx].fd << " success" <<endl;
164                             }
165                             else
166                             {
167                                 cout << "close socket :" << pollFds[idx].fd << " failed" <<endl;
168                             }
169
170                             clients.erase(iter);
171                             for (int i = idx; i < usedFdIdx; ++i)
172                             {
173                                 pollFds[i] = pollFds[i + 1];
174                             }
175                             --usedFdIdx;
176                         }
177                     }
178                     else
179                     {
180                         iter->second = 0;
181                     }
182
183                     cout << "send ret: " << ret << endl;
184                 }
185                 else if (pollFds[idx].revents & POLLERR)
186                 {
187                     cout << pollFds[idx].fd << " error" << endl;
188
189                     iter->second++;
190
191                     // remove socket
192                     if (iter->second >= 10)
193                     {
194                         ret = close(pollFds[idx].fd);
195                         if (0 == ret)
196                         {
197                             cout << "close socket :" << pollFds[idx].fd << " success" <<endl;
198                         }
199                         else
200                         {
201                             cout << "close socket :" << pollFds[idx].fd << " failed" <<endl;
202                         }
203
204                         clients.erase(iter);
205                         for (int i = idx; i < usedFdIdx; ++i)
206                         {
207                             pollFds[i] = pollFds[i + 1];
208                         }
209                         --usedFdIdx;
210                     }
211                 }
212
213                 if (idx == usedFdIdx)
214                 {
215                     sleep(1);
216                 }
217             }
218
219             // 客户端connect请求到达
220             if (pollFds[0].revents & POLLIN)
221             {
222                 if (usedFdIdx < MAX_USER)
223                 {
224                     int connectSocket(-1);
225                     connectSocket = accept(listenSocket, NULL, NULL);
226                     if (-1 == connectSocket)
227                     {
228                         cout << "accept error" << endl;
229                         break;
230                     }
231                     cout << "accept success" << endl;
232
233                     if (!setSocketNonBlock(connectSocket))
234                     {
235                         break;
236                     }
237
238                     if (!disableLinger(connectSocket))
239                     {
240                         break;
241                     }
242
243                     // 连接socket使用poll管理
244                     ++usedFdIdx;
245                     pollFds[usedFdIdx].fd = connectSocket;
246                     pollFds[usedFdIdx].events = POLLOUT;
247
248                     //
249                     clients.insert(make_pair<int, int>(connectSocket, 0));
250                 }
251                 else    // fd too many
252                 {
253
254                 }
255             }
256         }
257     }
258
259
260     return 0;
261 }
时间: 2024-10-16 22:09:38

poll案例的相关文章

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming

2高并发服务器:多路IO之poll

 1 poll A 依赖的头文件 #include <poll.h> B 函数声明 int poll(struct pollfd *fds, nfds_t nfds,int timeout); struct pollfd { int fd; /* 文件描述符*/ short events; /* 监控的事件*/ short revents; /* 监控事件中满足条件返回的事件*/ }; POLLIN普通或带外优先数据可读,即POLLRDNORM |POLLRDBAND POLLRDNORM

OpenStack pike版 基本环境部署(2) 续案例架构(1)

续案例架构(1) 环境准备工作: 按照以下规划配置各主机IP地址及主机名称 # controller 10.0.0.11       controller # compute1 10.0.0.31       compute1 # block1 10.0.0.41       block1 # object1 10.0.0.51       object1 # object2 10.0.0.52       object2 做名称解析: 编辑/etc/hosts文件,将以上内容写入即可. 关闭所

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte

编写案例分别使用多进程、多路复用(select、epoll)实现tcp服务

-------------------------------多进程的tcp服务器-------------------------------通过为每个客户创建一个进程的方式,能够同时为多个客户进行服务器当客户不是特别多的时候,这种方式还行,如果有几百上千个,就不可取了,因为每次创建进程等过程需要好较大的资源 python代码案例: 1 #coding=utf-8 2 3 #引用对应的包 4 from socket import * 5 6 from multiprocessing impor

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con

(升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)

本课程主要讲解目前大数据领域最热门.最火爆.最有前景的技术——Spark.在本课程中,会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战.课程会涵盖Scala编程详解.Spark核心编程.Spark SQL和Spark Streaming.Spark内核以及源码剖析.性能调优.企业级案例实战等部分.完全从零起步,让学员可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程

多线程的对比与案例(计算目录下文件的大小)

本人使用的是mac 所以有usr目录.把以下的几种情况分别贴出来给大家分析下各自有什么优缺点! 1.顺序计算目录大小code: package jvm; import java.io.File; /** * 第一版 * 顺序计算目录大小 * @author zeuskingzb * */ public class TotalFileSizeSequential { private long getTotalSizeOfFilesInDir(File file){ if (file.isFile(