PooledByteBufAllocator buffer allocation
Entry point for buffer allocation:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(int, int)
Allocation call stack in a real Netty application:
CLASS_NAME | METHOD_NAME | LINE_NUM |
io/netty/buffer/PooledByteBufAllocator | newDirectBuffer | 339 |
io/netty/buffer/AbstractByteBufAllocator | directBuffer | 185 |
io/netty/buffer/AbstractByteBufAllocator | directBuffer | 176 |
io/netty/buffer/AbstractByteBufAllocator | ioBuffer | 139 |
io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle | allocate | 114 |
io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe | read | 186 |
io/netty/channel/nio/NioEventLoop | processSelectedKey | 682 |
io/netty/channel/nio/NioEventLoop | processSelectedKeysOptimized | 628 |
io/netty/channel/nio/NioEventLoop | processSelectedKeys | 533 |
io/netty/channel/nio/NioEventLoop | run | 511 |
io/netty/util/concurrent/SingleThreadEventExecutor$5 | run | 956 |
Test case code
package io.netty.buffer;

import org.junit.Assert;

public class PooledByteBufTest {
    public static void main(String[] args) {
        final PooledByteBufAllocator allocator = new PooledByteBufAllocator(
                false, // preferDirect
                0,     // nHeapArena
                1,     // nDirectArena
                8192,  // pageSize
                11,    // maxOrder
                3,     // tinyCacheSize
                3,     // smallCacheSize
                3,     // normalCacheSize
                true   // useCacheForAllThreads
        );
        // create tiny buffer
        final ByteBuf b1 = allocator.directBuffer(24);
        // create small buffer
        final ByteBuf b2 = allocator.directBuffer(800);
        // create normal buffer
        final ByteBuf b3 = allocator.directBuffer(8192 * 2);
        Assert.assertNotNull(b1);
        Assert.assertNotNull(b2);
        Assert.assertNotNull(b3);
        // then release the buffers to deallocate the memory; with the thread-local cache
        // disabled, the allocation counter must equal the deallocation counter
        Assert.assertTrue(b1.release());
        Assert.assertTrue(b2.release());
        Assert.assertTrue(b3.release());
    }
}
PoolChunk
The data structure and design of PoolChunk are described in its own Javadoc:
/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
*
* Notation: The following terms are important to understand the code
 * > page - a page is the smallest unit of memory chunk that can be allocated
 * > chunk - a chunk is a collection of pages
 * > in this code chunkSize = 2^{maxOrder} * pageSize
 *   (maxOrder is the height of the allocation tree described below)
*
* To begin we allocate a byte array of size = chunkSize
* Whenever a ByteBuf of given size needs to be created we search for the first position
* in the byte array that has enough empty space to accommodate the requested size and
* return a (long) handle that encodes this offset information, (this memory segment is then
* marked as reserved so it is always used by exactly one ByteBuf and no more)
*
* For simplicity all sizes are normalized according to PoolArena#normalizeCapacity method
* This ensures that when we request for memory segments of size >= pageSize the normalizedCapacity
* equals the next nearest power of 2
*
* To search for the first offset in chunk that has at least requested size available we construct a
* complete balanced binary tree and store it in an array (just like heaps) - memoryMap
*
* The tree looks like this (the size of each node being mentioned in the parenthesis)
*
* depth=0 1 node (chunkSize)
* depth=1 2 nodes (chunkSize/2)
* ..
* ..
* depth=d 2^d nodes (chunkSize/2^d)
* ..
 * depth=maxOrder 2^maxOrder nodes (chunkSize/2^{maxOrder} = pageSize); the pages sit on
 *                the bottom level, the whole chunk at the top, halving at each level down
*
* depth=maxOrder is the last level and the leafs consist of pages
*
* With this tree available searching in chunkArray translates like this:
* To allocate a memory segment of size chunkSize/2^k we search for the first node (from left) at height k
 * which is unused
*
* Algorithm:
* ----------
 * Encode the tree in memoryMap with the notation
* memoryMap[id] = x => in the subtree rooted at id, the first node that is free to be allocated
* is at depth x (counted from depth=0) i.e., at depths [depth_of_id, x), there is no node that is free
*
* As we allocate & free nodes, we update values stored in memoryMap so that the property is maintained
*
* Initialization -
* In the beginning we construct the memoryMap array by storing the depth of a node at each node
* i.e., memoryMap[id] = depth_of_id
*
* Observations:
* -------------
* 1) memoryMap[id] = depth_of_id => it is free / unallocated
* 2) memoryMap[id] > depth_of_id => at least one of its child nodes is allocated, so we cannot allocate it, but
* some of its children can still be allocated based on their availability
* 3) memoryMap[id] = maxOrder + 1 => the node is fully allocated & thus none of its children can be allocated, it
* is thus marked as unusable
*
* Algorithm: [allocateNode(d) => we want to find the first node (from left) at height h that can be allocated]
* ----------
* 1) start at root (i.e., depth = 0 or id = 1)
* 2) if memoryMap[1] > d => cannot be allocated from this chunk
* 3) if left node value <= h; we can allocate from left subtree so move to left and repeat until found
* 4) else try in right subtree
*
* Algorithm: [allocateRun(size)]
* ----------
* 1) Compute d = log_2(chunkSize/size)
* 2) Return allocateNode(d)
*
* Algorithm: [allocateSubpage(size)]
* ----------
* 1) use allocateNode(maxOrder) to find an empty (i.e., unused) leaf (i.e., page)
* 2) use this handle to construct the PoolSubpage object or if it already exists just call init(normCapacity)
* note that this PoolSubpage object is added to subpagesPool in the PoolArena when we init() it
*
* Note:
* -----
* In the implementation for improving cache coherence,
* we store 2 pieces of information depth_of_id and x as two byte values in memoryMap and depthMap respectively
*
* memoryMap[id]= depth_of_id is defined above
* depthMap[id]= x indicates that the first node which is free to be allocated is at depth x (from root)
*/
final class PoolChunk<T> implements PoolChunkMetric {
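To make the chunkSize and allocateRun arithmetic in the comment above concrete, here is the math for the default configuration, written as plain Java arithmetic (an illustration, not Netty API calls):

// chunkSize = 2^maxOrder * pageSize = 2^11 * 8192 = 16 MiB
int pageSize = 8192, maxOrder = 11;
int chunkSize = pageSize << maxOrder;                     // 16,777,216 bytes

// allocateRun for a normalized request of 32 KiB:
int size = 32 * 1024;
int d = Integer.numberOfTrailingZeros(chunkSize / size);  // log_2(chunkSize/size) = 9

// allocateNode(9) then scans for the first free node at depth 9;
// each node at depth 9 covers chunkSize / 2^9 = 32 KiB, i.e. a run of 4 pages.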
io.netty.buffer.PoolArena.findSubpagePoolHead(int) computes the index of the subpage pool head in the subpage table; smaller sizes come first.
// trace database: jdbc:h2:/Users/simon/twice-cooked-pork/trace-data/基于netty4做的resetserver的一次http请求trace/tracer.data.h2db
PoolChunk has two problems to solve:
- quickly find an unallocated region and allocate it
- keep fragmentation low, which roughly means allocating as compactly and contiguously as possible
The overall structure of a chunk looks like this:
+------+ chunkSize (16 MB when L=11)
L=0 | 0 |
+----------------+------+------------------+
| |
| |
| |
+---v--+ +---v--+
L=1 | 1 | | 2 |
+------+------+------+ +------+------+-------+
| | | |
| | | |
| | | |
+---v--+ +---v--+ +---v--+ +---v--+
L=2 | 3 | | 4 | | 5 | | 6 |
+--+------+-+ +-+------+--+ +--+------+--+ +-+------+--+
| | | | | | | |
| | | | | | | |
| | | | | | | |
+--v---+ +---v--+ +--v---+ +---v--+ +-v----+ +---v--+ +--v---+ +---v--+
L=3 | 7 | | 8 | | 9 | | 10 | | 11 | | 12 | | 13 | | 14 |
+------+ +------+ +------+ +------+ +------+ +------+ +------+ +------+
Each leaf is 8 KB, i.e. the page size.
It is a complete binary tree. The tree height is configurable, currently capped at 30 levels, with a default of 11.
The bottom level describes the actual memory: each leaf is a page of 8 KB. With 11 levels the chunk size is 16 MB, because the bottom level then has 2^11 leaves, and 2^11 * 8 KB is exactly 16 MB.
Each node in the tree also records whether memory of its corresponding size has been allocated. What does "its corresponding size" mean? Each level represents a size class: the top is the 16 MB class, the bottom is the 8 KB class, and moving down one level halves the class size.
On every allocation request, Netty first normalizes the size, rounding it up to the nearest power of two (for example, a request for 900 bytes normalizes to 1 KB). After allocating, Netty marks the path of used nodes in the tree, say 0 1 3 7. The next 8 KB request must then avoid that path and take 0 1 3 8 instead, because the subtree under node 7 is exhausted; other sizes work the same way. The tree nodes therefore track usage so that allocations stay compact toward the left and fragmentation is reduced. For the memory wasted inside a single page, Netty adds a bitmap layer (PoolSubpage) so it can still be used; for looking up chunk objects, Netty adds a caching layer, described below. A sketch of the normalization step follows.
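Here is a minimal sketch of that normalization (a hypothetical helper, not Netty's actual PoolArena#normalizeCapacity, which also special-cases tiny sizes below 512 bytes by rounding to multiples of 16):

// Round reqCapacity up to the nearest power of two (e.g. 900 -> 1024, 11K -> 16K)
// using the classic bit-smearing trick.
static int normalize(int reqCapacity) {
    int n = reqCapacity - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return n + 1;
}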
Exercise
Step through, in a debugger, repeated allocation and release of 1 KB, 2 KB, 3 KB, 8 KB, and 11 KB buffers; a starting sketch follows.
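A possible starting point, reusing the allocator from the test case above (the size-class comments are my reading of the defaults, pageSize 8 KB with subpages below that; the breakpoint targets are Netty 4.1 methods):

// Set breakpoints in PoolChunk.allocate / PoolSubpage.allocate and watch memoryMap.
ByteBuf b1k  = allocator.directBuffer(1024);       // 1 KB  -> subpage inside one page
ByteBuf b2k  = allocator.directBuffer(2048);       // 2 KB  -> subpage
ByteBuf b3k  = allocator.directBuffer(3 * 1024);   // 3 KB  -> normalized to a 4 KB subpage
ByteBuf b8k  = allocator.directBuffer(8192);       // 8 KB  -> exactly one page (one leaf)
ByteBuf b11k = allocator.directBuffer(11 * 1024);  // 11 KB -> normalized to 16 KB, a 2-page run
b1k.release(); b2k.release(); b3k.release(); b8k.release(); b11k.release();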
PoolArena
This layer creates and maintains PoolChunks. It keeps the PoolChunks currently in use for allocation in PoolChunkList containers.
PoolChunkList is a linked-list structure.
Furthermore, PoolArena buckets PoolChunks into different PoolChunkLists according to their usage ratio.
// abstract class PoolArena<T> implements PoolArenaMetric {
    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;
These PoolChunkLists are themselves chained together in order of usage (see the PoolArena constructor, where these list fields are initialized). When a chunk's usage reaches the current list's threshold it moves into the next list: for example, once usage reaches 25% the chunk is moved into the 50% list (see io.netty.buffer.PoolChunkList.add(PoolChunk)). A rough picture of the chain follows the code below.
void add(PoolChunk<T> chunk) {
    if (chunk.usage() >= maxUsage) {
        nextList.add(chunk);
        return;
    }
    add0(chunk);
}
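The chain itself, paraphrased from the PoolArena constructor (usage bounds as I read them in Netty 4.1; exact constructor signatures vary across versions), looks roughly like this:

// minUsage..maxUsage per list and the nextList chain, as a comment-only sketch:
// qInit [MIN, 25%] -> q000 [1%, 50%] -> q025 [25%, 75%]
//   -> q050 [50%, 100%] -> q075 [75%, 100%] -> q100 [100%, MAX]
// A chunk whose usage rises past maxUsage is handed to nextList by add();
// q000 has no previous list, so a chunk that drops below 1% usage is destroyed.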
PoolArena's cache and the Recycler object pool
PooledByteBuf relies on PoolThreadCache as a caching layer over PoolChunk, and PoolThreadCache implements the cache with MemoryRegionCache. MemoryRegionCache caches PoolChunks in a queue (see code 1 below): when a buffer is released, its add method wraps the released PoolChunk in an io.netty.buffer.PoolThreadCache.MemoryRegionCache.Entry object and offers it onto the queue (see stack 1 below); in io.netty.buffer.PoolThreadCache.MemoryRegionCache.allocate(PooledByteBuf, int) an entry is simply polled back off the queue, which achieves the caching. The optimization does not stop there: the Entry objects used to wrap PoolChunks are themselves obtained from and returned to a Recycler object pool, which is essentially a stack reached through a FastThreadLocal; acquiring an object pops the stack, releasing one pushes it. See code 2 below.
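The queue-based caching idea can be sketched in a few lines of plain Java (an illustration only, not Netty's MemoryRegionCache; the class and its members are made up):

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical region cache: release offers into a bounded queue,
// allocate polls from it, and a miss falls through to the real allocator.
final class RegionCacheSketch<T> {
    private final Queue<T> queue = new ArrayBlockingQueue<>(512); // bounded, like the fixed MPSC queue in code 1

    boolean add(T releasedRegion) {         // called on buffer release
        return queue.offer(releasedRegion); // if the cache is full, the region is really freed
    }

    T allocate() {                          // called on buffer allocation
        return queue.poll();                // null means cache miss: fall back to the arena
    }
}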
Recycler
Recycler is an object-pool data structure built from a ThreadLocal combined with a stack; the PooledUnsafeDirectByteBuf seen in the stack trace above is pooled exactly this way. Recycling pushes onto the stack; acquiring pops off it. A minimal usage sketch follows.
Acquire an object: io.netty.util.Recycler.get()
Recycle an object: io.netty.util.Recycler.DefaultHandle.recycle(Object)
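A minimal, self-contained example of this API (Recycler and Recycler.Handle are real Netty 4.1 types; MyPooled is a made-up class for illustration):

import io.netty.util.Recycler;

final class MyPooled {
    private final Recycler.Handle<MyPooled> handle;

    private MyPooled(Recycler.Handle<MyPooled> handle) {
        this.handle = handle;
    }

    private static final Recycler<MyPooled> RECYCLER = new Recycler<MyPooled>() {
        @Override
        protected MyPooled newObject(Handle<MyPooled> handle) {
            // only called when the thread-local stack is empty
            return new MyPooled(handle);
        }
    };

    static MyPooled get() {    // pop from the thread-local stack
        return RECYCLER.get();
    }

    void recycle() {           // push back onto the stack for reuse
        handle.recycle(this);
    }
}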
Code 1: queue initialization
Queue<Entry<T>> queue = PlatformDependent.newFixedMpscQueue(this.size);
Stack 1: on buffer release, MemoryRegionCache.add wraps the released PoolChunk and enqueues it:
Thread [main] (Suspended (breakpoint at line 393 in PoolThreadCache$MemoryRegionCache))
PoolThreadCache$SubPageMemoryRegionCache<T>(PoolThreadCache$MemoryRegionCache<T>).add(PoolChunk<T>, ByteBuffer, long) line: 393
PoolThreadCache.add(PoolArena<?>, PoolChunk, ByteBuffer, long, int, SizeClass) line: 209
PoolArena$DirectArena(PoolArena<T>).free(PoolChunk<T>, ByteBuffer, long, int, PoolThreadCache) line: 273
PooledUnsafeDirectByteBuf(PooledByteBuf<T>).deallocate() line: 171
PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release0(int) line: 136
PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release() line: 124
PooledByteBufTest.main(String[]) line: 43
Code 2: Entry objects come from an object pool
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
    @SuppressWarnings("unchecked")
    @Override
    protected Entry newObject(Handle<Entry> handle) {
        return new Entry(handle);
    }
};

private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
    Entry entry = RECYCLER.get();
    entry.chunk = chunk;
    entry.nioBuffer = nioBuffer;
    entry.handle = handle;
    return entry;
}

@Override
public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }
    Stack<?> stack = this.stack;
    if (lastRecycledId != recycleId || stack == null) {
        throw new IllegalStateException("recycled already");
    }
    stack.push(this);
}
Original post (Chinese): https://www.cnblogs.com/simoncook/p/11967186.html