使用tcp socket进行跨进程/网络通信

看了陈硕的书,说虽然有pipe, msgget, message queue, unix domain socket,  还是建议进程间只用tcp socket来通信。

pipe的缺点是阻塞。msgget缺点是不能select。mq_send可以,但是双向通信要开两个mq。unix domain不能跨网络。tcp socket优点很多,就是处理分包比较麻烦些,不过可以抽象出来。根据我的项目需要,自己设计的数据封包格式为:

MSG -- 3 Bytes
cmd --  1 Byte
ulen -- 4 Bytes
clen -- 4 Bytes
url          --- ulen Bytes
content  -- clen Bytes

陈硕在最后加了adler32 checksum, 我为了简便没有;为了便于找到包的开始位置,我参考了mpeg pes的sync word概念,引入"MSG"作为magic number标识。

附代码:

/*IPC using tcp socket 

test:
 gcc -g -Wall stream_buffer.c -DTEST_MSG_BUFFER
 ./a.out
 ./a.out 127.0.0.1

*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdint.h>

#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <poll.h>

#ifndef AV_RB32
#define AV_RB32(x)                                    (((uint32_t)((const uint8_t*)(x))[0] << 24) |                   (((const uint8_t*)(x))[1] << 16) |                   (((const uint8_t*)(x))[2] <<  8) |                    ((const uint8_t*)(x))[3])
#endif
#ifndef AV_WB32
#define AV_WB32(p, darg) do {                        unsigned d = (darg);                            ((uint8_t*)(p))[3] = (d);                       ((uint8_t*)(p))[2] = (d)>>8;                    ((uint8_t*)(p))[1] = (d)>>16;                   ((uint8_t*)(p))[0] = (d)>>24;               } while(0)
#endif

typedef struct{
	int cmd;
	int ulen, clen;
	uint8_t *url, *content;
}ctrl_msg_t;

typedef struct{
	int rpos, wpos;
	int msize;
	uint8_t *buf;
}StreamBuffer;

static int ctrl_fd, serv_fd;
static StreamBuffer *sb_in, *sb_out;

StreamBuffer* sb_init(int msize)
{
	StreamBuffer *s = NULL;

	if(msize < 1){
		return NULL;
	}

	s = malloc(sizeof(*s));
	if(!s){
		return NULL;
	}

	s->rpos = s->wpos = 0;
	s->msize = msize;
	s->buf = malloc(s->msize);
	if(!s->buf){
		free(s);
		return NULL;
	}

	return s;
}

int sb_destroy(StreamBuffer *s)
{
	if(s && s->buf){
		free(s->buf);
	}
	if(s){
		free(s);
	}
	return 0;
}

int sb_write(StreamBuffer *s, uint8_t *data, int len)
{/*return < 0 means fail.*/
	int size;
	if(!s || !data || len < 0){
		return -1;
	}

	if(s->msize - s->wpos >= len){
		memcpy(s->buf + s->wpos, data, len);
		s->wpos += len;
	}else if (s->msize - s->wpos + s->rpos >= len){
		size = s->wpos - s->rpos;
		memmove(s->buf, s->buf+s->rpos, size);
		s->rpos = 0;
		s->wpos = size;
		memcpy(s->buf+s->wpos, data, len);
		s->wpos += len;
	}else{
		printf("sb buf full\n");
		return -1;
	}

	return 0;
}

int sb_read(StreamBuffer *s, uint8_t *data, int len)
{/*read actual read bytes.*/
	int size;
	if(!s || !data || len < 0){
		return 0;
	}

	size = s->wpos - s->rpos;
	if(size > len){
		size = len;
	}
	memcpy(data, s->buf+s->rpos, size);
	s->rpos += size;

	return size;
}

static int ctl_msg_cb(ctrl_msg_t *msg)
{
	printf("%s: %d '%s' '%s'\n", (serv_fd ? "Server" : "Client"), msg->cmd, msg->url, msg->content);
	return 0;
}

int ctl_msg_open(int server_fd)
{
    struct sockaddr_in from_addr;
    socklen_t len;
    int fd;

    len = sizeof(from_addr);
	memset(&from_addr, 0, len);
    fd = accept(server_fd, (struct sockaddr *)&from_addr, &len);
    if (fd < 0) {
        printf("error setup during accept %s\n", strerror(errno));
        return -1;
    }
	printf("new conn %s:%u\n", inet_ntoa(from_addr.sin_addr), ntohs(from_addr.sin_port));

    if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) < 0){
        printf("set non-block failed\n");
    }
	ctrl_fd = fd;
	return fd;
}

int ctl_msg_recv(void)
{
	int len;
	uint8_t buf[1024];

	while( (len = recv(ctrl_fd, buf, sizeof(buf), 0)) > 0){
		sb_write(sb_in, buf, len);
	}

	return 0;
}

int ctl_msg_send(void)
{
	int len;
	uint8_t *ptr = NULL;
	StreamBuffer *sb = sb_out;

	if(!sb){
		return -1;
	}

	len = sb->wpos - sb->rpos;
	if(len <= 0){
		return 1;
	}
	ptr = sb->buf + sb->rpos;

	len = send(ctrl_fd, ptr, len, 0);
	if(len > 0){
		sb->rpos += len;
	}

	return 0;
}

int ff_ctl_open(unsigned short port)
{
    int fd, tmp;
	struct sockaddr_in addr;

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror ("socket");
        return -1;
    }

    tmp = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(tmp)))
        printf("setsockopt SO_REUSEADDR failed\n");

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0){
        printf("cant bind\n");
        close(fd);
        return -1;
    }

    if(listen(fd, 5) < 0){
        perror ("listen");
        close(fd);
        return -1;
    }

    if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) < 0){
        printf("set non block failed\n");
    }

	serv_fd = fd;
	sb_in = sb_init(8096);
	sb_out = sb_init(8096);
    return fd;
}

int ff_ctl_open2(char *ip, unsigned short port)
{
    int fd;
	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(addr));

	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);

	if(inet_aton(ip, &addr.sin_addr) == 0){
        printf("bad ip '%s'\n", ip);
        return -1;
    }

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror ("socket");
        return -1;
    }

    if(connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0){
       printf("cant connect to ip '%s'\n", ip);
       return -1;
    } 

    if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) < 0){
        printf("set non block failed\n");
    }

	ctrl_fd = fd;
	sb_in = sb_init(8096);
	sb_out = sb_init(8096);

    return fd;
}

int ff_ctl_recv(void)
{/*unpack sb_in and dispatch messages in it.*/
	StreamBuffer *sb = sb_in;
	uint8_t *ptr, *end;
	static ctrl_msg_t msg = {0};

	if(!sb){
		return -1;
	}

	ptr = sb->buf + sb->rpos;
	end = sb->buf + sb->wpos;
	while(ptr + 4 < end){
		if(msg.url)goto content;
		if(msg.clen)goto url;
		if(msg.ulen)goto clen;
		if(msg.cmd)goto ulen;

		if(memcmp(ptr, "MSG", 3)){
			ptr += 3;
			continue;
		}
		ptr += 3;

		msg.cmd = ptr[0];
		ptr += 1;
		if(ptr >= end)break;

		ulen:
		msg.ulen = AV_RB32(ptr);
		ptr += 4;
		if(ptr >= end)break;

		clen:
		msg.clen = AV_RB32(ptr);
		ptr += 4;
		if(ptr >= end)break;

		url:
		if(msg.ulen > 0 && ptr + msg.ulen <= end){
			msg.url = malloc(msg.ulen+1);
			memcpy(msg.url, ptr, msg.ulen);
			msg.url[msg.ulen] = 0;
			ptr += msg.ulen;
		}
		if(ptr >= end)break;

		content:
		if(msg.clen > 0 && ptr + msg.clen <= end){
			msg.content = malloc(msg.clen+1);
			memcpy(msg.content, ptr, msg.clen);
			msg.content[msg.clen] = 0;
			ptr += msg.clen;

			ctl_msg_cb(&msg);
			free(msg.url);
			free(msg.content);
			memset(&msg, 0, sizeof(msg));
		}
	}

	sb->rpos = ptr - sb->buf;
	return 0;
}

int ff_ctl_send(int cmd, uint8_t *url, uint8_t *content)
{
	int ulen, clen;
	uint8_t *ptr, buf[64];
	StreamBuffer *sb = sb_out;

	if(!sb || !(0 <= cmd && cmd <= 9)){
		return -1;
	}

	ulen = strlen((char*)url);
	clen = strlen((char*)content);

	ptr = buf;
	ptr += sprintf((char*)ptr, "MSG");
	*ptr = (uint8_t)cmd;
	ptr += 1;
	AV_WB32(ptr, ulen);
	ptr += 4;
	AV_WB32(ptr, clen);
	ptr += 4;

	sb_write(sb, buf, ptr-buf);
	sb_write(sb, url, ulen);
	sb_write(sb, content, clen);
	return 0;
}

#if defined(TEST_MSG_BUFFER)
static int strip(char *str)
{
	int n = strlen(str);
	while(n > 0 && (str[n-1] == '\r' || str[n-1] == '\n')){
		str[--n] = 0;
	}
	return n;
}

int main(int ac, char **av)
{
	int ret, fd, is_server = 1;
	int peer_fd = 0;

	if(ac != 2){
		printf("start server\n");
	}else{
		is_server = 0;
	}

	if(is_server){
		fd = ff_ctl_open(5678);
	}else{
		fd = ff_ctl_open2(av[1], 5678);
	}

	struct pollfd *entry, table[8] = {{0}};

	for(;;) {
		entry = table;
		if(fd){
			entry->fd = fd;
	        entry->events = POLLIN|POLLOUT;
	        entry++;
		}

		if(peer_fd){
			entry = table;
			entry->fd = peer_fd;
        	entry->events = POLLIN|POLLOUT;
        	entry++;
		}

		do {
            ret = poll(table, entry - table, 1000);
        } while (ret < 0);

		for(entry = table; entry->fd; ++entry){
			if(entry->revents & POLLIN){
				if(is_server && entry->fd == fd && peer_fd <= 0){
					peer_fd = ctl_msg_open(fd);
				}else{
					ctl_msg_recv();
					ff_ctl_recv();
				}
			}else if(entry->revents & POLLOUT){
				char line[128] = "";

				printf("> ");
	     		fflush(stdout);
				fgets(line, sizeof(line), stdin);
				strip(line);

				ff_ctl_send(2, (uint8_t*)"cmd", (uint8_t*)line);

				ctl_msg_send();
			}
		}

	}
}
#endif

服务端和客户端都会阻塞在fgets,敲了回车后才显示对方的消息。可以修改为开线程输入。不过我最终的应用场景不是处理这个,我自测试的目的达到了。

时间: 2024-10-24 12:59:54

使用tcp socket进行跨进程/网络通信的相关文章

Android IPC机制(五)用Socket实现跨进程聊天程序

相关文章: Android IPC机制(一)开启多进程 Android IPC机制(二)用Messenger进行进程间通信 Android IPC机制(三)在Android Studio中使用AIDL实现跨进程方法调用 Android IPC机制(四)用ContentProvider进行进程间通信 1.Socket简介 Socket也称作"套接字",是在应用层和传输层之间的一个抽象层,它把TCP/IP层复杂的操作抽象为几个简单的接口供应用层调用已实现进程在网络中通信.它分为流式套接字和

linux 单机跨进程通信

一般来说通过网络通信(比如tcp,udp)或者共享内存的方式肯定可以实现跨进程通信,但现在这里要说的是比较偏但实用的几个方法:利用unix域通信(普通网络连接),利用unix域通信(socketpair通信),以及pipe方式. 一. 利用unix域通信(普通网络连接) socket API原本是为网络通讯设计的,但后来在socket的框架上发展出一种IPC机制,就是UNIX Domain Socket.虽然网络socket也可用于同一台主机的进程间通讯(通过loopback地址127.0.0.

Go语言TCP Socket编程

Golang的主要 设计目标之一就是面向大规模后端服务程序,网络通信这块是服务端 程序必不可少也是至关重要的一部分.在日常应用中,我们也可以看到Go中的net以及其subdirectories下的包均是"高频+刚需",而TCP socket则是网络编程的主流,即便您没有直接使用到net中有关TCP Socket方面的接口,但net/http总是用到了吧,http底层依旧是用tcp socket实现的. 网络编程方面,我们最常用的就是tcp socket编程了,在posix标准出来后,s

以中间件,路由,跨进程事件的姿势使用WebSocket

通过参考koa中间件,socket.io远程事件调用,以一种新的姿势来使用WebSocket. 浏览器端 浏览器端使用WebSocket很简单 // Create WebSocket connection. const socket = new WebSocket('ws://localhost:8080'); // Connection opened socket.addEventListener('open', function (event) { socket.send('Hello Se

Nginx 中 fastcgi_pass 监听端口 unix socket和tcp socket差

Nginx 中 fastcgi_pass 监听端口 unix socket和tcp socket差别 Nginx连接fastcgi的方式有2种:unix domain socket和TCP,Unix domain socket 或者 IPC socket是一种终端,可以使同一台操作系统上的两个或多个进程进行数据通信.与管道相比,Unix domain sockets 既可以使用字节流和数据队列,而管道通信则只能通过字节流.Unix domain sockets的接口和Internet socke

Python socket进阶 多线程/进程

xSocket语法及相关 Socket Families(地址簇) socket.AF_UNIX unix本机进程间通信  socket.AF_INET IPV4 socket.AF_INET6  IPV6 上面的这些内容代表地址簇,创建socket必须指定,默认为IPV4 Socket Types socket.SOCK_STREAM  #for tcp socket.SOCK_DGRAM   #for udp  socket.SOCK_RAW     #原始套接字,普通的套接字无法处理ICM

Robotium通过广播与服务+shell命令实现跨进程自动化测试之广播与服务

还记得当初刚学robotium的时候,有人提起过这个方法,当时觉得很牛逼,暂时不想接触(其实就是对比较深入,难以快速理解的知识的畏惧).现在到了不得不去了解的时候,也只能硬着头皮上了... 先来讲一下我现在对这个方法的理解:整个Android程序,我们可以看成是一个个的实验室(进程)组成的,我们使用robotium就转属于其中一个实验室,实验室的规定非常的严格,不允许人员进入其他的实验室.但是人总是要有些消息或者有些活动需要别的实验室配合(跨进程)..这个时候我们只有一种办法,就是通过中转人员(

Wayland中的跨进程过程调用浅析

原文地址:http://blog.csdn.net/jinzhuojun/article/details/40264449 Wayland协议主要提供了Client端应用与Server端Compositor的通信机制,Weston是Server端Compositor的一个参考实现.Wayland协议中最基础的是提供了一种面向对象的跨进程过程调用的功能,在作用上类似于Android中的Binder.与Binder不同的是,在Wayland中Client和Server底层通过domain socke

python基于mysql实现的简单队列以及跨进程锁

在我们做多进程应用开发的过程中,难免会遇到多个进程访问同一个资源(临界资源)的状况,必须通过加一个全局性的锁,来实现资源的同步访问(同一时间只能有一个进程访问资源). 举个例子: 假设我们用mysql来实现一个任务队列,实现的过程如下: 1. 在Mysql中创建Job表,用于储存队列任务,如下: create table jobs( id auto_increment not null primary key, message text not null, job_status not null