Mina、Netty、Twisted一起学(二):TCP消息边界问题及按行分割消息

在TCP连接开始到结束连接,之间可能会多次传输数据,也就是服务器和客户端之间可能会在连接过程中互相传输多条消息。理想状况是一方每发送一条消息,另一方就立即接收到一条,也就是一次write对应一次read。但是,现实不总是按照剧本来走。

MINA官方文档节选:

TCP guarantess delivery of all packets in the correct order. But there is no guarantee that one write operation on the sender-side will result in one read event on the receiving side. One call of IoSession.write(Object message) by the sender can result in multiple messageReceived(IoSession session, Object message) events on the receiver; and multiple calls of IoSession.write(Object message) can lead to a single messageReceived event.

Netty官方文档节选:

In a stream-based transport such as TCP/IP, received data is stored into a socket receive buffer. Unfortunately, the buffer of a stream-based transport is not a queue of packets but a queue of bytes. It means, even if you sent two messages as two independent packets, an operating system will not treat them as two messages but as just a bunch of bytes. Therefore, there is no guarantee that what you read is exactly what your remote peer wrote.

上面两段话表达的意思相同:TCP是基于字节流的协议,它只能保证一方发送和另一方接收到的数据的字节顺序一致,但是,并不能保证一方每发送一条消息,另一方就能完整的接收到一条信息。有可能发送了两条对方将其合并成一条,也有可能发送了一条对方将其拆分成两条。所以在上一篇博文中的Demo,可以说是一个错误的示范。不过服务器和客户端在同一台机器上或者在局域网等网速很好的情况下,这种问题还是很难测试出来。

举个简单了例子(这个例子来源于Netty官方文档):

消息发送方发送了三个字符串:

但是接收方收到的可能是这样的:

那么问题就很严重了,接收方没法分开这三条信息了,也就没法解析了。

对此,MINA的官方文档提供了以下几种解决方案:

1、use fixed length messages

使用固定长度的消息。比如每个长度4字节,那么接收的时候按每条4字节拆分就可以了。

2、use a fixed length header that indicates the length of the body

使用固定长度的Header,Header中指定Body的长度(字节数),将信息的内容放在Body中。例如Header中指定的Body长度是100字节,那么Header之后的100字节就是Body,也就是信息的内容,100字节的Body后面就是下一条信息的Header了。

3、using a delimiter; for example many text-based protocols append a newline (or CR LF pair) after every message

使用分隔符。例如许多文本内容的协议会在每条消息后面加上换行符(CR LF,即"\r\n"),也就是一行一条消息。当然也可以用其他特殊符号作为分隔符,例如逗号、分号等等。

当然除了上面说到的3种方案,还有其他方案。有的协议也可能会同时用到上面多种方案。例如HTTP协议,Header部分用的是CR LF换行来区分每一条Header,而Header中用Content-Length来指定Body字节数。

下面,分别用MINA、Netty、Twisted自带的相关API实现按换行符CR LF来分割消息。

MINA:

MINA可以使用ProtocolCodecFilter来对发送和接收的二进制数据进行加工,如何加工取决于ProtocolCodecFactory或ProtocolEncoder、ProtocolDecoder,加工后在IoHandler中messageReceived事件函数获取的message就不再是IoBuffer了,而是你想要的其他类型,可以是字符串,Java对象。这里可以使用TextLineCodecFactory(ProtocolCodecFactory的一个实现类)实现CR LF分割消息。

public class TcpServer {  

    public static void main(String[] args) throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor();  

        // 添加一个Filter,用于接收、发送的内容按照"\r\n"分割
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "\r\n", "\r\n")));  

        acceptor.setHandler(new TcpServerHandle());
        acceptor.bind(new InetSocketAddress(8080));
    }  

}  

class TcpServerHandle extends IoHandlerAdapter {  

    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        cause.printStackTrace();
    }  

    // 接收到新的数据
    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {  

        // 接收客户端的数据,这里接收到的不再是IoBuffer类型,而是字符串
        String line = (String) message;
        System.out.println("messageReceived:" + line);  

    }  

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("sessionCreated");
    }  

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println("sessionClosed");
    }
}  

Netty:

Netty设计上和MINA类似,需要在ChannelPipeline加上一些ChannelHandler用来对原始数据进行处理。这里用LineBasedFrameDecoder将接收到的数据按行分割,StringDecoder再将数据由字节码转成字符串。同样,接收到的数据进过加工后,在channelRead事件函数中,msg参数不再是ByteBuf而是String。

public class TcpServer {  

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();  

                            // LineBasedFrameDecoder按行分割消息
                            pipeline.addLast(new LineBasedFrameDecoder(80));
                            // 再按UTF-8编码转成字符串
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));  

                            pipeline.addLast(new TcpServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }  

}  

class TcpServerHandler extends ChannelInboundHandlerAdapter {  

    // 接收到新的数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {  

        // msg经过StringDecoder后类型不再是ByteBuf而是String
        String line = (String) msg;
        System.out.println("channelRead:" + line);
    }  

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channelActive");
    }  

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("channelInactive");
    }  

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}  

Twisted:

Twisted的设计和上面两者的设计不太一样,所以实现消息分割也不太一样。处理事件的类TcpServerHandle不再继承Protocol,而是继承Protocol的子类LineOnlyReceiver。接收到新数据的事件方法也不再是dataReceived,而是LineOnlyReceiver提供的lineReceived。看Twisted源码的话可以发现LineOnlyReceiver的内部实际上自己实现了dataReceived,然后将其按行分割,有新的一行数据就调用lineReceived。

# -*- coding:utf-8 –*-  

from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor  

class TcpServerHandle(LineOnlyReceiver):  

    # 新的连接建立
    def connectionMade(self):
        print ‘connectionMade‘  

    # 连接断开
    def connectionLost(self, reason):
        print ‘connectionLost‘  

    # 接收到新的一行数据
    def lineReceived(self, data):
        print ‘lineReceived:‘ + data  

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()  

下面用一个Java客户端对三个服务器进行测试:

public class TcpClient {  

    public static void main(String[] args) throws IOException {  

        Socket socket = null;
        OutputStream out = null;  

        try {  

            socket = new Socket("localhost", 8080);
            out = socket.getOutputStream();  

            // 请求服务器
            String lines = "床前明月光\r\n疑是地上霜\r\n举头望明月\r\n低头思故乡\r\n";
            byte[] outputBytes = lines.getBytes("UTF-8");
            out.write(outputBytes);
            out.flush();  

        } finally {
            // 关闭连接
            out.close();
            socket.close();
        }
    }
}  

MINA服务器输出结果:

sessionCreated
messageReceived:床前明月光
messageReceived:疑是地上霜
messageReceived:举头望明月
messageReceived:低头思故乡
sessionClosed

Netty服务器输出结果:

channelActive
channelRead:床前明月光
channelRead:疑是地上霜
channelRead:举头望明月
channelRead:低头思故乡
channelInactive

Twisted服务器输出结果:

connectionMade
lineReceived:床前明月光
lineReceived:疑是地上霜
lineReceived:举头望明月
lineReceived:低头思故乡
connectionLost

当然,测试的时候也可以将发送的数据模拟成不按规则分割的情况,下面用一个更变态的客户端来测试:

public class TcpClient {  

    public static void main(String[] args) throws IOException, InterruptedException {  

        Socket socket = null;
        OutputStream out = null;  

        try{  

            socket = new Socket("localhost", 8080);
            out = socket.getOutputStream();  

            String lines = "床前";
            byte[] outputBytes = lines.getBytes("UTF-8");
            out.write(outputBytes);
            out.flush();  

            Thread.sleep(1000);  

            lines = "明月";
            outputBytes = lines.getBytes("UTF-8");
            out.write(outputBytes);
            out.flush();  

            Thread.sleep(1000);  

            lines = "光\r\n疑是地上霜\r\n举头";
            outputBytes = lines.getBytes("UTF-8");
            out.write(outputBytes);
            out.flush();  

            Thread.sleep(1000);  

            lines = "望明月\r\n低头思故乡\r\n";
            outputBytes = lines.getBytes("UTF-8");
            out.write(outputBytes);
            out.flush();  

        } finally {
            // 关闭连接
            out.close();
            socket.close();
        }
    }
}  

再次分别测试上面三个服务器,结果和上面的输出结果一样,没有任何问题。

时间: 2024-10-13 18:25:15

Mina、Netty、Twisted一起学(二):TCP消息边界问题及按行分割消息的相关文章

Mina、Netty、Twisted一起学(五):整合protobuf

protobuf是谷歌的Protocol Buffers的简称,用于结构化数据和字节码之间互相转换(序列化.反序列化),一般应用于网络传输,可支持多种编程语言. protobuf怎样使用这里不再介绍,本文主要介绍在MINA.Netty.Twisted中怎样使用protobuf,不了解protobuf的同学能够去參考我的还有一篇博文. 在前面的一篇博文中.有介绍到一种用一个固定为4字节的前缀Header来指定Body的字节数的一种消息切割方式.在这里相同要使用到. 仅仅是当中Body的内容不再是字

Mina、Netty、Twisted一起学(十):线程模型

要想开发一个高性能的TCPserver,熟悉所使用框架的线程模型非常重要.MINA.Netty.Twisted本身都是高性能的网络框架,假设再搭配上高效率的代码.才干实现一个高大上的server. 可是假设不了解它们的线程模型.就非常难写出高性能的代码.框架本身效率再高.程序写的太差,那么server总体的性能也不会太高.就像一个电脑,CPU再好.内存小硬盘慢散热差,总体的性能也不会太高. 玩过Android开发的同学会知道,在Android应用中有一个非常重要线程:UI线程(即主线程). UI

Mina、Netty、Twisted一起学(七):公布/订阅(Publish/Subscribe)

消息传递有非常多种方式.请求/响应(Request/Reply)是最经常使用的.在前面的博文的样例中.非常多都是採用请求/响应的方式.当server接收到消息后,会马上write回写一条消息到client. HTTP协议也是基于请求/响应的方式. 可是请求/响应并不能满足全部的消息传递的需求,有些需求可能须要服务端主动推送消息到client,而不是被动的等待请求后再给出响应. 公布/订阅(Publish/Subscribe)是一种server主动发送消息到client的消息传递方式.订阅者Sub

Mina、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

在上一篇博文中,有介绍到用换行符分割消息的方法.但是这种方法有个小问题,如果消息中本身就包含换行符,那将会将这条消息分割成两条,结果就不对了. 本文介绍另外一种消息分割方式,即上一篇博文中讲的第2条:use a fixed length header that indicates the length of the body,用一个固定字节数的Header前缀来指定Body的字节数,以此来分割消息. 上面图中Header固定为4字节,Header中保存的是一个4字节(32位)的整数,例如12即为

Mina、Netty、Twisted一起学:实现简单的TCP服务器

MINA.Netty.Twisted为什么放在一起学习?首先,不妨先看一下他们官方网站对其的介绍: MINA: Apache MINA is a network application framework which helps users develop high performance and high scalability network applications easily. It provides an abstract event-driven asynchronous API

Mina、Netty、Twisted一起学(六):session

开发过Web应用的同学应该都会使用session.由于HTTP协议本身是无状态的,所以一个客户端多次访问这个web应用的多个页面,服务器无法判断多次访问的客户端是否是同一个客户端.有了session就可以设置一些和客户端相关的属性,用于保持这种连接状态.例如用户登录系统后,设置session标记这个客户端已登录,那么访问别的页面时就不用再次登录了. 不过本文的内容不是Web应用的session,而是TCP连接的session,实际上二者还是有很大区别的.Web应用的session实现方式并不是基

一起学Netty(七)之 TCP粘包拆包基本解决方案

上个小节我们浅析了在Netty的使用的时候TCP的粘包和拆包的现象,Netty对此问题提供了相对比较丰富的解决方案 Netty提供了几个常用的解码器,帮助我们解决这些问题,其实上述的粘包和拆包的问题,归根结底的解决方案就是发送端给远程端一个标记,告诉远程端,每个信息的结束标志是什么,这样,远程端获取到数据后,根据跟发送端约束的标志,将接收的信息分切或者合并成我们需要的信息,这样我们就可以获取到正确的信息了 例如,我们刚才的例子中,我们可以在发送的信息中,加一个结束标志,例如两个远程端规定以行来切

Netty中文用户手册(二)

第一章. 开始 这一章节将围绕Netty的核心结构展开,同时通过一些简单的例子可以让你更快的了解Netty的使用.当你读完本章,你将有能力使用Netty完成客户端和服务端的开发. 如果你更喜欢自上而下式的学习方式,你可以首先完成 第二章:架构总览 的学习,然后再回到这里. 1.1. 开始之前 运行本章示例程序的两个最低要求是:最新版本的Netty程序以及JDK 1.5或更高版本.最新版本的Netty程序可在项目下载页 下载.下载正确版本的JDK,请到你偏好的JDK站点下载. 这就已经足够了吗?实

Netty系列(四)TCP拆包和粘包

Netty系列(四)TCP拆包和粘包 一.拆包和粘包问题 (1) 一个小的Socket Buffer问题 在基于流的传输里比如 TCP/IP,接收到的数据会先被存储到一个 socket 接收缓冲里.不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列.即使你发送了 2 个独立的数据包,操作系统也不会作为 2 个消息处理而仅仅是作为一连串的字节而言.因此这是不能保证你远程写入的数据就会准确地读取.举个例子,让我们假设操作系统的 TCP/TP 协议栈已经接收了 3 个数据包: 由于基于流传输