题目:编写一个同步服务器模型 要求: 1)客户端A主机给服务器B主机发送报文, 2)B服务器主机收到报文以后同时分发给C1主机、C2主机; 3)C1主机和C2主机打印出客户端A的报文
bug总结:本来这道题目并不困难,就是向客户端连接池中的其他客户端发送数据,但是我这里出现了一个失误,我把接收到的数据直接发送了。 第一步:recv_packet(fd, &pack, &buflen, 0);第二步:send_packet(cltpool[i], &pack, buflen, 0);但是第二步中buflen的实际长度应该比buflen+4,导致发送的结构体不完整,接收失败,效果类似于没有接收到消息
核心代码
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <errno.h> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/select.h> #include <fcntl.h> #include <termios.h> #include <signal.h> #include <pthread.h> #include "commsock.h" #define MAXBUFSIZE 1020 //报文结构 typedef struct _packet { int len; char buf[MAXBUFSIZE]; } Packet; /** * readn - 读取固定大小的字节 * @fd:文件描述符 * @buf:接收缓冲区 * @count:指定读取字节数 * 成功返回count,失败返回-1,对等方连接关闭返回<count * */ int readn(int fd, void *buf, int count) { int nread = 0; int lread = count; char *pbuf = (char *) buf; while (lread > 0) { do { nread = read(fd, pbuf, lread); } while (nread == -1 && errno == EINTR); if (nread == -1) return -1; else if (nread == 0) return count - lread; lread -= nread; pbuf += nread; } return count; } /** * writen - 写固定大小字节数 * @fd:文件描述符 * @buf:写入缓冲区 * @count:指定写入字节数 * 成功返回count,失败返回-1 * */ int writen(int fd, void *buf, int count) { int lwrite = count; int nwrite = 0; char *pbuf = (char *) buf; while (lwrite > 0) { do { nwrite = write(fd, pbuf, lwrite); } while (nwrite == -1 && errno == EINTR); if (nwrite == -1) return -1; lwrite -= nwrite; pbuf += nwrite; } return count; } /** * read_timeout - 读超时检测函数,不含读操作 * @fd:文件描述符 * @wait_seconds:等待超时秒数,如果为0表示不检测超时 * 成功返回0,失败返回-1,超时返回-1并且errno=ETIMEDOUT * */ int read_timeout(int fd, unsigned int wait_seconds) { int ret = 0; if (wait_seconds > 0) { fd_set readfds; FD_ZERO(&readfds); FD_SET(fd, &readfds); struct timeval timeout; timeout.tv_sec = wait_seconds; timeout.tv_usec = 0; do { ret = select(fd + 1, &readfds, NULL, NULL, &timeout); } while (ret == -1 && errno == EINTR); //ret==-1 if (ret == 0) { errno = ETIMEDOUT; ret = -1; } else if (ret == 1) { ret = 0; } } return ret; } /** * write_timeout - 写超时检测函数,不含写操作 * @fd:文件描述符 * @wait_seconds:等待超时秒数,如果为0表示不检测超时 * 成功返回0,失败返回-1,超时返回-1并且errno=ETIMEDOUT * */ int write_timeout(int fd, unsigned int wait_seconds) { int ret = 0; if (wait_seconds > 0) { fd_set writefds; FD_ZERO(&writefds); FD_SET(fd, &writefds); struct timeval timeout; timeout.tv_sec = wait_seconds; timeout.tv_usec = 0; do { ret = select(fd + 1, NULL, &writefds, NULL, &timeout); } while (ret == -1 && errno == EINTR); //ret==-1 if (ret == 0) { errno = ETIMEDOUT; ret = -1; } else if (ret == 1) { ret = 0; } } return ret; } /** * activate_nonblock - 设置套接字非阻塞 * @fd:文件描述符 * 成功返回0,失败返回-1 * */ int activate_nonblock(int fd) { int ret = 0; int flags = fcntl(fd, F_GETFL); if (flags == -1) return -1; flags = flags | O_NONBLOCK; ret = fcntl(fd, F_SETFL, flags); //ret==-1 return ret; } /** * deactivate_nonblock - 设置套接字阻塞 * @fd:文件描述符 * 成功返回0,失败返回-1 * */ int deactivate_nonblock(int fd) { int ret = 0; int flags = fcntl(fd, F_GETFL); if (flags == -1) return -1; flags = flags & (~O_NONBLOCK); ret = fcntl(fd, F_SETFL, flags); return ret; } /** * connect_timeout - 带超时的connect(函数内已执行connect) * @fd:文件描述符 * @addr:服务器网络地址结构 * @wait_seconds:等待超时秒数,如果为0表示不检测超时 * 成功返回0,失败返回-1,超时返回-1并且errno=ETIMEDOUT * */ int connect_timeout(int fd, struct sockaddr_in *addr, unsigned int wait_seconds) { int ret = 0; if (wait_seconds > 0) { if (activate_nonblock(fd) == -1) return -1; } ret = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr_in)); if (ret == -1 && errno == EINPROGRESS) { fd_set writefds; FD_ZERO(&writefds); FD_SET(fd, &writefds); struct timeval timeout; timeout.tv_sec = wait_seconds; timeout.tv_usec = 0; int nwrite = select(fd + 1, NULL, &writefds, NULL, &timeout); //nwrite==-1 此时ret==-1 if (nwrite == 0) errno = ETIMEDOUT; else if (nwrite == 1) { int err = 0; socklen_t len = sizeof(err); ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len); if (ret == 0) { if (err != 0) { errno = err; ret = -1; } } } } if (wait_seconds > 0) { if (deactivate_nonblock(fd) == -1) return -1; } return ret; } /** * sock_init - 初始化SOCKET环境 * @connid:连接套接字 * 成功返回0,失败返回错误码 * */ int sock_init(int *connid) { int ret = 0; if (connid == NULL) { ret = SckParamErr; printf("cltsock_init() params not correct !\n"); return ret; } //init ret = socket(AF_INET, SOCK_STREAM, 0); if (ret == -1) { ret = SckBaseErr; perror("socket() err"); return ret; } else { *connid = ret; ret = 0; } return ret; } /** * connect_server - 连接服务器 * @connid:连接套接字 * @port:端口号 * @ipaddr:IP地址 * @wait_seconds:等待超时秒数,如果为0表示不检测超时 * 成功返回0,失败返回错误码 * */ int connect_server(int connid, int port, char *ipaddr, unsigned int wait_seconds) { int ret = 0; if (connid < 0 || port < 0 || port > 65535 || ipaddr == NULL || wait_seconds < 0) { ret = SckParamErr; printf("cltsock_init() params not correct !\n"); return ret; } struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(8080); addr.sin_addr.s_addr = inet_addr("127.0.0.1"); ret = connect_timeout(connid, &addr, wait_seconds); if (ret == -1) { if (errno == ETIMEDOUT) { ret = SckTimeOut; printf("connect_timeout() time out !\n"); return ret; } ret = SckBaseErr; perror("connect_timeout() err"); return ret; } return ret; } /** * send_packet - 发送数据包 * @fd:文件描述符 * @pack:数据包 * @buflen:数据包大小 * @wait_seconds:等待超时秒数,如果为0表示不检测超时 * 成功返回0,失败返回错误码 * */ int send_packet(int fd, Packet *pack, int buflen, unsigned int wait_seconds) { int ret = 0; //可写检测 ret = write_timeout(fd, wait_seconds); if (ret == -1) { if (errno == ETIMEDOUT) { ret = SckTimeOut; printf("write_timeout() time out !\n"); return ret; } ret = SckBaseErr; perror("write_timeout() err"); return ret; } //发送数据 ret = writen(fd, pack, buflen); if (ret != buflen) { ret = SckBaseErr; perror("writen() err"); return ret; } else { ret = 0; } return ret; } /** * send_packet - 接收数据包 * @fd:文件描述符 * @pack:数据包 * @buflen:数据包大小 * @wait_seconds:等待超时秒数,如果为0表示不检测超时 * 成功返回0,失败返回错误码 * */ int recv_packet(int fd, Packet *pack, int *buflen, unsigned int wait_seconds) { int ret = 0; //读超时检测 ret = read_timeout(fd, wait_seconds); if (ret == -1) { if (errno == ETIMEDOUT) { ret = SckTimeOut; printf("read_timeout() time out !\n"); return ret; } ret = SckBaseErr; perror("read_timeout() err"); return ret; } //获取数据长度 int len = 0; ret = readn(fd, &pack->len, 4); if (ret == -1) { ret = SckBaseErr; perror("readn() err"); return ret; } else if (ret < 4) { ret = SckPeerClosed; printf("peer is closed !\n"); return ret; } //网络字节序转化成本地字节序 len = ntohl(pack->len); //获取包体 ret = readn(fd, pack->buf, len); if (ret == -1) { ret = SckBaseErr; perror("readn() err"); return ret; } else if (ret < len) { ret = SckPeerClosed; printf("peer is closed !\n"); return ret; } else if (ret == len) { ret = 0; } *buflen = len; return ret; } /** * start_thread - 客户端线程回调函数 * @arg:参数 * */ void *start_thread(void *arg) { if (arg == NULL) { printf("start_thread() params not correct !\n"); return NULL; } int fd = (int) arg; int ret = 0; //接收信息并且打印 Packet pack; int buflen = MAXBUFSIZE; while (1) { memset(&pack, 0, sizeof(pack)); ret = recv_packet(fd, &pack, &buflen, 500); if (ret != 0) { printf("客户端线程退出了!\n"); //退出当前进程 pthread_exit(NULL); } //打印数据 fputs(pack.buf, stdout); //fflush(stdout); } return NULL; } /** * run_clt - 运行客户端 * @connid:连接套接字 * @wait_seconds:等待超时秒数,如果为0表示不检测超时 * 失败返回错误码 * */ int run_clt(int connid, unsigned int wait_seconds) { int ret = 0; //安装信号 if (signal(SIGPIPE, handler) == SIG_ERR) { ret = SckBaseErr; printf("signal() failed !\n"); return ret; } //开始多线程 pthread_t thr1; pthread_attr_t attr; pthread_attr_init(&attr); //设置进程为可分离状态 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (pthread_create(&thr1, &attr, start_thread, connid) != 0) { ret = SckBaseErr; printf("pthread_create() failed !\n"); return ret; } Packet pack; memset(&pack, 0, sizeof(pack)); int buflen = 0; while (fgets(pack.buf, MAXBUFSIZE, stdin) != NULL) { //去除\n buflen = strlen(pack.buf); pack.len = htonl(buflen); //发送数据 ret = send_packet(connid, &pack, buflen + 4, wait_seconds); if (ret != 0) { return ret; } } return ret; } /** * close_socket - 关闭连接 * @fd:文件描述符 * 成功返回0 * */ int close_socket(int fd) { int ret = 0; close(fd); return ret; } /* * clear_back - 退格键不回显 * 成功返回0,失败返回错误码 * */ int clear_back() { int ret = 0; struct termios term; memset(&term, 0, sizeof(term)); //获取当前系统设置 if (tcgetattr(STDIN_FILENO, &term) == -1) { ret = SckBaseErr; perror("tcgetattr() err"); return ret; } //修改系统设置 term.c_cc[VERASE] = ‘\b‘; //立即生效 if (tcsetattr(STDIN_FILENO, TCSANOW, &term) == -1) { ret = SckBaseErr; perror("tcsetattr() err"); return ret; } return ret; } /** * listen_socket - 创建服务器监听套接字 * @fd:套接字 * @port:端口号 * 成功返回0,失败返回错误码 * */ int listen_socket(int fd, int port) { int ret = 0; if (port < 0 || port < 0 || port > 65535) { ret = SckParamErr; printf("listen_socket() params not correct !\n"); return ret; } //reuse addr int optval = 1; ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); if (ret == -1) { ret = SckBaseErr; perror("setsockopt() err"); return ret; } //bind struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr("127.0.0.1"); ret = bind(fd, (struct sockaddr *) &addr, sizeof(addr)); if (ret == -1) { ret = SckBaseErr; perror("bind() err"); return ret; } //listen ret = listen(fd, SOMAXCONN); if (ret == -1) { ret = SckBaseErr; perror("listen() err"); return ret; } return ret; } /** * product_clt - 处理客户端信息 * @fd:客户端A文件描述符 * @cltpool:客户端套接字池 * @maxindex:连接池最后一个元素的下标 * 成功返回0,失败返回错误码 * */ int product_clt(int fd, int *cltpool, int maxindex) { int ret = 0; //接收客户端信息 Packet pack; memset(&pack, 0, sizeof(pack)); int buflen = 0; ret = recv_packet(fd, &pack, &buflen, 0); if (ret != 0) return ret; //转发给其他客户端 int i = 0; for (i = 0; i <= maxindex; i++) { if (cltpool[i] != -1 && cltpool[i] != fd) { //发送信息 ret = send_packet(cltpool[i], &pack, buflen+4, 0); if (ret != 0) return ret; } } return ret; } /** * handler - 信号捕捉函数 * @sign:信号值 * */ void handler(int sign) { if (sign == SIGPIPE) { printf("accept SIGPIPE!\n"); } } /** * select_socket - select机制管理客户端连接 * @fd:文件描述符 * 失败返回错误码 * */ int select_socket(int fd) { int ret = 0; //安装信号 if (signal(SIGPIPE, handler) == SIG_ERR) { ret = SckBaseErr; printf("signal() failed !\n"); return ret; } //定义客户端套接字临时变量 int conn = 0; struct sockaddr_in peeraddr; socklen_t peerlen = 0; //已经处理的select事件 int nread = 0; //创建客户端连接池 int cltpool[FD_SETSIZE] = { 0 }; //初始化连接池 int i = 0; for (i = 0; i < FD_SETSIZE; i++) { cltpool[i] = -1; } //定义数组尾部元素下标 int maxindex = 0; //定义最大的套接字(初始值是监听套接字) int maxfd = fd; //定义最新的套接字集合 fd_set allsets; FD_ZERO(&allsets); //定义需要监听的套接字集合 fd_set readfds; FD_ZERO(&readfds); //将监听套接字加入最新的套接字集合 FD_SET(fd, &allsets); while (1) { //将最新的套接字集合赋值给需要监听的套接字集合 readfds = allsets; do { nread = select(maxfd + 1, &readfds, NULL, NULL, NULL); } while (nread == -1 && errno == EINTR); //屏蔽信号 if (nread == -1) { ret = SckBaseErr; perror("select() err"); return ret; } //1.服务器监听套接字处理 if (FD_ISSET(fd, &readfds)) { //接收到客户端的连接 memset(&peeraddr, 0, sizeof(peeraddr)); peerlen = sizeof(peeraddr); conn = accept(fd, (struct sockaddr *) &peeraddr, &peerlen); if (conn == -1) { ret = SckBaseErr; perror("accept() err"); return ret; } //将客户端连接添加到连接池 for (i = 0; i < FD_SETSIZE; i++) { if (cltpool[i] == -1) { if (i > maxindex) { maxindex = i; } cltpool[i] = conn; break; } } if (i == FD_SETSIZE) { ret = SckBaseErr; close(conn); printf("客户端连接池已满!\n"); return ret; } if (conn > maxfd) maxfd = conn; //将该客户端套接字加入到最新套接字集合 FD_SET(conn, &allsets); printf("server accept from :%s\n", inet_ntoa(peeraddr.sin_addr)); if (--nread <= 0) continue; } //处理客户端请求 if (nread <= 0) continue; for (i = 0; i <= maxindex; i++) { if (cltpool[i] == -1) continue; if (FD_ISSET(cltpool[i], &readfds)) { //处理客户端请求 ret = product_clt(cltpool[i], cltpool, maxindex); if (ret != 0) { //从最新的套接字集合中删除 FD_CLR(cltpool[i], &allsets); //处理请求失败,关闭客户端连接 close(cltpool[i]); //从客户端连接池中清除 cltpool[i] = -1; break; } if (--nread <= 0) break; } } } return ret; }
时间: 2024-09-28 08:20:30