netty UnpooledHeapByteBuf 源码分析

UnpooledHeapByteBuf 是基于堆内存进行内存分配的字节缓冲区,没有基于对象池技术实现,这意味着每次I/O的读写都会创建一个新的UnpooledHeapByteBuf,频繁进行大块内存的分配和回收对性能会造成一定的影响,但是对比与堆外内存的申请和释放,它的成本会低一些。

相对与PooledHeapByteBuf,UnpooledHeapByteBuf 的实现原理更加简单,也不容易出现内存管理方面的问题,在满足性能的情况下,尽量使用UnpooledHeapByteBuf 。

1.成员变量

private final ByteBufAllocator alloc;//聚合一个ByteBufAllocator,用于UnpooledHeapByteBuf的内存分配
private byte[] array;//byte 数组作为缓冲区
private ByteBuffer tmpNioBuf;//用于实现Netty ByteBuf 到 JDK ByteBuffer 的转换

事实上,如果使用JDK 的ByteBuffer替换byte数组也是可行的 ,直接使用byte数组的根本原因是提升性能和更加便捷的进行位操作。JDK 的ByteBuffer底层实现也是byte数组,如下。

public abstract class ByteBuffer
    extends Buffer
    implements Comparable<ByteBuffer>{

    // These fields are declared here rather than in Heap-X-Buffer in order to
    // reduce the number of virtual method invocations needed to access these
    // values, which is especially costly when coding small buffers.
    //
    final byte[] hb;                  // Non-null only for heap buffers
    final int offset;
    boolean isReadOnly;                 // Valid only for heap buffers

2.动态扩展缓冲区

@Override
public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}
/**
 * Should be called by every method that tries to access the buffers content to check
 * if the buffer was released before.
 */
protected final void ensureAccessible() {
    if (refCnt() == 0) {
        throw new IllegalReferenceCountException(0);
    }
}

方法 的入口首先对新容量进行合法性校验,如果大于容量上限或者小于0,则抛出IllegalArgumentException异常。

判断新的容量值是否大于当前的缓冲区容量,如果大于则需要动态扩展,通过 byte[] newArray = new byte[newCapacity];创建新的缓冲区字节数组,然后通过 System.arraycopy进行内存复制,将旧的字节数组复制到新创建的字节数组中,最后调用setArray(newArray);替换旧的字节数组。

private void setArray(byte[] initialArray) {
    array = initialArray;
    tmpNioBuf = null;
}

动态扩容完成后,需要将原来的视图tmpNioBuf设置为空。

如果新的容量小于当前缓冲区容量不需要动态扩展,但是需要截取当前缓冲区创建一个新的子缓冲区。先判断下读索引是否小于新的容量值,如果小于进一步判断写索引是否大于新的容量值,如果大于则将写索引设置为新的容量值(防止越界)。更新完写索引之后,通过 System.arraycopy将当前可读的字节数组复制到新创建的子缓冲区中, System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);

如果新的容量值小于读索引,说明没有可读的字节数组需要复制到新创建的缓冲区中,将读写索引为新的容量值即可。最后调用setArray方法替换原来的字节数组。

3.字节数组复制

@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    System.arraycopy(src, srcIndex, array, index, length);
    return this;
}

首先做合法性校验。

protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
    checkIndex(index, length);
    if (srcIndex < 0 || srcIndex > srcCapacity - length) {
        throw new IndexOutOfBoundsException(String.format(
                "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
    }
}

校验index,length的值,如果小于0,抛出IllegalArgumentException异常,然后对两者之和进行判断,如果大于缓冲区的容量,则抛出IndexOutOfBoundsException异常。srcIndex和srcCapacity,与之类似。校验通过之后,调用 System.arraycopy(src, srcIndex, array, index, length)方法进行字节数组的复制。

protected final void checkIndex(int index, int fieldLength) {
    ensureAccessible();
    if (fieldLength < 0) {
        throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
    }
    if (index < 0 || index > capacity() - fieldLength) {
        throw new IndexOutOfBoundsException(String.format(
                "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
    }
}

注意: ButeBuf以set和get开头读写缓冲区的方法不会修改读写索引。

4.转换为JDK ByteBuffer

ByteBuffer 是基于byte数组实现,NIO的ByteBuffer提供wrap方法,可以将byte数组转换成ByteBuffer对象。

/**
     * Wraps a byte array into a buffer.
     *
     * <p> The new buffer will be backed by the given byte array;
     * that is, modifications to the buffer will cause the array to be modified
     * and vice versa.  The new buffer‘s capacity will be
     * <tt>array.length</tt>, its position will be <tt>offset</tt>, its limit
     * will be <tt>offset + length</tt>, and its mark will be undefined.  Its
     * {@link #array backing array} will be the given array, and
     * its {@link #arrayOffset array offset} will be zero.  </p>
     *
     * @param  array
     *         The array that will back the new buffer
     *
     * @param  offset
     *         The offset of the subarray to be used; must be non-negative and
     *         no larger than <tt>array.length</tt>.  The new buffer‘s position
     *         will be set to this value.
     *
     * @param  length
     *         The length of the subarray to be used;
     *         must be non-negative and no larger than
     *         <tt>array.length - offset</tt>.
     *         The new buffer‘s limit will be set to <tt>offset + length</tt>.
     *
     * @return  The new byte buffer
     *
     * @throws  IndexOutOfBoundsException
     *          If the preconditions on the <tt>offset</tt> and <tt>length</tt>
     *          parameters do not hold
     */
    public static ByteBuffer wrap(byte[] array,
                                    int offset, int length)
    {
        try {
            return new HeapByteBuffer(array, offset, length);
        } catch (IllegalArgumentException x) {
            throw new IndexOutOfBoundsException();
        }
    }

UnpooledHeapByteBuf.java

@Override
public ByteBuffer nioBuffer(int index, int length) {
    ensureAccessible();
    return ByteBuffer.wrap(array, index, length).slice();
}

调用了ByteBuffer的slice方法,由于每次调用nioBuffer都会创建一个新的ByteBuffer,因此此处的slice方法起不到重用缓冲区的效果,只能保证读写索引的独立性。

5.与子类相关的方法

isDirect方法:如果基础堆内存实现的ByteBuf,返回false,

@Override
public boolean isDirect() {
    return false;
}

hasArray方法:因为UnpooledHeapByteBuf是基于字节数组实现,返回true

@Override
public boolean hasArray() {
    return true;
}

array方法:因为UnpooledHeapByteBuf是基于字节数组实现,所以返回值是内部的字节数组成员变量。调用array方法之前,可以先使用hasArray方法进行判断,如果返回false说明当前ByteBuf不支持array方法。

@Override
public byte[] array() {
    ensureAccessible();
    return array;
}

原文地址:https://www.cnblogs.com/zwb1234/p/9577757.html

时间: 2024-10-15 15:55:25

netty UnpooledHeapByteBuf 源码分析的相关文章

Netty源码分析第5章(ByteBuf)----&gt;第3节: 内存分配器

Netty源码分析第五章: ByteBuf 第三节: 内存分配器 内存分配器, 顾明思议就是分配内存的工具, 在netty中, 内存分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关内存分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByteBuf一样, AbstractByteBufAllocator也实现了缓冲区分配的骨架逻辑, 剩余的交给其子类 以其中的分配

netty(六) buffer 源码分析

问题 : netty的 ByteBuff 和传统的ByteBuff的区别是什么? HeapByteBuf 和 DirectByteBuf 的区别 ? HeapByteBuf : 使用堆内存,缺点 ,socket 传输的时候由于需要复制的原因,慢一点 DirectByteBuf : 堆外内存,可以使用零拷贝 概述 netty ByteBuf 存在两个指针,分成三个区域: 已读区(可丢弃),未读区(未读),可写区 .不像之前JDK 的 ByteBuffer 中只有一个position 指针.例如以下

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

netty 5 alph1源码分析(服务端创建过程)

参照<Netty系列之Netty 服务端创建>,研究了netty的服务端创建过程.至于netty的优势,可以参照网络其他文章.<Netty系列之Netty 服务端创建>是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货.但抛开技术来说,也存在一些瑕疵. 缺点如下 代码衔接不连贯,上下不连贯. 代码片段是截图,对阅读代理不便(可能和阅读习惯有关) 本篇主要内容,参照<Netty系列之Netty 服务端创建>,梳理出自己喜欢的阅读风格. 1.整体逻辑图 整体将服务

netty源码分析之揭开reactor线程的面纱(二)

如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下 reactor线程 我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节 我们进入到reactor线程的 run 方法,找到处理IO事件的代

netty源码分析

1.Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序.也就是说,Netty 是一个基于NIO的客户.服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用.Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发. 2.目前netty有3个版本netty3.netty4.netty5.3个版

Netty 心跳服务之 IdleStateHandler 源码分析

前言:Netty 提供的心跳介绍 Netty 作为一个网络框架,提供了诸多功能,比如我们之前说的编解码,Netty 准备很多现成的编解码器,同时,Netty 还为我们准备了网络中,非常重要的一个服务-----心跳机制.通过心跳检查对方是否有效,这在 RPC 框架中是必不可少的功能. Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 检测连接的有效性.当然,你也可以自己写个任务.但我们今天不准备使用自定义任务,而是

Netty源码分析第2章(NioEventLoop)----&gt;第7节: 处理IO事件

Netty源码分析第二章: NioEventLoop 第七节:处理IO事件 上一小节我们了解了执行select()操作的相关逻辑, 这一小节我们继续学习select()之后, 轮询到io事件的相关逻辑: 回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case Sele

Netty源码分析第2章(NioEventLoop)----&gt;第6节: 执行selector操作

Netty源码分析第二章: NioEventLoop 第六节: 执行select操作 分析完了selector的创建和优化的过程, 这一小节分析select相关操作 跟到跟到NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE