netty(六) buffer 源码分析

问题 :

  • netty的 ByteBuff 和传统的ByteBuff的区别是什么?
  • HeapByteBuf 和 DirectByteBuf 的区别 ?
    HeapByteBuf : 使用堆内存,缺点 ,socket 传输的时候由于需要复制的原因,慢一点
    DirectByteBuf : 堆外内存,可以使用零拷贝

概述

netty ByteBuf 存在两个指针,分成三个区域: 已读区(可丢弃),未读区(未读),可写区 。不像之前JDK 的 ByteBuffer 中只有一个position 指针。例如以下示例 :
    public static void main(String[] args){
        ByteBuffer buffer = ByteBuffer.allocate(88);
        String value = "Netty~~";
        buffer.put(value.getBytes());
        //注意这个flip()方法,要是不调用,将读取到不正确的位置
        buffer.flip();
        byte[] vArray = new byte[buffer.remaining()];
        buffer.get(vArray);
        String result = new String(vArray);
        System.out.println(result);
    }
概述一下netty ByteBuff 的特点 :
  • 丰富API,存在readIndex 和 writeIndex 两个指针,方便读写
  • 动态扩容
  • 提供兼容 JDK ByteBuffer的方法

分类

内存池,循环利用创建的 ByteBuf 对象提升内存使用效率,降低由于高负载导致的频繁 GC .
PooledByteBuf 抽象类的子类 :
  • PooledDirectByteBuf
  • PooledHeapByteBuf
  • PooledUnsafeDirectByteBuf
    看一下类的结构图

源码分析

AbstractByteBuf 源码分析

    @Override
    public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
        checkReadableBytes(length);
        //抽象方法交由子类实现
        getBytes(readerIndex, dst, dstIndex, length);
        readerIndex += length;
        return this;
    }
看一下写操作
    @Override
    public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
        ensureWritable(length);
        setBytes(writerIndex, src, srcIndex, length);
        writerIndex += length;
        return this;
    }

    @Override
    public ByteBuf ensureWritable(int minWritableBytes) {
        if (minWritableBytes < 0) {
            throw new IllegalArgumentException(String.format(
                    "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
        }

        if (minWritableBytes <= writableBytes()) {
            return this;
        }

        if (minWritableBytes > maxCapacity - writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }

        // Normalize the current capacity to the power of 2.
        int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);

        // Adjust to the new capacity.
        capacity(newCapacity);
        return this;
    }

   /**
    * 计算新容量并没有一下子就增加一倍这样的简单思路,而是一点点地增加。
    *
    */
    private int calculateNewCapacity(int minNewCapacity) {
        final int maxCapacity = this.maxCapacity;
        final int threshold = 1048576 * 4; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.
        if (minNewCapacity > threshold) {
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
    }
 丢弃已读区域,复用缓冲区
    @Override
    public ByteBuf discardReadBytes() {
        ensureAccessible();
        if (readerIndex == 0) {
            return this;
        }

        if (readerIndex != writerIndex) {
        	//子类实现,字节数组进行复制,读写区域进行前移
            setBytes(0, this, readerIndex, writerIndex - readerIndex);
            writerIndex -= readerIndex;
            adjustMarkers(readerIndex);
            readerIndex = 0;
        } else {
            adjustMarkers(readerIndex);
            writerIndex = readerIndex = 0;
        }
        return this;
    }

    protected final void adjustMarkers(int decrement) {
        int markedReaderIndex = this.markedReaderIndex;
        if (markedReaderIndex <= decrement) {
            this.markedReaderIndex = 0;
            int markedWriterIndex = this.markedWriterIndex;
            if (markedWriterIndex <= decrement) {
                this.markedWriterIndex = 0;
            } else {
                this.markedWriterIndex = markedWriterIndex - decrement;
            }
        } else {
            this.markedReaderIndex = markedReaderIndex - decrement;
            markedWriterIndex -= decrement;
        }
    }

AbstractReferenceCountedByteBuf 源码分析

 从名字看出该类主要对引用进行计数,类似于JVM 内存回收的对象引用计数器,用于跟踪对象的分配和销毁,做自动内存回收。
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
	//利用原子类进行CAS 操作,保证了线程安全
    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

    private static final long REFCNT_FIELD_OFFSET;

    static {
        long refCntFieldOffset = -1;
        try {
            if (PlatformDependent.hasUnsafe()) {
                refCntFieldOffset = PlatformDependent.objectFieldOffset(
                        AbstractReferenceCountedByteBuf.class.getDeclaredField("refCnt"));
            }
        } catch (Throwable t) {
            // Ignored
        }

        REFCNT_FIELD_OFFSET = refCntFieldOffset;
    }

    @SuppressWarnings("FieldMayBeFinal")
    private volatile int refCnt = 1;

    @Override
    public final boolean release() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, -1);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                if (refCnt == 1) {
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

    @Override
    public ByteBuf retain() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, 1);
            }
            if (refCnt == Integer.MAX_VALUE) {
                throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
            }
            //CAS 操作
            if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
                break;
            }
        }
        return this;
    }    

    ....
这个类有三个重要的字段,一个原子类用于多线程操作,保证线程安全。REFCNT_FIELD_OFFSET 是一个内存偏移量,用于标识 refCnt字段在AbstractReferenceCountedByteBuf这个类
的内存地址,最后一个refCnt 是用 volatile 修饰的变量,保存对象应用次数。

UnpooledHeapByteBuf 非内存池堆内存ByteBuf源码分析

public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {

	//内存分配
    private final ByteBufAllocator alloc;
    //字节缓存区
    private byte[] array;
    //作用和 JDK ByteBuffer 的转化
    private ByteBuffer tmpNioBuf;

    ...

    private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
        ensureAccessible();
        ByteBuffer tmpBuf;
        //使用自身字段 tmpNioBuf 进行操作返回,所以 UnpooledHeapByteBuf 是在 JDK ByteBuff 的基础上进行扩展的。
        if (internal) {
            tmpBuf = internalNioBuffer();
        } else {
            tmpBuf = ByteBuffer.wrap(array);
        }
        return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
    }

    @Override
    public int readBytes(GatheringByteChannel out, int length) throws IOException {
        checkReadableBytes(length);
        int readBytes = getBytes(readerIndex, out, length, true);
        readerIndex += readBytes;
        return readBytes;
    }

总结

文章主要介绍netty buffer 相关的知识,主要是父类方法和 unpooled 相关的实现。

参考资料

  • 《netty 权威指南》

原文地址:https://www.cnblogs.com/Benjious/p/11634877.html

时间: 2024-11-06 14:37:24

netty(六) buffer 源码分析的相关文章

netty 5 alph1源码分析(服务端创建过程)

参照<Netty系列之Netty 服务端创建>,研究了netty的服务端创建过程.至于netty的优势,可以参照网络其他文章.<Netty系列之Netty 服务端创建>是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货.但抛开技术来说,也存在一些瑕疵. 缺点如下 代码衔接不连贯,上下不连贯. 代码片段是截图,对阅读代理不便(可能和阅读习惯有关) 本篇主要内容,参照<Netty系列之Netty 服务端创建>,梳理出自己喜欢的阅读风格. 1.整体逻辑图 整体将服务

Netty 4.0 源码分析(四):ByteBuf

Netty是基于流的消息传递机制.Netty框架中,所有消息的传输都依赖于ByteBuf接口,ByteBuf是Netty NIO框架中的缓冲区.ByteBuf接口可以理解为一般的Byte数组,不过Netty对Byte进行了封装,增加了一些实用的方法. ChannelBuf接口 package io.netty.buffer;public interface ChannelBuf {    ChannelBufType type();    boolean isPooled();} ByteBuf

六.jQuery源码分析之jQuery原型属性和方法

97 jQuery.fn = jQuery.prototype = { 98 constructor: jQuery, 99 init: function( selector, context, rootjQuery ) { }, 210 selector: "", 213 jquery: "1.7.2", 216 length: 0, 219 size: function() {}, 223 toArray: function() {}, 229 get: fun

Netty中的ChannelPipeline源码分析

ChannelPipeline在Netty中是用来处理请求的责任链,默认实现是DefaultChannelPipeline,其构造方法如下: 1 private final Channel channel; 2 private final ChannelFuture succeededFuture; 3 private final VoidChannelPromise voidPromise; 4 final AbstractChannelHandlerContext head; 5 final

SOFA 源码分析 —— 过滤器设计

前言 通常 Web 服务器在处理请求时,都会使用过滤器模式,无论是 Tomcat ,还是 Netty,过滤器的好处是能够将处理的流程进行分离和解耦,比如一个 Http 请求进入服务器,可能需要解析 http 报头,权限验证,国际化处理等等,过滤器可以很好的将这些过程隔离,并且,过滤器可以随时卸载,安装. 每个 Web 服务器的过滤器思想都是类似的,只是实现方式略有不同. 比如 Tomcat,Tomcat 使用了一个 FilterChain 对象保存了所有的 filter,通过循环所有 filte

Netty源码分析第7章(编码器和写数据)----&gt;第3节: 写buffer队列

Netty源码分析七章: 编码器和写数据 第三节: 写buffer队列 之前的小结我们介绍过, writeAndFlush方法其实最终会调用write和flush方法 write方法最终会传递到head节点, 调用HeadContext的write方法: public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, prom

Netty源码分析第7章(编码器和写数据)----&gt;第4节: 刷新buffer队列

Netty源码分析第七章: 编码器和写数据 第四节: 刷新buffer队列 上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法 通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法: public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } 这里最终会调用AbstractUnsafe的flush方法

Netty源码分析第6章(解码器)----&gt;第3节: 行解码器

Netty源码分析第六章: 解码器 第三节: 行解码器 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 首先看其参数: //数据包的最大长度, 超过该长度会进行丢弃模式 private final int maxLength; //超出最大长度是否要抛出异常 private final boolean fail

Netty源码分析第6章(解码器)----&gt;第1节: ByteToMessageDecoder

Netty源码分析第六章: 解码器 概述: 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对其处理, 而之后剖析的编解码器, 其实就是一个handler, 截取byteBuf中的字节, 然后组建成业务需要的数据进行继续传播 编码器,