[netty4][netty-buffer]netty之池化buffer

PooledByteBufAllocator buffer分配

buffer分配的入口:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(int, int)
netty实际应用时分配调用栈:

CLASS_NAME METHOD_NAME LINE_NUM
io/netty/buffer/PooledByteBufAllocator newDirectBuffer 339
io/netty/buffer/AbstractByteBufAllocator directBuffer 185
io/netty/buffer/AbstractByteBufAllocator directBuffer 176
io/netty/buffer/AbstractByteBufAllocator ioBuffer 139
io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle allocate 114
io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe read 186
io/netty/channel/nio/NioEventLoop processSelectedKey 682
io/netty/channel/nio/NioEventLoop processSelectedKeysOptimized 628
io/netty/channel/nio/NioEventLoop processSelectedKeys 533
io/netty/channel/nio/NioEventLoop run 511
io/netty/util/concurrent/SingleThreadEventExecutor$5 run 956

测试case代码

package io.netty.buffer;

import org.junit.Assert;
public class PooledByteBufTest {

    public static void main(String[] args) {
          final PooledByteBufAllocator allocator = new PooledByteBufAllocator(
                    false,   // preferDirect
                    0,      // nHeapArena
                    1,      // nDirectArena
                    8192,   // pageSize
                    11,     // maxOrder
                    3,      // tinyCacheSize
                    3,      // smallCacheSize
                    3,      // normalCacheSize
                    true    // useCacheForAllThreads
                    );

            // create tiny buffer
            final ByteBuf b1 = allocator.directBuffer(24);
            // create small buffer
            final ByteBuf b2 = allocator.directBuffer(800);
            // create normal buffer
            final ByteBuf b3 = allocator.directBuffer(8192 * 2);

            Assert.assertNotNull(b1);
            Assert.assertNotNull(b2);
            Assert.assertNotNull(b3);

            // then release buffer to deallocated memory while threadlocal cache has been disabled
            // allocations counter value must equals deallocations counter value
            Assert.assertTrue(b1.release());
            Assert.assertTrue(b2.release());
            Assert.assertTrue(b3.release());
    }
}

PoolChunk

PoolChunk本身数据结构与设计思路参见PoolChunk注释:

/**
 * Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
 *
 * Notation: The following terms are important to understand the code
 * > page  - a page is the smallest unit of memory chunk that can be allocated
 * page是chunk中能分配的最小单元
 * > chunk - a chunk is a collection of pages
 * 一个chunk中有一组page  1对多
 * > in this code chunkSize = 2^{maxOrder} * pageSize
 * 代码中  chunksize大小计算如上  maxOrder 是啥?
 *
 * To begin we allocate a byte array of size = chunkSize
 * Whenever a ByteBuf of given size needs to be created we search for the first position
 * in the byte array that has enough empty space to accommodate the requested size and
 * return a (long) handle that encodes this offset information, (this memory segment is then
 * marked as reserved so it is always used by exactly one ByteBuf and no more)
 * 首先,当需要创建给定大小的ByteBuf时,我们分配一个size=chunkSize的字节数组,
 * 在字节数组中搜索第一个有足够的空空间来容纳请求的大小的位置,
 * 并返回一个(长)句柄来编码该偏移量信息(然后将该内存段标记为保留,因此它总是仅由一个ByteBuf使用,不再使用)
 *
 * For simplicity all sizes are normalized according to PoolArena#normalizeCapacity method
 * This ensures that when we request for memory segments of size >= pageSize the normalizedCapacity
 * equals the next nearest power of 2
 * 为了简单起见,所有大小都按照PoolArena#normalizeCapacity方法进行规范化
 * 这确保当我们请求大小大于等于pageSize的内存段时,normalized容量等于下一个最接近的2的幂
 *
 * To search for the first offset in chunk that has at least requested size available we construct a
 * complete balanced binary tree and store it in an array (just like heaps) - memoryMap
 * 为了搜索块中至少有请求大小可用的第一个偏移量,我们构造了一个完整的平衡二叉树,并将其存储在一个数组(就像堆一样)-内存映射中
 *
 * The tree looks like this (the size of each node being mentioned in the parenthesis)
 * 树看起来是这样的(括号中提到的每个节点的大小)
 *
 * depth=0        1 node (chunkSize)
 * depth=1        2 nodes (chunkSize/2)
 * ..
 * ..
 * depth=d        2^d nodes (chunkSize/2^d)
 * ..
 * depth=maxOrder 2^maxOrder nodes (chunkSize/2^{maxOrder} = pageSize)  pageSize 在最下一层  最顶层是chunksize 从上往下走,每过一层除以2
 *
 * depth=maxOrder is the last level and the leafs consist of pages
 *
 * With this tree available searching in chunkArray translates like this:
 * To allocate a memory segment of size chunkSize/2^k we search for the first node (from left) at height k
 * which is unused 要分配大小为chunkSize/2^k的内存段,我们在高度k处搜索第一个未使用的节点(从左开始)。 嗯嗯
 *
 * Algorithm:
 * ----------
 * Encode the tree in memoryMap with the notation  用符号将树编码在内存中
 *   memoryMap[id] = x => in the subtree rooted at id, the first node that is free to be allocated
 *   is at depth x (counted from depth=0) i.e., at depths [depth_of_id, x), there is no node that is free
 * 在以id为根的子树中,可自由分配的第一个节点在深度x(从深度=0开始计算),即在深度[深度id,x的深度]处,没有可自由分配的节点
 *
 *  As we allocate & free nodes, we update values stored in memoryMap so that the property is maintained
 * 当我们分配空闲节点时,我们更新存储在memoryMap中的值,以便维护属性
 *
 * Initialization -
 *   In the beginning we construct the memoryMap array by storing the depth of a node at each node
 * 首先,我们通过在每个节点上存储一个节点的深度来构造memoryMap数组
 *     i.e., memoryMap[id] = depth_of_id
 *
 * Observations:
 * -------------
 * 1) memoryMap[id] = depth_of_id  => it is free / unallocated
 * 2) memoryMap[id] > depth_of_id  => at least one of its child nodes is allocated, so we cannot allocate it, but
 *                                    some of its children can still be allocated based on their availability
 * 3) memoryMap[id] = maxOrder + 1 => the node is fully allocated & thus none of its children can be allocated, it
 *                                    is thus marked as unusable
 *
 * Algorithm: [allocateNode(d) => we want to find the first node (from left) at height h that can be allocated]
 * ----------
 * 1) start at root (i.e., depth = 0 or id = 1)
 * 2) if memoryMap[1] > d => cannot be allocated from this chunk
 * 3) if left node value <= h; we can allocate from left subtree so move to left and repeat until found
 * 4) else try in right subtree
 *
 * Algorithm: [allocateRun(size)]
 * ----------
 * 1) Compute d = log_2(chunkSize/size)
 * 2) Return allocateNode(d)
 *
 * Algorithm: [allocateSubpage(size)]
 * ----------
 * 1) use allocateNode(maxOrder) to find an empty (i.e., unused) leaf (i.e., page)
 * 2) use this handle to construct the PoolSubpage object or if it already exists just call init(normCapacity)
 *    note that this PoolSubpage object is added to subpagesPool in the PoolArena when we init() it
 *
 * Note:
 * -----
 * In the implementation for improving cache coherence,
 * we store 2 pieces of information depth_of_id and x as two byte values in memoryMap and depthMap respectively
 *
 * memoryMap[id]= depth_of_id  is defined above
 * depthMap[id]= x  indicates that the first node which is free to be allocated is at depth x (from root)
 */
final class PoolChunk<T> implements PoolChunkMetric {

io.netty.buffer.PoolArena.findSubpagePoolHead(int) 算出page header在page table中的index,小的page在前面

// trace 库地址 jdbc:h2:/Users/simon/twice-cooked-pork/trace-data/基于netty4做的resetserver的一次http请求trace/tracer.data.h2db

PoolChunk要解决的问题有:

  1. 快速查找未分配的地方并分配
  2. 尽量不要有碎片,可以理解成尽量挨着紧凑的分配

整个chunk的结构如下:

                                                    +------+   chunksize 当L=11时,是16M
L=0                                                 |   0  |
                                   +----------------+------+------------------+
                                   |                                          |
                                   |                                          |
                                   |                                          |
                               +---v--+                                   +---v--+
L=1                            |   1  |                                   |   2  |
                        +------+------+------+                     +------+------+-------+
                        |                    |                     |                     |
                        |                    |                     |                     |
                        |                    |                     |                     |
                    +---v--+             +---v--+              +---v--+              +---v--+
L=2                 |   3  |             |   4  |              |   5  |              |   6  |
                 +--+------+-+         +-+------+--+        +--+------+--+         +-+------+--+
                 |           |         |           |        |            |         |           |
                 |           |         |           |        |            |         |           |
                 |           |         |           |        |            |         |           |
              +--v---+   +---v--+   +--v---+   +---v--+   +-v----+   +---v--+   +--v---+   +---v--+
L=3           |  7   |   |   8  |   |  9   |   |  10  |   |  11  |   |  12  |   |  13  |   |  14  |
              +------+   +------+   +------+   +------+   +------+   +------+   +------+   +------+
               8K大小即page size

是一个完全二叉树,树的层高可以自定义,目前限制在30层内,默认是11层。
最底层是真正的chunk描述,最底层每个叶子是一个paage,大小为8K。那么当层数是11层时,chunk的size是16M。因为11层的话,最下面一层叶子是2的11次方,再乘以8K正好是16MB。
这棵树中每个节点还对对应其相应的大小是否被分配。什么叫其相应的大小?是这样的,每一层代表需要分配的大小的档次。暂且用档次这个词吧。最上面是16MB档次,最下面是8K档次,从最上面开始往下走一层,档次就除以2。
每次申请内存时,netty会先对其做规格化,所谓规格化就是最接近申请内存值的2de整数次幂。比如我申请900byte,那么规格化后就是1K。在规格化后,netty会在树上标志 0 1 3 7被使用了。下次要再申请8K内存时就要避开这个路径了,只能是 0 1 3 8 了,因为7那边已经不够了。其他大小同理。所以树上的节点是为了标志是否被使用过,以使得内存碎片减少尽量靠左紧凑分配。 对于单page内的内存使用浪费问题,netty又做了一层位图结构使其得以利用。对于chunk对象的查找,netty还做了缓存机制,下面有讲。

作业

仔细调试 1K 2k 3K 8K 11K 内存的多次分配与回收。

PoolArena

PoolArena 这一层负责创建与维护PoolChunk,维护的方式是将用到的正在分配中的PoolChunk放到PoolChunkList这个列表中。
PoolChunkList是一个链是结构。
而且,PoolArena还按PoolChunk的使用量分别维护到相对应的PoolChunkList中。

// abstract class PoolArena<T> implements PoolArenaMetric {
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;

这些PoolChunkList也是按使用量大小有序的链式的串在一起(参见PoolArena构造方法中初始化这些list字段的代码),当使用量达到本级别时,会加入到下一级别的list中,比如达到25%了,那么就会加到50%列表中了。(参见io.netty.buffer.PoolChunkList.add(PoolChunk))

void add(PoolChunk<T> chunk) {
    if (chunk.usage() >= maxUsage) {
        nextList.add(chunk);
        return;
    }
    add0(chunk);
}

PoolArenad的cache与Recycler对象池

PooledByteBuf依赖PoolThreadCache做了一层对PoolChunk的缓存,PoolThreadCache靠MemoryRegionCache实现缓存。MemoryRegionCache靠队列来实现对PoolChunk的缓存(参见下面代码1),MemoryRegionCache在buf释放时会调用其add接口将释放的PoolChunk对象通过io.netty.buffer.PoolThreadCache.MemoryRegionCache.Entry对象包装后加入(offer)到队列(参见下面堆栈1)。在io.netty.buffer.PoolThreadCache.MemoryRegionCache.allocate(PooledByteBuf, int)时再从队列中直接poll出来,达成cache的目的。优化还没有结束,包装PoolChunk用的Entry对象是通过Recycler对象池完成分配(获取)已释放的。对象是本质上一个通过FastThreadLocal的Stack的数据结构,分配对应出栈,释放对象入栈。具体参见下面代码2。
Recycler
是一个基于ThreadLocal结合stack玩起来的一个对象池数据结构,像上述这种就是PooledUnsafeDirectByteBuf的对象pool。回收的时候压栈,要用的时候出栈。
获取对象 io.netty.util.Recycler.get()
回收对象 io.netty.util.Recycler.DefaultHandle.recycle(Object)

代码1: 队列初始化

Queue<Entry<T>> queue = PlatformDependent.newFixedMpscQueue(this.size);

堆栈1:buf释放时会调用MemoryRegionCache add接口将释放的PoolChunk对象包装后入队:

Thread [main] (Suspended (breakpoint at line 393 in PoolThreadCache$MemoryRegionCache))
    PoolThreadCache$SubPageMemoryRegionCache<T>(PoolThreadCache$MemoryRegionCache<T>).add(PoolChunk<T>, ByteBuffer, long) line: 393
    PoolThreadCache.add(PoolArena<?>, PoolChunk, ByteBuffer, long, int, SizeClass) line: 209
    PoolArena$DirectArena(PoolArena<T>).free(PoolChunk<T>, ByteBuffer, long, int, PoolThreadCache) line: 273
    PooledUnsafeDirectByteBuf(PooledByteBuf<T>).deallocate() line: 171
    PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release0(int) line: 136
    PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release() line: 124
    PooledByteBufTest.main(String[]) line: 43   

代码2:Entry对象使用对象池

private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
    @SuppressWarnings("unchecked")
    @Override
    protected Entry newObject(Handle<Entry> handle) {
        return new Entry(handle);
    }
};

private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
    Entry entry = RECYCLER.get();
    entry.chunk = chunk;
    entry.nioBuffer = nioBuffer;
    entry.handle = handle;
    return entry;
}

@Override
public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }

    Stack<?> stack = this.stack;
    if (lastRecycledId != recycleId || stack == null) {
        throw new IllegalStateException("recycled already");
    }

    stack.push(this);
}

原文地址:https://www.cnblogs.com/simoncook/p/11967186.html

时间: 2024-10-20 03:36:37

[netty4][netty-buffer]netty之池化buffer的相关文章

netty io.netty.buffer简介

io.netty.util.ReferenceCounted 此接口代表一个引用计数的对象,此对象需要显示的释放. 当一个ReferenceCounted对象被实例化的时候,该对象的引用数量就是1,调用retain()方法会增加引用数量,调用 release() 方法会减少引用数量,如果引用数量减少到0,该对象就需要显示释放掉.访问释放掉的对象通常会导致访问冲突. 如果实现ReferenceCounted接口的对象是一个包含同样实现ReferenceCounted接口的对象的容器.当容器的引用数

缓存池扩展 (Buffer Pool Extension)实践

SQL Server 2014缓存池扩展 (Buffer Pool Extension)功能可以将缓存池扩展到较快的SSD存储上.为内存比较紧张的系统提供了新的扩展途径. Buffer Pool 扩展可以带来以下好处: 提高随机I/O的吞吐量 降低I/O延迟 提高单位时间内处理事务的吞吐量 显著地提高读性能 以软件方式为客户实现了类似于混合硬盘的效果. 缓存池扩展支持以下两种模式: CW:只向SSD写入干净页. DW:双重写,即同时向SSD和硬盘写入脏页. 下面2张图里,C: 干净页  D:脏页

Netty自带连接池的使用

一.类介绍1.ChannelPool——连接池接口 2.SimpleChannelPool——实现ChannelPool接口,简单的连接池实现 3.FixedChannelPool——继承SimpleChannelPool,有大小限制的连接池实现 4.ChannelPoolMap——管理host与连接池映射的接口 5.AbstractChannelPoolMap——抽象类,实现ChannelPoolMap接口 二.具体使用a.MyNettyPool——Netty自带连接池的用法 package

Mysql InnDB 缓存池(Buffer Pool)

InnoDB通过在内存中维护缓存池(Buffer Pool)来对数据和索引进行缓存,从而提高数据库性能.了解Mysql内存缓冲池的原理,并针对性地进行调优,能够最大化其带来的性能优势. 通常,在保证服务器上其他应用程序有足够内存的情况下,可以给buffer pool分配尽可能多的内存空间.越大的buffer pool能够使mysql越能够像内存数据库一样提供服务.其大小通过innodb_buffer_pool_size配置.对于64 bit 机器,可以将buffer pool 配置为多个部分,以

【netty】Netty系列之Netty百万级推送服务设计要点

1. 背景 1.1. 话题来源 最近很多从事移动互联网和物联网开发的同学给我发邮件或者微博私信我,咨询推送服务相关的问题.问题五花八门,在帮助大家答疑解惑的过程中,我也对问题进行了总结,大概可以归纳为如下几类: Netty是否可以做推送服务器? 如果使用Netty开发推送服务,一个服务器最多可以支撑多少个客户端? 使用Netty开发推送服务遇到的各种技术问题. 由于咨询者众多,关注点也比较集中,我希望通过本文的案例分析和对推送服务设计要点的总结,帮助大家在实际工作中少走弯路. 1.2. 推送服务

8.池化内存分配

netty内存管理思想 PooledByteBufAllocate PoolChunk Chunk初始化 PoolChunk分配内存 netty内存管理思想 java作为一门拥有GC机制的语言,长久以来它的使用者都不必手动管理内存,这比起c/c++是一个巨大的进步.但现在netty却反其道而行之,实现了一套不依赖GC而自行管理内存的机制. 那么netty为什么要这么做?众所周知netty是一个网络通信层框架,涉及到大量IO操作,对于此类操作,DirectByteBuffer无疑比起堆内存有更多性

Spatial pyramid pooling (SPP)-net (空间金字塔池化)笔记(转)

在学习r-cnn系列时,一直看到SPP-net的身影,许多有疑问的地方在这篇论文里找到了答案. 论文:Spatial Pyramid Pooling in Deep Convolutional Networks for Visual Recognition 转自:http://blog.csdn.net/xzzppp/article/details/51377731 另可参考:http://zhangliliang.com/2014/09/13/paper-note-sppnet/ http:/

高可用的池化 Thrift Client 实现(源码分享)

本文将分享一个高可用的池化 Thrift Client 及其源码实现,欢迎阅读源码(Github)并使用,同时欢迎提出宝贵的意见和建议,本人将持续完善. 本文的主要目标读者是对 Thrift 有一定了解并使用的童鞋,如对 Thrift 的基础知识了解不多或者想重温一下基础知识,推荐先阅读本站文章<和 Thrift 的一场美丽邂逅>. 下面进入正题. 为什么我们需要这么一个组件? 我们知道,Thrift 是一个 RPC 框架体系,可以非常方便的进行跨语言 RPC 服务的开发和调用.然而,它并没有

ufldl学习笔记与编程作业:Feature Extraction Using Convolution,Pooling(卷积和池化抽取特征)

ufldl出了新教程,感觉比之前的好,从基础讲起,系统清晰,又有编程实践. 在deep learning高质量群里面听一些前辈说,不必深究其他机器学习的算法,可以直接来学dl. 于是最近就开始搞这个了,教程加上matlab编程,就是完美啊. 新教程的地址是:http://ufldl.stanford.edu/tutorial/ 学习链接: http://ufldl.stanford.edu/tutorial/supervised/FeatureExtractionUsingConvolution