ByteToMessageDecoder在Netty中起着很大的作用,用来解决半包字节累积问题。粘贴部分重要代码(当然本身方法不是很
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { ByteBuf cumulation; private boolean singleDecode; private boolean first; protected ByteToMessageDecoder() { if (getClass().isAnnotationPresent(Sharable.class)) {//因为每一个ByteToMessageDecoder都有针对某个socket的累积对象 //故是一个不可以共享的对象类型 throw new IllegalStateException("@Sharable annotation is not allowed"); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { //缓冲区的大小没有超过需要写入的数据的大小 if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { expandCumulation(ctx, data.readableBytes()); } cumulation.writeBytes(data);//将数据写入到积累对象中 data.release();//释放bytebuffer(heap或者direct)--通过引用的方式进行释放缓冲区 } //收集完毕之后解析收集到的字符串 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { //如果累积对象中没有数据了(因为所有发送的数据刚刚好n个msg) if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); cumulation = null; } int size = out.size(); decodeWasNull = size == 0; //针对解析后的out结果,逐个调用message for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { ctx.fireChannelRead(msg); } } private void expandCumulation(ChannelHandlerContext ctx, int readable) { ByteBuf oldCumulation = cumulation;//新的容量=旧的容量+可读取的数量 ---在此处的扩展和初次的分配都是通过同一个allocator进行分配的 cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable); cumulation.writeBytes(oldCumulation);//复制的过程 oldCumulation.release();//释放老的对象 } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { if (cumulation != null && !first) {//cumulation可读的数据为0那么就不会调用紧跟着的代码段 cumulation.discardSomeReadBytes();//如果存在半包得话,那么就释放不必要的空间(有时间的话,我们会将一个Netty中ByteBuf的构造) } if (decodeWasNull) { decodeWasNull = false; if (!ctx.channel().config().isAutoRead()) { ctx.read(); } } ctx.fireChannelReadComplete(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { RecyclableArrayList out = RecyclableArrayList.newInstance(); try {//如果channel不活动了得话,对累积对象进行解码。 if (cumulation != null) { callDecode(ctx, cumulation, out); decodeLast(ctx, cumulation, out); } else { decodeLast(ctx, Unpooled.EMPTY_BUFFER, out); } } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null) { cumulation.release(); cumulation = null; } int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } ctx.fireChannelInactive(); out.recycle(); } } /** * Called once data should be decoded from the given {@link ByteBuf}. This method will call * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place. * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data * @param out the {@link List} to which decoded messages should be added */ protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); int oldInputLength = in.readableBytes(); decode(ctx, in, out);//特别注意: 调用完如果msg被解析出来的话,那么累积对象的readableBytes一定会发生变化 // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) {//如果此handler被移除 break; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } } }
由此总结如下:
- 累积对象主要是承载socket接收的字节数的。每一次decode,当有msg生成的时候,readableBytes都会减小
- 当一个socket处于inactive时,会对累积对象的数据进行解析。然后释放累积对象
- 当当前的累积对象不能承载数据的时候,需要进行扩展(调用Allocator创建个新的bytebuf,然后copy一下数据)
- 该handler是一个有状态的handler,需要记忆每一个socket的发送的字节.然后再去decode成msg对象
时间: 2024-10-10 10:56:46