接收端程序: 1024个socket建立连接后,创建8个线程收数据,每个线程处理128个socket, 每个线程先select这128个socket,然后用FD_ISSET对这128个socket进行检查和读出。
用脚本bw.sh 统计得到的接收端的总带宽为 0.114 Gb/s, 用 iftop 查看得到的接收端带宽在前几秒为3Gb/s, 然后下降到 112 Mb/s。
接收队列的长度为3.7MB。由此可见接收端很慢。
这种一次select多个socket的工作方式效率很低, 不应该采用。
发送端程序:server1bak.c 布置4个发送端
1 #include <stdio.h> 2 #include <stdlib.h> 3 4 #include<pthread.h> 5 #include <unistd.h> 6 7 #include <sys/socket.h> 8 #include <arpa/inet.h> 9 #include <netinet/in.h> 10 #include <netinet/tcp.h> 11 #include<sys/time.h> 12 #include<errno.h> 13 #include<string.h> 14 15 #define IP "192.168.250.147" 16 #define PORT 33333 17 #define SOCKNUM 256 18 #define PACKETSIZE 2048 19 20 typedef struct { 21 int sock; 22 }ARG; 23 24 pthread_t tid[SOCKNUM]; 25 26 27 int SetSocketOptions(int fd) 28 { 29 int sockopt = 0; 30 int SOCKET_ERROR = -1; 31 static const int c_so_rcvbuf = 256*1024; 32 static const int c_so_sndbuf = 256*1024; 33 34 sockopt = 1; 35 if ( setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&sockopt, sizeof(sockopt)) == SOCKET_ERROR ) 36 { 37 perror("set reuseaddr error"); 38 return -1; 39 } 40 41 sockopt = c_so_sndbuf; 42 if ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char*)&sockopt, sizeof(sockopt)) == SOCKET_ERROR ) 43 { 44 perror("set so_sndbuf error"); 45 return -1; 46 } 47 48 sockopt = c_so_rcvbuf; 49 if ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*)&sockopt, sizeof(sockopt)) == SOCKET_ERROR ) 50 { 51 perror("set so_rcvbuf error"); 52 return -1; 53 } 54 } 55 56 57 58 59 int senddata(int sock) 60 { 61 int ret; 62 char temp[PACKETSIZE+1] = "I want to know!"; 63 int size_left=2048; 64 while( size_left > 0) 65 { 66 ret = send(sock, temp, size_left, 0); 67 if (ret < 0) 68 { 69 perror("send fail"); 70 exit(1); 71 } 72 size_left -= ret; 73 } 74 75 return size_left; 76 } 77 78 79 int read_cwnd(int tcp_work_socket) 80 { 81 struct tcp_info info; 82 socklen_t tcp_info_length = sizeof(tcp_info); 83 if ( getsockopt(tcp_work_socket, SOL_TCP, TCP_INFO, (void *)&info, &tcp_info_length) == 0 ) { 84 printf(" cwnd:%u, snd_ssthresh:%u, rtt:%u, rtt_d:%u\n", 85 info.tcpi_snd_cwnd, 86 info.tcpi_snd_ssthresh, 87 info.tcpi_rtt, 88 info.tcpi_rttvar 89 ); 90 } 91 return 0; 92 } 93 94 95 void *sendData(void *arg) 96 { 97 #if 1 98 ARG *a = (ARG *)arg; 99 int accept_sock = a->sock; 100 101 long count = 0; 102 struct timeval start; 103 struct timeval end; 104 unsigned long timer=0; 105 gettimeofday(&start,NULL); 106 107 108 while(1){ 109 senddata(accept_sock); 110 usleep(900); 111 if(pthread_self()== tid[0]) 112 { 113 // FILE* fpointer = fopen("result.out", "a+"); 114 count++; 115 gettimeofday(&end,NULL); 116 timer = 1000000 * (end.tv_sec-start.tv_sec)+ end.tv_usec-start.tv_usec; 117 if(timer%2000== 0) 118 { 119 printf("count: %ld, socket: %ld, %lf s, %lf Gb/s, ", count, accept_sock, timer/1000000.0, count*2048.0/timer/1024*8); 120 read_cwnd(accept_sock); 121 } 122 123 #if 1 124 if(timer > 3600000000) 125 { 126 printf("before close: timer: %ld\n", timer); 127 close(accept_sock); 128 // fclose(fpointer); 129 break; 130 } 131 #endif 132 } 133 } 134 return 0; 135 #endif 136 137 } 138 139 140 141 int main() 142 { 143 int accept_sock[SOCKNUM]; 144 struct sockaddr_in addr_ser; 145 146 147 int sock = socket(AF_INET, SOCK_STREAM, 0); 148 if (sock < 0) 149 { 150 perror("create sock fail"); 151 exit(1); 152 } 153 154 addr_ser.sin_family = AF_INET; 155 addr_ser.sin_port = htons(PORT); 156 // addr_ser.sin_addr.s_addr = inet_addr(IP); 157 addr_ser.sin_addr.s_addr = htonl(INADDR_ANY); 158 159 160 int sockopt = 1; 161 if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, (char *)&sockopt, sizeof(int)) < 0) 162 { 163 perror("setsockopt fail"); 164 exit(1); 165 } 166 167 if (bind(sock, (struct sockaddr*)&addr_ser, sizeof(struct sockaddr)) < 0) 168 { 169 perror("bind fail"); 170 exit(1); 171 } 172 173 174 #if 0 175 if ( SetSocketOptions(sock) == -1) 176 { 177 perror("set socket options error"); 178 exit(1); 179 } 180 #endif 181 182 if (listen(sock, 2000) < 0) 183 { 184 perror("listen fail"); 185 exit(1); 186 } 187 188 189 for(int i=0; i<SOCKNUM; i++) 190 { 191 192 accept_sock[i] = accept(sock, 0, 0); 193 if (accept_sock[i] < 0) 194 { 195 perror("accept fail"); 196 exit(1); 197 } 198 printf("accept ok!\n"); 199 } 200 201 202 #if 1 203 204 //static extern pthread_t tid[SOCKNUM]; 205 ARG a[SOCKNUM]; 206 for(int i=0; i<SOCKNUM; i++){ 207 a[i].sock = accept_sock[i]; 208 pthread_create(&tid[i], 0, sendData, (void *)&a[i]); 209 } 210 #endif 211 212 #if 1 213 for(int i=0; i<SOCKNUM; i++) 214 { 215 pthread_join(tid[i], 0); 216 } 217 #endif 218 219 return 0; 220 }
接收端代码:client1_multi_select2.c
1 #include <stdio.h> 2 #include <stdlib.h> 3 4 #include<pthread.h> 5 #include <unistd.h> 6 #include <time.h> 7 8 #include<sys/ioctl.h> 9 #include <sys/socket.h> 10 #include <arpa/inet.h> 11 #include <netinet/in.h> 12 #include<sys/time.h> 13 #include<string> 14 15 #define PORT 33333 16 17 #define SOCKNUM 1024 18 #define THREAD_NUM 8 19 #define SOCKET_PER_THREAD 128 20 #define SERVER_NUM 4 21 #define MSGSIZE 2048 22 23 typedef struct{ 24 int sock[SOCKET_PER_THREAD]; 25 }ARG; 26 27 int SetSocketOptions(int fd) 28 { 29 int sockopt = 0; 30 int SOCKET_ERROR = -1; 31 static const int c_so_rcvbuf = 256*1024; 32 static const int c_so_sndbuf = 256*1024; 33 34 sockopt = 1; 35 if ( setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&sockopt, sizeof(sockopt)) == SOCKET_ERROR ) 36 { 37 perror("set reuseaddr error"); 38 return -1; 39 } 40 41 sockopt = c_so_sndbuf; 42 if ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char*)&sockopt, sizeof(sockopt)) == SOCKET_ERROR ) 43 { 44 perror("set so_sndbuf error"); 45 return -1; 46 } 47 48 sockopt = c_so_rcvbuf; 49 if ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*)&sockopt, sizeof(sockopt)) == SOCKET_ERROR ) 50 { 51 perror("set so_rcvbuf error"); 52 return -1; 53 } 54 } 55 56 int recvdata(int sock, char *buffer) 57 { 58 int msgsize = MSGSIZE; 59 int ret; 60 int nrecv=0; 61 while (nrecv < msgsize) 62 { 63 ret = recv(sock, buffer, msgsize-nrecv, 0); 64 if (ret < 0) 65 { 66 perror("recv fail"); 67 exit(1); 68 } 69 else 70 { 71 nrecv += ret; 72 } 73 } 74 return nrecv; 75 } 76 77 int max(int a, int b) 78 { 79 if(a >= b) 80 return a; 81 else 82 return b; 83 } 84 85 86 87 88 int poll_and_recv(fd_set recv_fds, int maxfd, int* socket) 89 { 90 #if 1 91 char buffer[MSGSIZE] = "0"; 92 struct timeval c_select_timeout = {0, 0}; 93 int sel = select(maxfd+1, &recv_fds, NULL, NULL, &c_select_timeout); 94 if(sel == 0) 95 { 96 return 0; 97 } 98 else if(sel > 0) 99 { 100 for(int i=0; i<SOCKET_PER_THREAD; i++) 101 { 102 if( FD_ISSET(socket[i], &recv_fds) ) // socket is readable 103 { 104 int length; 105 int status = ioctl(socket[i], FIONREAD, &length); 106 if(status == -1) 107 { 108 printf("Error reading input size\n"); 109 } 110 if(length) 111 { 112 //printf("data in socket %d : %d\n", socket[i], length); 113 // for(int num=0; num<300; num++) 114 // { 115 recvdata(socket[i], buffer); 116 // } 117 } 118 else // length==0 119 { 120 printf("Nothing to read, eof??\n"); 121 if(socket[i] != -1) 122 { 123 close(socket[i]); 124 socket[i] = -1; 125 } 126 perror("socket flagged but no data available probable EOF"); 127 } 128 129 } // FD_ISSET != 0 130 } // i < SOCKET_PER_THREAD 131 } // sel > 0 132 else // sel < 0 133 { 134 for (int i=0; i<SOCKET_PER_THREAD; i++) 135 { 136 if(socket[i] != -1) 137 { 138 close(socket[i]); 139 socket[i] = -1; 140 } 141 } 142 perror("select<0 error"); 143 return 0; 144 } 145 146 #endif 147 return 1; 148 149 } 150 151 152 153 void *recvData(void *arg) 154 { 155 ARG* a = (ARG*)arg; 156 int *socket = a->sock; 157 char buffer[MSGSIZE] = "0"; 158 int count = 0; 159 160 int maxfd = 0; 161 for(int i=0; i<SOCKET_PER_THREAD; i++) 162 { 163 maxfd = max(socket[i], maxfd); 164 } 165 166 while(1) 167 { 168 fd_set recv_fds; 169 for(int i=0; i<SOCKET_PER_THREAD; i++) 170 { 171 FD_ZERO(&recv_fds); 172 FD_SET(socket[i], &recv_fds); 173 } 174 poll_and_recv(recv_fds, maxfd, socket); 175 } 176 return 0; 177 } 178 179 180 int main() 181 { 182 int sock[SERVER_NUM][SOCKNUM/SERVER_NUM]; 183 struct sockaddr_in addr_ser[SERVER_NUM]; 184 struct sockaddr_in addr_cli[SERVER_NUM][SOCKNUM/SERVER_NUM]; 185 186 std::string local_ip("192.168.250.141"); 187 188 std::string server_ip[SERVER_NUM] = {"192.168.250.146", "192.168.250.147", "192.168.250.142", "192.168.250.143"}; 189 //std::string server_ip[SERVER_NUM] = {"192.168.251.166", "192.168.251.167", "192.168.251.162", "192.168.251.163"}; 190 // std::string server_ip[SERVER_NUM] = {"192.168.251.163"}; 191 for(int ser=0; ser < SERVER_NUM; ser++) 192 { 193 for(int i=0; i<SOCKNUM/SERVER_NUM; i++) 194 { 195 sock[ser][i] = socket(AF_INET, SOCK_STREAM, 0); 196 if(sock[ser][i] < 0) 197 { 198 printf("%d ", i); 199 perror("create socket fail"); 200 } 201 202 addr_ser[ser].sin_family = AF_INET; 203 addr_ser[ser].sin_port = htons(PORT); 204 addr_ser[ser].sin_addr.s_addr = inet_addr(server_ip[ser].c_str()); 205 206 addr_cli[ser][i].sin_family = AF_INET; 207 addr_cli[ser][i].sin_port = 0; 208 addr_cli[ser][i].sin_addr.s_addr = inet_addr(local_ip.c_str()); 209 210 211 int sockopt = 1; 212 if ( setsockopt(sock[ser][i], SOL_SOCKET, SO_REUSEADDR, (char*)&sockopt, sizeof(sockopt)) == -1 ) 213 { 214 perror("set reuseaddr error"); 215 exit(1); 216 } 217 218 219 #if 0 220 221 if ( SetSocketOptions(sock[ser][i]) == -1) 222 { 223 perror("set socket options error"); 224 exit(1); 225 } 226 #endif 227 228 if( bind(sock[ser][i], (struct sockaddr*)&addr_cli[ser][i], sizeof(addr_cli[ser][i]) ) < 0 ) 229 { 230 perror("TCP bind: "); 231 exit(1); 232 } 233 printf("bind ok!\n"); 234 235 if(connect(sock[ser][i], (struct sockaddr*)&addr_ser[ser], sizeof(struct sockaddr)) < 0) 236 { 237 perror("connect fail:"); 238 exit(1); 239 } 240 printf("connect ok!\n"); 241 242 } 243 244 } 245 246 247 int socket[SOCKNUM] ; 248 int count=0; 249 for(int i=0; i< SERVER_NUM; i++) 250 { 251 for(int j=0; j<SOCKNUM/SERVER_NUM; j++) 252 { 253 socket[count++] = sock[i][j]; 254 } 255 } 256 257 pthread_t tid[THREAD_NUM]; 258 ARG a[THREAD_NUM]; 259 for(int i=0; i<THREAD_NUM; i++) 260 { 261 for(int j=0; j<SOCKET_PER_THREAD; j++) 262 { 263 a[i].sock[j] = socket[i*SOCKET_PER_THREAD+j]; 264 } 265 pthread_create(&tid[i], 0, recvData, (void *)&a[i]); 266 } 267 268 for(int i=0; i<SOCKNUM; i++) 269 { 270 pthread_join(tid[i], 0); 271 } 272 273 return 0; 274 }
计算带宽代码:bw.sh
1 #!/bin/bash 2 3 4 bw1=`ifconfig eth4 |grep "RX bytes"|awk ‘{print $2}‘|awk -F : ‘{print $2}‘` 5 sleep 60 6 bw2=`ifconfig eth4 |grep "RX bytes"|awk ‘{print $2}‘|awk -F : ‘{print $2}‘` 7 8 echo "($bw2-$bw1)/1024.0/1024.0/1024.0*8/60.0"|bc -l
时间: 2024-10-19 01:59:27