Mina、Netty、Twisted一起学习(三):TCP前缀固定大小的消息(Header)

以前的博文于,有介绍切割消息换行的方法。

但是有一个小问题,这样的方法,设消息中本身就包括换行符,那将会将这条消息切割成两条。结果就不正确了。

本文介绍第二种消息切割方式,即上一篇博文中讲的第2条:use a fixed length header that indicates the length of the body。用一个固定字节数的Header前缀来指定Body的字节数,以此来切割消息。

上面图中Header固定为4字节,Header中保存的是一个4字节(32位)的整数,比如12即为0x0000000C。这个整数用来指定Body的长度(字节数)。当读完这么多字节的Body之后,又是下一条消息的Header。

以下分别用MINA、Netty、Twisted来实现对这样的消息的切合和解码。

MINA:

MINA提供了PrefixedStringCodecFactory来对这样的类型的消息进行编码解码。PrefixedStringCodecFactory默认Header的大小是4字节。当然也能够指定成1或2。

public class TcpServer {

	public static void main(String[] args) throws IOException {
		IoAcceptor acceptor = new NioSocketAcceptor();

		// 4字节的Header指定Body的字节数。对这样的消息的处理
		acceptor.getFilterChain().addLast("codec",
				new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));

		acceptor.setHandler(new TcpServerHandle());
		acceptor.bind(new InetSocketAddress(8080));
	}

}

class TcpServerHandle extends IoHandlerAdapter {

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		cause.printStackTrace();
	}

	// 接收到新的数据
	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {

		String msg = (String) message;
		System.out.println("messageReceived:" + msg);

	}

	@Override
	public void sessionCreated(IoSession session) throws Exception {
		System.out.println("sessionCreated");
	}

	@Override
	public void sessionClosed(IoSession session) throws Exception {
		System.out.println("sessionClosed");
	}
}

Netty:

Netty使用LengthFieldBasedFrameDecoder来处理这样的消息。

以下代码中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包括5个參数,各自是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength为消息的最大长度。lengthFieldOffset为Header的位置。lengthFieldLength为Header的长度,lengthAdjustment为长度调整(默认Header中的值表示Body的长度。并不包括Header自己),initialBytesToStrip为去掉字节数(默认解码后返回Header+Body的所有内容。这里设为4表示去掉4字节的Header。仅仅留下Body)。

public class TcpServer {

	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel ch)
								throws Exception {
							ChannelPipeline pipeline = ch.pipeline();

							// LengthFieldBasedFrameDecoder按行切割消息,取出body
							pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));
							// 再按UTF-8编码转成字符串
							pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

							pipeline.addLast(new TcpServerHandler());
						}
					});
			ChannelFuture f = b.bind(8080).sync();
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}

}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

	// 接收到新的数据
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {

		String message = (String) msg;
		System.out.println("channelRead:" + message);
	}

	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		System.out.println("channelActive");
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) {
		System.out.println("channelInactive");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

Twisted:

在Twisted中须要继承Int32StringReceiver,不再继承Protocol。

Int32StringReceiver表示固定32位(4字节)的Header,另外还有Int16StringReceiver、Int8StringReceiver等。而须要实现的接受数据事件的方法不再是dataReceived,也不是lineReceived。而是stringReceived。

# -*- coding:utf-8 –*-

from twisted.protocols.basic import Int32StringReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor

class TcpServerHandle(Int32StringReceiver):

    # 新的连接建立
    def connectionMade(self):
        print ‘connectionMade‘

    # 连接断开
    def connectionLost(self, reason):
        print ‘connectionLost‘

    # 接收到新的数据
    def stringReceived(self, data):
        print ‘stringReceived:‘ + data

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

以下是Java编写的一个client測试程序:

public class TcpClient {

	public static void main(String[] args) throws IOException {

		Socket socket = null;
		DataOutputStream out = null;

		try {

			socket = new Socket("localhost", 8080);
			out = new DataOutputStream(socket.getOutputStream());

			// 请求server
			String data1 = "牛顿";
			byte[] outputBytes1 = data1.getBytes("UTF-8");
			out.writeInt(outputBytes1.length); // write header
			out.write(outputBytes1); // write body

			String data2 = "爱因斯坦";
			byte[] outputBytes2 = data2.getBytes("UTF-8");
			out.writeInt(outputBytes2.length); // write header
			out.write(outputBytes2); // write body

			out.flush();

		} finally {
			// 关闭连接
			out.close();
			socket.close();
		}

	}

}

MINAserver输出结果:

sessionCreated
messageReceived:牛顿
messageReceived:爱因斯坦
sessionClosed

Nettyserver输出结果:

channelActive
channelRead:牛顿
channelRead:爱因斯坦
channelInactive

Twistedserver输出结果:

connectionMade
stringReceived:牛顿
stringReceived:爱因斯坦
connectionLost

作者:叉叉哥   转载请注明出处:http://blog.csdn.net/xiao__gui/article/details/38752105

版权声明:本文博主原创文章,博客,未经同意不得转载。

时间: 2024-08-06 20:02:28

Mina、Netty、Twisted一起学习(三):TCP前缀固定大小的消息(Header)的相关文章

Mina、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

在上一篇博文中,有介绍到用换行符分割消息的方法.但是这种方法有个小问题,如果消息中本身就包含换行符,那将会将这条消息分割成两条,结果就不对了. 本文介绍另外一种消息分割方式,即上一篇博文中讲的第2条:use a fixed length header that indicates the length of the body,用一个固定字节数的Header前缀来指定Body的字节数,以此来分割消息. 上面图中Header固定为4字节,Header中保存的是一个4字节(32位)的整数,例如12即为

Jetty学习三:配置概览-需要配置什么

上一节讲述了怎么配置Jetty,这节将告诉你使用Jetty你需要配置些什么. 配置Server Server实例是Jetty服务端的中心协调对象,它为所有其他Jetty服务端组件提供服务和生命周期管理.在标准Jetty发布中,核心的服务端配置是在etc/jetty.xml文件中,你也能在其中包含其他服务端配置,可以包括: 1)ThreadPool Server实例提供了一个线程池,你可以在etc/jetty.xml中配置最大线程数和最小线程数. 2)Handlers Jetty服务端只能有一个H

Beaglebone Back学习三(开发环境搭建)

开发环境搭建 1 Ubuntu环境搭建 2 Window环境搭建 3 开发板环境搭建 1 Ubuntu环境搭建 (1)安装必要的网络工具 samba nfs tftp vmware-tools samba nfs apt-get install nfs-kernel-server vim  /etc/exports  (/path/to/nfs  *(rw,sync,no_root_squash) ) /etc/init.d/nfs-kernel-server restart (2)配置通信网络

【挨踢人物传】马永亮:感悟学习三境界 引领马哥教育的崛起(第19期)

[编者有话]        本期的嘉宾马永亮,一次误以为是"擅长"的选择,开始结缘计算机,然而当真正接触后才发现犹如"井底之蛙",此前的擅长根本不值一提,从天堂到地狱的落差,没有挫败他的信心和追求,反而激起了他更加强烈的求知欲望,在IT的道路上不断的成长感悟-- [本期人物档案] 个人信息: 51CTO账号:马哥教育 姓名:马永亮 性别:男 所在地:河南郑州 教育信息:研究生 关键词:马哥教育创办人 Linux系统运维专家 51CTO专家博主 51CTO学院签约讲师

MongoDB入门学习(三):MongoDB的增删查改

对于我们这种菜鸟来说,最重要的不是数据库的管理,也不是数据库的性能,更不是数据库的扩展,而是怎么用好这款数据库,也就是一个数据库提供的最核心的功能,增删查改. 因为MongoDB存储数据都是以文档的模式,所以在操作它的数据时,也是以文档为单位的.那么我们实现增删查改也是以文档为基础,不知道文档是什么的同学可以看看上篇介绍的基本概念. 1.插入文档 向MongoDB集合中插入文档的基本方法是insert: 单个插入 > document = {key : value} > db.collecti

Netty系列(四)TCP拆包和粘包

Netty系列(四)TCP拆包和粘包 一.拆包和粘包问题 (1) 一个小的Socket Buffer问题 在基于流的传输里比如 TCP/IP,接收到的数据会先被存储到一个 socket 接收缓冲里.不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列.即使你发送了 2 个独立的数据包,操作系统也不会作为 2 个消息处理而仅仅是作为一连串的字节而言.因此这是不能保证你远程写入的数据就会准确地读取.举个例子,让我们假设操作系统的 TCP/TP 协议栈已经接收了 3 个数据包: 由于基于流传输

算法学习三阶段

?? 第一阶段:练经典经常使用算法,以下的每一个算法给我打上十到二十遍,同一时候自己精简代码, 由于太经常使用,所以要练到写时不用想,10-15分钟内打完,甚至关掉显示器都能够把程序打 出来. 1.最短路(Floyd.Dijstra,BellmanFord) 2.最小生成树(先写个prim,kruscal 要用并查集,不好写) 3.大数(高精度)加减乘除 4.二分查找. (代码可在五行以内) 5.叉乘.判线段相交.然后写个凸包. 6.BFS.DFS,同一时候熟练hash 表(要熟,要灵活,代码要

ZigBee学习三 UART通信

ZigBee学习三 UART通信 本实验只对coordinator.c文件进行改动就可以实现串口的收发. 修改coordinator.c文件 byte GenericApp_TransID; // This is the unique message ID (counter) afAddrType_t GenericApp_DstAddr; unsigned char uartbuf[128];/**************************************************

Spark学习三:Spark Schedule以及idea的安装和导入源码

Spark学习三:Spark Schedule以及idea的安装和导入源码 标签(空格分隔): Spark Spark学习三Spark Schedule以及idea的安装和导入源码 一RDD操作过程中的数据位置 二Spark Schedule 三Idea导入spark源码 一,RDD操作过程中的数据位置 [hadoop001@xingyunfei001 spark-1.3.0-bin-2.5.0]$ bin/spark-shell --master local[2] val rdd = sc.t