3、netty粘包、拆包(二)

上篇博客留了个尾巴“而且LineBasedFrameDecoder据说还有一种不要求携带结束符的解码方式”,今天就从源码来看看是怎么回事。

一、基本原理

如果没有netty,用户自行拆包,原理是:

netty的原理也是如此。

LineBasedFrameDecoder的基础结构如下图:

二、ByteToMessageDecoder 

ByteToMessageDecoder这个类就是解码的基础类,这个类内部有个累加器,读数据过程中不断累加不断判断是否是一个完整的数据包。该类定义了两个累加器,默认使用MEGER_CUMULATOR

1 public static final Cumulator MERGE_CUMULATOR(){...}
2 private Cumulator cumulator = MERGE_CUMULATOR;

还定义了一个ByteBuf 用来保存读取到的数据

1 ByteBuf cumulation;

接下来来看一下 MEGER_CUMULATOR是怎么将数据存入 cumulation中的

 1  public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
 2         @Override
 3         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
 4             ByteBuf buffer;
 5             if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
 6                     || cumulation.refCnt() > 1) {//容量不足,扩容
 7                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
 8             } else {
 9                 buffer = cumulation;
10             }
11             buffer.writeBytes(in);
12             in.release();
13             return buffer;
14         }
15     };

netty中ByteBuf的读写指针对于累加的实现相当简单,只需要调用 writeBytes(in)就可以了。在累加之前先要判断buffer的容量,如果不足就扩容。

扩容代码:

1  static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
2         ByteBuf oldCumulation = cumulation;
3         cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
4         cumulation.writeBytes(oldCumulation);
5         oldCumulation.release();
6         return cumulation;
7     }

ByteToMessageDecoder 继承自 ChannelInboundHandlerAdapter 那么每次从TCP缓存区中读取数据是都会触发 channelRead()。【AbstractNioByteChannel.read()】

 1 do {
 2     byteBuf = allocHandle.allocate(allocator);
 3     allocHandle.lastBytesRead(doReadBytes(byteBuf));
 4     if (allocHandle.lastBytesRead() <= 0) {
 5         // nothing was read. release the buffer.
 6         byteBuf.release();
 7         byteBuf = null;
 8         close = allocHandle.lastBytesRead() < 0;
 9         break;
10     }
11     allocHandle.incMessagesRead(1);
12     readPending = false;
13     pipeline.fireChannelRead(byteBuf);//触发点
14     byteBuf = null;
15 } while (allocHandle.continueReading());

channelRead的实现如下:

 1     @Override
 2     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 3         if (msg instanceof ByteBuf) {
 4             CodecOutputList out = CodecOutputList.newInstance();
 5             try {
 6                 ByteBuf data = (ByteBuf) msg;
 7                 first = cumulation == null;
 8                 if (first) {
 9                     cumulation = data;
10                 } else {
11                     cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);//累加数据
12                 }
13                 callDecode(ctx, cumulation, out);//拆包
14             } catch (DecoderException e) {
15                 throw e;
16             } catch (Throwable t) {
17                 throw new DecoderException(t);
18             } finally {
19                 if (cumulation != null && !cumulation.isReadable()) {//ByteBuf中没有可以读取的数据,释放ByteBuf
20                     numReads = 0;
21                     cumulation.release();
22                     cumulation = null;
23                 } else if (++ numReads >= discardAfterReads) {//读取了数据
24                     numReads = 0;
25                     discardSomeReadBytes();//将已经读取的数据丢弃
26                 }
27                 int size = out.size();//读取了几个数据包
28                 decodeWasNull = !out.insertSinceRecycled();//是否拆到一个数据包,insertSinceRecycled() = true表示out有增加或修改
29                 fireChannelRead(ctx, out, size);//将数据包传递给下一个handler
30                 out.recycle();
31             }
32         } else {
33             ctx.fireChannelRead(msg);
34         }
35     }

继续查看拆包[callDecode(ctx, cumulation, out)]的代码实现:

 1 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
 2     try {
 3         while (in.isReadable()) {
 4             int outSize = out.size();
 5             if (outSize > 0) {
 6                 fireChannelRead(ctx, out, outSize);//传递给下一个handler
 7                 out.clear();
 8                 if (ctx.isRemoved()) {
 9                     break;
10                 }
11                 outSize = 0;
12             }
13             int oldInputLength = in.readableBytes();
14             decode(ctx, in, out);//抽象方法,具体的子类实现
15             if (ctx.isRemoved()) {
16                 break;
17             }
18             if (outSize == out.size()) {
19                 if (oldInputLength == in.readableBytes()) {
20                     break;
21                 } else {
22                     continue;
23                 }
24             }
25             if (oldInputLength == in.readableBytes()) {
26                 throw new DecoderException(
27                         StringUtil.simpleClassName(getClass()) +
28                         ".decode() did not read anything but decoded a message.");
29             }
30             if (isSingleDecode()) {
31                 break;
32             }
33         }
34     } catch (DecoderException e) {
35         throw e;
36     } catch (Throwable cause) {
37         throw new DecoderException(cause);
38     }
39 }

while循环中遍历每一个字节,判断这个字节是不是结束符,如果是就放入out中,每拆出一个完整数据包就传递给下一个handler,这就解释了为什么channelRead()的28行要通过out的修改记录来判断是否成功拆包。

decode()方法由子类来实现。

三、LineBasedFrameDecoder

这里使用的是 LineBasedFrameDecoder,找到它的decode方法.

 1     @Override
 2     protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
 3         Object decoded = decode(ctx, in);
 4         if (decoded != null) {
 5             out.add(decoded);
 6         }
 7     }
 8
 9 protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
10     final int eol = findEndOfLine(buffer);//找到了分割符就返回索引,没有返回-1
11     if (!discarding) {//不丢弃数据
12         if (eol >= 0) {
13             final ByteBuf frame;
14             final int length = eol - buffer.readerIndex();//数据包的长度
15             final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1;//分割符的长度
16             if (length > maxLength) {
17                 buffer.readerIndex(eol + delimLength);
18                 fail(ctx, length);
19                 return null;
20             }
21             if (stripDelimiter) {//是否带分隔符
22                 frame = buffer.readRetainedSlice(length);//创建一个共享buffer对象的缓存区的一个子区域
23                 buffer.skipBytes(delimLength);
24             } else {
25                 frame = buffer.readRetainedSlice(length + delimLength);
26             }
27             return frame;//返回一个完整的数据包
28         } else {
29             final int length = buffer.readableBytes();
30             if (length > maxLength) {
31                 discardedBytes = length;
32                 buffer.readerIndex(buffer.writerIndex());
33                 discarding = true;//后面读取到的数据丢弃
34                 if (failFast) {
35                     fail(ctx, "over " + discardedBytes);
36                 }
37             }
38             return null;
39         }
40     } else {//丢弃掉这次读取的数据
41         if (eol >= 0) {
42             final int length = discardedBytes + eol - buffer.readerIndex();
43             final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1;
44             buffer.readerIndex(eol + delimLength);
45             discardedBytes = 0;
46             discarding = false;//丢完一个完整包后,下一个包就不再丢弃
47             if (!failFast) {
48                 fail(ctx, length);
49             }
50         } else {
51             discardedBytes += buffer.readableBytes();
52             buffer.readerIndex(buffer.writerIndex());
53         }
54         return null;
55     }
56 }

通过判断已有的数据中是否有"/n" 或者 "/r/n" 来作为一个完整数据包的依据,有则返回下标,没有返回-1

1     private static int findEndOfLine(final ByteBuf buffer) {
2         int i = buffer.forEachByte(ByteProcessor.FIND_LF);//FIND_LF=‘\n‘
3         if (i > 0 && buffer.getByte(i - 1) == ‘\r‘) {
4             i--;
5         }
6         return i;
7     }

接下来定义了一个变量 discarding 表示是否丢弃读取到的数据,默认为 false,只能在decode()方法中修改(简化后的代码)。

 1 private boolean discarding;
 2
 3 if (length > maxLength) {
 4     discarding = true;
 5 }
 6
 7 if (discarding == true) {
 8     if (eol >= 0) {
 9         discarding = false;
10     }
11 }

当已经读取的数据大于最大值时,表示这个包是异常包,这个包的所有数据都必须丢弃,从现在已经读取到的到下一个分割符。

接着往下看看到"stripDelimiter"表示数据包中是否应该取消分隔符,默认false。返回的数据包不包含分隔符。

1 private final boolean stripDelimiter;
2
3 public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
4     this.maxLength = maxLength;
5     this.failFast = failFast;
6     this.stripDelimiter = stripDelimiter;//whether the decoded frame should strip out the delimiter or not
7 }

顺着源码看到这里,LineBasedFrameDecoder的拆包逻辑已经很清晰了,与我们自己实现拆包大同小异。

回到最开始的那个尾巴,这个说法是错误的,LineBasedFrameDecoder已经默认有了分隔符且不能更改,能改变的只是返回的数据包有没有分隔符。

时间: 2024-10-07 23:51:55

3、netty粘包、拆包(二)的相关文章

Netty 粘包 &amp; 拆包 &amp; 编码 &amp; 解码 &amp; 序列化 介绍

目录: 粘包 & 拆包及解决方案 ByteToMessageDecoder 基于长度编解码器 基于分割符的编解码器 google 的 Protobuf 序列化介绍 其他的 前言 Netty 作为一个网络框架,对 TCP 连接中的问题都做了全面的考虑,比如粘包拆包导致的半包问题,如何编解码,如何实现私有协议,序列化等等.本文主要针对这些问题做一个简单介绍,目的是想对整个 Netty 的编解码框架做一个全盘的审视,以确保在后面的源码学习中不会一叶障目不见泰山. 1. 粘包 & 拆包及解决方案

【游戏开发】Netty TCP粘包/拆包问题的解决办法(二)

上一篇:[Netty4.X]Unity客户端与Netty服务器的网络通信(一) 一.什么是TCP粘包/拆包 如图所示,假如客户端分别发送两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4中情况: 第一种情况:Server端分别读取到D1和D2,没有产生粘包和拆包的情况. 第二种情况:Server端一次接收到两个数据包,D1和D2粘合在一起,被称为TCP粘包. 第三种情况:Server端分2次读取到2个数据包,第一次读取到D1包和D2包的部分内容D2_1,第二次

Netty学习之TCP粘包/拆包

一.TCP粘包/拆包问题说明,如图 二.未考虑TCP粘包导致功能异常案例 按照设计初衷,服务端应该收到100条查询时间指令的请求查询,客户端应该打印100次服务端的系统时间 1.服务端类 package com.phei.netty.s2016042302; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitial

Netty(三)TCP粘包拆包处理

tcp是一个“流”的协议,一个完整的包可能会被TCP拆分成多个包进行发送,也可能把小的封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题. 粘包.拆包问题说明 假设客户端分别发送数据包D1和D2给服务端,由于服务端一次性读取到的字节数是不确定的,所以可能存在以下4种情况. 1.服务端分2次读取到了两个独立的包,分别是D1,D2,没有粘包和拆包: 2.服务端一次性接收了两个包,D1和D2粘在一起了,被成为TCP粘包; 3.服务端分2次读取到了两个数据包,第一次读取到了完整的D1和D2包的部

netty权威指南--------第四章TCP粘包/拆包问题

第三章中的示例用于功能测试一般没有问题,但当压力上来或者发送大报文时,就会存在粘包/拆包问题. 这时就需要使用LineBasedFrameDecoder+StringDecoder client端请求改为连续的100次 package com.xiaobing.netty.fourth; import java.net.SocketAddress; import org.omg.CORBA.Request; import io.netty.buffer.ByteBuf; import io.ne

netty解决tcp粘包拆包问题

tcp粘包拆包解决方案 1.发送定长的消息 server端:                    EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup)  .channel(NioServerSocketChannel.cl

Netty5入门学习笔记003-TCP粘包/拆包问题的解决之道(下)

TCP网络通信时候会发生粘包/拆包的问题,上节使用定长解码器解码,本次使用Netty提供的特殊分隔符解码器 还是用上节中的代码例子,但是只需要修改一下发送的消息和配置一下解码器就可以了 客户端发送消息中添加分隔符做为指令的结束符,模拟多条指令粘包发出 服务器配置分隔符解码器使用&符号拆包 运行结果: 服务器使用分隔符解码器成功拆包. 当然还有更复杂的自定义协议处理TCP粘包/拆包问题,后续深入学习后在进行讨论. 史上最高性价比PS教程-敬伟Photoshop经典教程

TCP粘包/拆包问题

无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制. TCP粘包/拆包 TCP是个"流"协议,所谓流,就是没有界限的一串数据.大家可以想想河里的流水,是连成一片的,其间并没有分界线.TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题. TCP粘包/拆包问题说明 假设客户

TCP 粘包/拆包问题

简介 TCP 是一个’流’协议,所谓流,就是没有界限的一串数据. 大家可以想想河里的流水,是连成一片的.期间并没有分界线, TCP 底层并不了解上层业务数据的具体含义 ,它会根据 TCP 缓冲区的实际情况进行包得划分,所以在业务上认为,一个完整的包可能会被 TCP 拆分成多个包进行发送 . 也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的 TCP 拆包和粘包. TCP 粘包/拆包问题说明 我们可以通过图解对 TCP 粘包和拆包进行说明.粘包问题示例图: 假设客户端分别发送了两个数据包

Netty的TCP粘包/拆包(源码二)

假设客户端分别发送了两个数据包D1和D2给服务器,由于服务器端一次读取到的字节数是不确定的,所以可能发生四种情况: 1.服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包. 2.服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包. 3.服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包. 4.服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1