看了陈硕的书,说虽然有pipe, msgget, message queue, unix domain socket, 还是建议进程间只用tcp socket来通信。
pipe的缺点是阻塞。msgget缺点是不能select。mq_send可以,但是双向通信要开两个mq。unix domain不能跨网络。tcp socket优点很多,就是处理分包比较麻烦些,不过可以抽象出来。根据我的项目需要,自己设计的数据封包格式为:
MSG -- 3 Bytes | cmd -- 1 Byte | ulen -- 4 Bytes | clen -- 4 Bytes | url -- ulen Bytes | content -- clen Bytes(ulen、clen 均为大端序)
陈硕在最后加了adler32 checksum, 我为了简便没有;为了便于找到包的开始位置,我参考了mpeg pes的sync word概念,引入"MSG"作为magic number标识。
附代码:
/*
 * IPC over a TCP socket -- self test.
 *
 * Build and run:
 *   gcc -g -Wall stream_buffer.c -DTEST_MSG_BUFFER
 *   ./a.out              (server, listens on port 5678)
 *   ./a.out 127.0.0.1    (client)
 *
 * Wire format of one message (integers are big-endian):
 *   "MSG"    3 bytes   magic / sync word
 *   cmd      1 byte
 *   ulen     4 bytes   length of url
 *   clen     4 bytes   length of content
 *   url      ulen bytes
 *   content  clen bytes
 */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <poll.h>

/* Read a 32-bit big-endian value from the 4 bytes at x. */
#ifndef AV_RB32
#define AV_RB32(x) \
    (((uint32_t)((const uint8_t *)(x))[0] << 24) | \
     ((uint32_t)((const uint8_t *)(x))[1] << 16) | \
     ((uint32_t)((const uint8_t *)(x))[2] << 8)  | \
      (uint32_t)((const uint8_t *)(x))[3])
#endif

/* Store darg as a 32-bit big-endian value into the 4 bytes at p. */
#ifndef AV_WB32
#define AV_WB32(p, darg) do { \
        uint32_t d = (uint32_t)(darg); \
        ((uint8_t *)(p))[3] = (uint8_t)(d); \
        ((uint8_t *)(p))[2] = (uint8_t)(d >> 8); \
        ((uint8_t *)(p))[1] = (uint8_t)(d >> 16); \
        ((uint8_t *)(p))[0] = (uint8_t)(d >> 24); \
    } while (0)
#endif

enum {
    MSG_MAGIC_LEN   = 3,    /* strlen("MSG") */
    MSG_HEADER_LEN  = 12,   /* magic + cmd + ulen + clen */
    CTL_MAX_CMD     = 9,    /* largest cmd accepted by ff_ctl_send() */
    SB_DEFAULT_SIZE = 8096  /* capacity of the in/out stream buffers */
};

/* One decoded message; url and content are NUL-terminated copies. */
typedef struct{
    int cmd;
    int ulen, clen;
    uint8_t *url, *content;
}ctrl_msg_t;

/* Linear byte buffer: valid data lives in buf[rpos, wpos). */
typedef struct{
    int rpos, wpos;
    int msize;
    uint8_t *buf;
}StreamBuffer;

static int ctrl_fd, serv_fd;
static StreamBuffer *sb_in, *sb_out;

/*
 * Allocate a stream buffer with a capacity of msize bytes.
 * Returns NULL if msize < 1 or on allocation failure.
 */
StreamBuffer* sb_init(int msize)
{
    StreamBuffer *s;

    if (msize < 1) {
        return NULL;
    }
    s = malloc(sizeof(*s));
    if (!s) {
        return NULL;
    }
    s->rpos = s->wpos = 0;
    s->msize = msize;
    s->buf = malloc((size_t)msize);
    if (!s->buf) {
        free(s);
        return NULL;
    }
    return s;
}

/* Release a buffer created by sb_init(); NULL is accepted. Always returns 0. */
int sb_destroy(StreamBuffer *s)
{
    if (s) {
        free(s->buf);
        free(s);
    }
    return 0;
}

/*
 * Append len bytes to the buffer.
 * Already-consumed space at the front is reclaimed by compaction when the
 * tail alone is too small.  Returns 0 on success, -1 on bad arguments or
 * when the data does not fit (nothing is written in that case).
 */
int sb_write(StreamBuffer *s, uint8_t *data, int len)
{
    int used, tail;

    if (!s || !data || len < 0) {
        return -1;
    }
    used = s->wpos - s->rpos;
    tail = s->msize - s->wpos;
    if (tail < len) {
        if (s->msize - used < len) {
            printf("sb buf full\n");
            return -1;
        }
        /* Slide the unread bytes to the front to make room at the tail. */
        memmove(s->buf, s->buf + s->rpos, (size_t)used);
        s->rpos = 0;
        s->wpos = used;
    }
    memcpy(s->buf + s->wpos, data, (size_t)len);
    s->wpos += len;
    return 0;
}

/*
 * Copy up to len unread bytes into data and consume them.
 * Returns the number of bytes actually copied (0 on bad arguments).
 */
int sb_read(StreamBuffer *s, uint8_t *data, int len)
{
    int size;

    if (!s || !data || len < 0) {
        return 0;
    }
    size = s->wpos - s->rpos;
    if (size > len) {
        size = len;
    }
    memcpy(data, s->buf + s->rpos, (size_t)size);
    s->rpos += size;
    return size;
}

/* Number of bytes that can still be written to s (after compaction). */
static int sb_room(const StreamBuffer *s)
{
    return s->msize - (s->wpos - s->rpos);
}

/* Switch fd to non-blocking mode. Returns 0 on success, -1 on failure. */
static int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);

    if (flags < 0) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/* (Re)create the global in/out buffers. Returns 0 on success, -1 on OOM. */
static int ctl_buffers_init(void)
{
    StreamBuffer *in = sb_init(SB_DEFAULT_SIZE);
    StreamBuffer *out = sb_init(SB_DEFAULT_SIZE);

    if (!in || !out) {
        sb_destroy(in);
        sb_destroy(out);
        return -1;
    }
    /* Drop buffers left over from an earlier open so they are not leaked. */
    sb_destroy(sb_in);
    sb_destroy(sb_out);
    sb_in = in;
    sb_out = out;
    return 0;
}

/* Message handler: prints the decoded message. Always returns 0. */
static int ctl_msg_cb(ctrl_msg_t *msg)
{
    /* Guard against NULL so "%s" never receives a null pointer. */
    const char *url = msg->url ? (const char *)msg->url : "";
    const char *content = msg->content ? (const char *)msg->content : "";

    printf("%s: %d '%s' '%s'\n", (serv_fd ? "Server" : "Client"), msg->cmd, url, content);
    return 0;
}

/*
 * Accept one pending connection on server_fd, make it non-blocking and
 * remember it as the control connection.
 * Returns the new descriptor, or -1 if accept() fails.
 */
int ctl_msg_open(int server_fd)
{
    struct sockaddr_in from_addr;
    socklen_t len = sizeof(from_addr);
    int fd;

    memset(&from_addr, 0, sizeof(from_addr));
    fd = accept(server_fd, (struct sockaddr *)&from_addr, &len);
    if (fd < 0) {
        printf("error setup during accept %s\n", strerror(errno));
        return -1;
    }
    printf("new conn %s:%u\n", inet_ntoa(from_addr.sin_addr), ntohs(from_addr.sin_port));
    if (set_nonblocking(fd) < 0) {
        printf("set non-block failed\n");
    }
    ctrl_fd = fd;
    return fd;
}

/*
 * Drain the control socket into sb_in.
 * Never reads more than sb_in can hold, so no received byte is dropped;
 * whatever does not fit stays in the kernel until ff_ctl_recv() frees space.
 * Returns 0 when the socket has no more data for now (or sb_in is full),
 * -1 if sb_in is missing, the peer closed the connection, or recv() failed.
 */
int ctl_msg_recv(void)
{
    uint8_t buf[1024];

    if (!sb_in) {
        return -1;
    }
    for (;;) {
        int room = sb_room(sb_in);
        size_t want;
        ssize_t got;

        if (room <= 0) {
            return 0;
        }
        want = (size_t)room < sizeof(buf) ? (size_t)room : sizeof(buf);
        got = recv(ctrl_fd, buf, want, 0);
        if (got > 0) {
            if (sb_write(sb_in, buf, (int)got) < 0) {
                return -1; /* cannot happen: want <= room */
            }
            continue;
        }
        if (got == 0) {
            return -1; /* orderly shutdown by the peer */
        }
        if (errno == EINTR) {
            continue;
        }
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;
        }
        return -1;
    }
}

/*
 * Try to flush pending bytes of sb_out to the control socket.
 * Returns 1 if there was nothing to send, 0 if data was sent or the socket
 * is temporarily not writable, -1 if sb_out is missing or send() failed.
 */
int ctl_msg_send(void)
{
    StreamBuffer *sb = sb_out;
    ssize_t sent;
    int len;

    if (!sb) {
        return -1;
    }
    len = sb->wpos - sb->rpos;
    if (len <= 0) {
        return 1;
    }
    sent = send(ctrl_fd, sb->buf + sb->rpos, (size_t)len, 0);
    if (sent > 0) {
        sb->rpos += (int)sent;
        return 0;
    }
    if (sent < 0 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
        return -1;
    }
    return 0;
}

/*
 * Server side: create a non-blocking listening socket on the given port
 * (all interfaces) and allocate the in/out buffers.
 * Returns the listening descriptor, or -1 on failure (nothing is leaked).
 */
int ff_ctl_open(unsigned short port)
{
    int fd, tmp = 1;
    struct sockaddr_in addr;

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror ("socket");
        return -1;
    }
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(tmp))) {
        printf("setsockopt SO_REUSEADDR failed\n");
    }
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        printf("cant bind\n");
        close(fd);
        return -1;
    }
    if (listen(fd, 5) < 0) {
        perror ("listen");
        close(fd);
        return -1;
    }
    if (set_nonblocking(fd) < 0) {
        printf("set non block failed\n");
    }
    if (ctl_buffers_init() < 0) {
        close(fd);
        return -1;
    }
    serv_fd = fd;
    return fd;
}

/*
 * Client side: connect to ip:port (ip is a dotted-decimal IPv4 address),
 * switch the socket to non-blocking mode and allocate the in/out buffers.
 * Returns the connected descriptor, or -1 on failure (nothing is leaked).
 */
int ff_ctl_open2(char *ip, unsigned short port)
{
    int fd;
    struct sockaddr_in addr;

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    if (!ip || inet_pton(AF_INET, ip, &addr.sin_addr) != 1) {
        printf("bad ip '%s'\n", ip ? ip : "(null)");
        return -1;
    }
    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror ("socket");
        return -1;
    }
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        printf("cant connect to ip '%s'\n", ip);
        close(fd);
        return -1;
    }
    if (set_nonblocking(fd) < 0) {
        printf("set non block failed\n");
    }
    if (ctl_buffers_init() < 0) {
        close(fd);
        return -1;
    }
    ctrl_fd = fd;
    return fd;
}

/*
 * Unpack sb_in and dispatch every complete message in it to ctl_msg_cb().
 *
 * The parser is stateless: a frame is consumed only once all of it is in
 * the buffer, so a partially received frame simply stays in sb_in until the
 * next call.  Bytes that do not start a frame are skipped one at a time to
 * resynchronise on the "MSG" magic.  A header announcing more payload than
 * sb_in could ever hold is treated as a false sync and skipped.
 *
 * Returns 0 on success, -1 if sb_in is missing or memory ran out (the
 * undelivered frame is kept for a later retry).
 */
int ff_ctl_recv(void)
{
    StreamBuffer *sb = sb_in;
    const uint8_t *ptr, *end;
    int rc = 0;

    if (!sb) {
        return -1;
    }
    ptr = sb->buf + sb->rpos;
    end = sb->buf + sb->wpos;

    for (;;) {
        size_t avail = (size_t)(end - ptr);
        uint32_t ulen, clen, max_payload;
        ctrl_msg_t msg;

        if (avail < (size_t)MSG_MAGIC_LEN) {
            break;
        }
        if (memcmp(ptr, "MSG", MSG_MAGIC_LEN) != 0) {
            ptr++;
            continue;
        }
        if (avail < (size_t)MSG_HEADER_LEN) {
            break;
        }
        ulen = AV_RB32(ptr + 4);
        clen = AV_RB32(ptr + 8);
        /* avail >= MSG_HEADER_LEN implies msize >= MSG_HEADER_LEN here. */
        max_payload = (uint32_t)sb->msize - (uint32_t)MSG_HEADER_LEN;
        if (ulen > max_payload || clen > max_payload - ulen) {
            ptr += MSG_MAGIC_LEN;
            continue;
        }
        if (avail - (size_t)MSG_HEADER_LEN < (size_t)ulen + clen) {
            break; /* body not fully received yet */
        }

        memset(&msg, 0, sizeof(msg));
        msg.cmd = ptr[MSG_MAGIC_LEN];
        msg.ulen = (int)ulen;
        msg.clen = (int)clen;
        msg.url = malloc((size_t)ulen + 1);
        msg.content = malloc((size_t)clen + 1);
        if (!msg.url || !msg.content) {
            free(msg.url);
            free(msg.content);
            rc = -1;
            break;
        }
        memcpy(msg.url, ptr + MSG_HEADER_LEN, ulen);
        msg.url[ulen] = 0;
        memcpy(msg.content, ptr + MSG_HEADER_LEN + ulen, clen);
        msg.content[clen] = 0;

        ctl_msg_cb(&msg);

        free(msg.url);
        free(msg.content);
        ptr += (size_t)MSG_HEADER_LEN + ulen + clen;
    }

    sb->rpos = (int)(ptr - sb->buf);
    return rc;
}

/*
 * Queue one message (cmd in 0..9, NUL-terminated url and content) in sb_out.
 * The frame is queued entirely or not at all, so a full buffer can never
 * leave a header without its body in the stream.
 * Returns 0 on success, -1 on bad arguments or insufficient buffer space.
 */
int ff_ctl_send(int cmd, uint8_t *url, uint8_t *content)
{
    StreamBuffer *sb = sb_out;
    uint8_t hdr[MSG_HEADER_LEN];
    size_t ulen, clen, room;

    if (!sb || !url || !content || cmd < 0 || cmd > CTL_MAX_CMD) {
        return -1;
    }
    ulen = strlen((char *)url);
    clen = strlen((char *)content);

    room = (size_t)sb_room(sb);
    if (room < (size_t)MSG_HEADER_LEN ||
        ulen > room - MSG_HEADER_LEN ||
        clen > room - MSG_HEADER_LEN - ulen) {
        printf("sb buf full\n");
        return -1;
    }

    memcpy(hdr, "MSG", MSG_MAGIC_LEN);
    hdr[MSG_MAGIC_LEN] = (uint8_t)cmd;
    AV_WB32(hdr + 4, ulen);
    AV_WB32(hdr + 8, clen);

    if (sb_write(sb, hdr, MSG_HEADER_LEN) < 0 ||
        sb_write(sb, url, (int)ulen) < 0 ||
        sb_write(sb, content, (int)clen) < 0) {
        return -1; /* cannot happen: space was checked above */
    }
    return 0;
}

#if defined(TEST_MSG_BUFFER)
/* Remove trailing CR/LF characters in place; returns the new length. */
static int strip(char *str)
{
    int n = (int)strlen(str);

    while (n > 0 && (str[n-1] == '\r' || str[n-1] == '\n')) {
        str[--n] = 0;
    }
    return n;
}

/*
 * Interactive test: without arguments run as server on port 5678, with one
 * argument connect to that IPv4 address.
 *
 * NOTE: whenever the socket is writable and has no pending input, the loop
 * blocks in fgets() waiting for a line from stdin, so messages from the peer
 * are only shown after the user presses Enter.
 */
int main(int ac, char **av)
{
    int is_server = (ac != 2);
    int fd, peer_fd = -1;

    if (is_server) {
        printf("start server\n");
        fd = ff_ctl_open(5678);
    } else {
        fd = ff_ctl_open2(av[1], 5678);
    }
    if (fd < 0) {
        return 1;
    }

    for (;;) {
        /* A server without a peer only waits for a connection to accept. */
        int waiting = is_server && peer_fd < 0;
        struct pollfd pfd;
        int ret;

        pfd.fd = waiting ? fd : (is_server ? peer_fd : fd);
        pfd.events = waiting ? POLLIN : (POLLIN | POLLOUT);
        pfd.revents = 0;

        ret = poll(&pfd, 1, 1000);
        if (ret < 0) {
            if (errno == EINTR) {
                continue;
            }
            perror("poll");
            break;
        }
        if (ret == 0) {
            continue;
        }

        if (pfd.revents & POLLIN) {
            if (waiting) {
                peer_fd = ctl_msg_open(fd);
            } else {
                int closed = ctl_msg_recv() < 0;

                ff_ctl_recv(); /* deliver whatever arrived before the close */
                if (closed) {
                    printf("connection closed\n");
                    break;
                }
            }
        } else if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
            printf("connection closed\n");
            break;
        } else if (pfd.revents & POLLOUT) {
            char line[128] = "";

            printf("> ");
            fflush(stdout);
            if (!fgets(line, sizeof(line), stdin)) {
                break; /* EOF or read error on stdin */
            }
            strip(line);
            ff_ctl_send(2, (uint8_t*)"cmd", (uint8_t*)line);
            ctl_msg_send();
        }
    }

    if (peer_fd >= 0) {
        close(peer_fd);
    }
    close(fd);
    sb_destroy(sb_in);
    sb_destroy(sb_out);
    sb_in = sb_out = NULL;
    return 0;
}
#endif
服务端和客户端都会阻塞在fgets上,要敲了回车后才会显示对方发来的消息。可以改为另开线程读取输入来解决。不过我最终的应用场景不涉及这个问题,自测的目的已经达到了。
时间: 2024-10-24 12:59:54