上篇博客留了个尾巴“而且LineBasedFrameDecoder据说还有一种不要求携带结束符的解码方式”,今天就从源码来看看是怎么回事。
一、基本原理
如果没有netty,用户自行拆包,原理是:
netty的原理也是如此。
LineBasedFrameDecoder的基础结构如下图:
二、ByteToMessageDecoder
ByteToMessageDecoder这个类就是解码的基础类,这个类内部有个累加器,读数据过程中不断累加不断判断是否是一个完整的数据包。该类定义了两个累加器,默认使用MEGER_CUMULATOR
1 public static final Cumulator MERGE_CUMULATOR(){...} 2 private Cumulator cumulator = MERGE_CUMULATOR;
还定义了一个ByteBuf 用来保存读取到的数据
1 ByteBuf cumulation;
接下来来看一下 MEGER_CUMULATOR是怎么将数据存入 cumulation中的
1 public static final Cumulator MERGE_CUMULATOR = new Cumulator() { 2 @Override 3 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { 4 ByteBuf buffer; 5 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() 6 || cumulation.refCnt() > 1) {//容量不足,扩容 7 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); 8 } else { 9 buffer = cumulation; 10 } 11 buffer.writeBytes(in); 12 in.release(); 13 return buffer; 14 } 15 };
netty中ByteBuf的读写指针对于累加的实现相当简单,只需要调用 writeBytes(in)就可以了。在累加之前先要判断buffer的容量,如果不足就扩容。
扩容代码:
1 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { 2 ByteBuf oldCumulation = cumulation; 3 cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); 4 cumulation.writeBytes(oldCumulation); 5 oldCumulation.release(); 6 return cumulation; 7 }
ByteToMessageDecoder 继承自 ChannelInboundHandlerAdapter 那么每次从TCP缓存区中读取数据是都会触发 channelRead()。【AbstractNioByteChannel.read()】
1 do { 2 byteBuf = allocHandle.allocate(allocator); 3 allocHandle.lastBytesRead(doReadBytes(byteBuf)); 4 if (allocHandle.lastBytesRead() <= 0) { 5 // nothing was read. release the buffer. 6 byteBuf.release(); 7 byteBuf = null; 8 close = allocHandle.lastBytesRead() < 0; 9 break; 10 } 11 allocHandle.incMessagesRead(1); 12 readPending = false; 13 pipeline.fireChannelRead(byteBuf);//触发点 14 byteBuf = null; 15 } while (allocHandle.continueReading());
channelRead的实现如下:
1 @Override 2 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 3 if (msg instanceof ByteBuf) { 4 CodecOutputList out = CodecOutputList.newInstance(); 5 try { 6 ByteBuf data = (ByteBuf) msg; 7 first = cumulation == null; 8 if (first) { 9 cumulation = data; 10 } else { 11 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);//累加数据 12 } 13 callDecode(ctx, cumulation, out);//拆包 14 } catch (DecoderException e) { 15 throw e; 16 } catch (Throwable t) { 17 throw new DecoderException(t); 18 } finally { 19 if (cumulation != null && !cumulation.isReadable()) {//ByteBuf中没有可以读取的数据,释放ByteBuf 20 numReads = 0; 21 cumulation.release(); 22 cumulation = null; 23 } else if (++ numReads >= discardAfterReads) {//读取了数据 24 numReads = 0; 25 discardSomeReadBytes();//将已经读取的数据丢弃 26 } 27 int size = out.size();//读取了几个数据包 28 decodeWasNull = !out.insertSinceRecycled();//是否拆到一个数据包,insertSinceRecycled() = true表示out有增加或修改 29 fireChannelRead(ctx, out, size);//将数据包传递给下一个handler 30 out.recycle(); 31 } 32 } else { 33 ctx.fireChannelRead(msg); 34 } 35 }
继续查看拆包[callDecode(ctx, cumulation, out)]的代码实现:
1 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { 2 try { 3 while (in.isReadable()) { 4 int outSize = out.size(); 5 if (outSize > 0) { 6 fireChannelRead(ctx, out, outSize);//传递给下一个handler 7 out.clear(); 8 if (ctx.isRemoved()) { 9 break; 10 } 11 outSize = 0; 12 } 13 int oldInputLength = in.readableBytes(); 14 decode(ctx, in, out);//抽象方法,具体的子类实现 15 if (ctx.isRemoved()) { 16 break; 17 } 18 if (outSize == out.size()) { 19 if (oldInputLength == in.readableBytes()) { 20 break; 21 } else { 22 continue; 23 } 24 } 25 if (oldInputLength == in.readableBytes()) { 26 throw new DecoderException( 27 StringUtil.simpleClassName(getClass()) + 28 ".decode() did not read anything but decoded a message."); 29 } 30 if (isSingleDecode()) { 31 break; 32 } 33 } 34 } catch (DecoderException e) { 35 throw e; 36 } catch (Throwable cause) { 37 throw new DecoderException(cause); 38 } 39 }
while循环中遍历每一个字节,判断这个字节是不是结束符,如果是就放入out中,每拆出一个完整数据包就传递给下一个handler,这就解释了为什么channelRead()的28行要通过out的修改记录来判断是否成功拆包。
decode()方法由子类来实现。
三、LineBasedFrameDecoder
这里使用的是 LineBasedFrameDecoder,找到它的decode方法.
1 @Override 2 protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 3 Object decoded = decode(ctx, in); 4 if (decoded != null) { 5 out.add(decoded); 6 } 7 } 8 9 protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { 10 final int eol = findEndOfLine(buffer);//找到了分割符就返回索引,没有返回-1 11 if (!discarding) {//不丢弃数据 12 if (eol >= 0) { 13 final ByteBuf frame; 14 final int length = eol - buffer.readerIndex();//数据包的长度 15 final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1;//分割符的长度 16 if (length > maxLength) { 17 buffer.readerIndex(eol + delimLength); 18 fail(ctx, length); 19 return null; 20 } 21 if (stripDelimiter) {//是否带分隔符 22 frame = buffer.readRetainedSlice(length);//创建一个共享buffer对象的缓存区的一个子区域 23 buffer.skipBytes(delimLength); 24 } else { 25 frame = buffer.readRetainedSlice(length + delimLength); 26 } 27 return frame;//返回一个完整的数据包 28 } else { 29 final int length = buffer.readableBytes(); 30 if (length > maxLength) { 31 discardedBytes = length; 32 buffer.readerIndex(buffer.writerIndex()); 33 discarding = true;//后面读取到的数据丢弃 34 if (failFast) { 35 fail(ctx, "over " + discardedBytes); 36 } 37 } 38 return null; 39 } 40 } else {//丢弃掉这次读取的数据 41 if (eol >= 0) { 42 final int length = discardedBytes + eol - buffer.readerIndex(); 43 final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1; 44 buffer.readerIndex(eol + delimLength); 45 discardedBytes = 0; 46 discarding = false;//丢完一个完整包后,下一个包就不再丢弃 47 if (!failFast) { 48 fail(ctx, length); 49 } 50 } else { 51 discardedBytes += buffer.readableBytes(); 52 buffer.readerIndex(buffer.writerIndex()); 53 } 54 return null; 55 } 56 }
通过判断已有的数据中是否有"/n" 或者 "/r/n" 来作为一个完整数据包的依据,有则返回下标,没有返回-1
1 private static int findEndOfLine(final ByteBuf buffer) { 2 int i = buffer.forEachByte(ByteProcessor.FIND_LF);//FIND_LF=‘\n‘ 3 if (i > 0 && buffer.getByte(i - 1) == ‘\r‘) { 4 i--; 5 } 6 return i; 7 }
接下来定义了一个变量 discarding 表示是否丢弃读取到的数据,默认为 false,只能在decode()方法中修改(简化后的代码)。
1 private boolean discarding; 2 3 if (length > maxLength) { 4 discarding = true; 5 } 6 7 if (discarding == true) { 8 if (eol >= 0) { 9 discarding = false; 10 } 11 }
当已经读取的数据大于最大值时,表示这个包是异常包,这个包的所有数据都必须丢弃,从现在已经读取到的到下一个分割符。
接着往下看看到"stripDelimiter"表示数据包中是否应该取消分隔符,默认false。返回的数据包不包含分隔符。
1 private final boolean stripDelimiter; 2 3 public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) { 4 this.maxLength = maxLength; 5 this.failFast = failFast; 6 this.stripDelimiter = stripDelimiter;//whether the decoded frame should strip out the delimiter or not 7 }
顺着源码看到这里,LineBasedFrameDecoder的拆包逻辑已经很清晰了,与我们自己实现拆包大同小异。
回到最开始的那个尾巴,这个说法是错误的,LineBasedFrameDecoder已经默认有了分隔符且不能更改,能改变的只是返回的数据包有没有分隔符。