netty对象池使用与回收

1. Recycler对象池

  Recycler抽象类的实现非常简单,只有三个方法:

  获取对象:Recycler:get()

  回收对象:Recycler:recycle()

  创建对象:Recycler:newObject()

  newObject为抽象方法,需要由实现类自己实现此方法来创建对象。

  Recycler对象池目的是尽可能的重复利用同一线程创建的对象,同时为了避免占用过多的内存,回收对象时采用一定的比例回收对象(默认1/7规则 注释中有解释),未回收的对象由jvm垃圾回收机制来处理。

  Recycler对象池的数据存储结构如下:

  

  和创建stack相同的线程回收对象时存储在elements数组(pushNow方法回收),如果不是同一个线程则放入WeakOrderQueue队列中,等到需要get对象时且stack为空,会将WeakOrderQueue集合的所有元素根据一定的规则转移到stack的elements数组中。

2. 对象的获取

  首先判断池中是否存在对象,如果由则优先从本地线程stack中获取Object,如果stack为空时,再将其他线程queue集合的所有对象根据一定的规则转移到本地线程stack中(1/7规则),然后再从stack获取Object并返回.
  如果池中不存在对象,创建对象并返回。

  相关代码如下:   

    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();//首先判断池中是否存在对象,如果由则优先从本地线程stack中获取Object,如果stack为空时,再将其他线程queue集合的所有对象根据一定的规则转移到本地线程stack中(1/7规则),然后再从stack获取Object并返回.
        if (handle == null) {//如果池中不存在对象,创建对象并返回。
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        //        System.out.println(threadLocal.getClass() + "=" + stack + "=" + handle);
        return (T) handle.value;
    }

    DefaultHandle<T> pop() {
        int size = this.size;
        if (size == 0) {首先判断池中是否存在对象//
            if (!scavenge()) {//如果stack为空时,再将其他线程queue集合的所有对象根据一定的规则转移到本地线程stack中(1/7规则)
                return null;
            }
            size = this.size;
        }
        size--;
        DefaultHandle ret = elements[size];
        elements[size] = null;
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        ret.recycleId = 0;
        ret.lastRecycledId = 0;
        this.size = size;
        return ret;
    }

    boolean scavenge() {
        // continue an existing scavenge, if any
        if (scavengeSome()) {
            return true;
        }

        // reset our scavenge cursor
        prev = null;
        cursor = head;
        return false;
    }

    boolean scavengeSome() {
        WeakOrderQueue prev;
        WeakOrderQueue cursor = this.cursor;
        if (cursor == null) {
            prev = null;
            cursor = head;
            if (cursor == null) {
                return false;
            }
        } else {
            prev = this.prev;
        }

        boolean success = false;
        do {
            if (cursor.transfer(this)) {//将其他线程queue集合的所有对象根据一定的规则转移到本地线程stack中(1/7规则)
                success = true;
                break;
            }
            WeakOrderQueue next = cursor.next;
            if (cursor.owner.get() == null) {
                // If the thread associated with the queue is gone, unlink it, after
                // performing a volatile read to confirm there is no data left to collect.
                // We never unlink the first queue, as we don‘t want to synchronize on updating the head.
                if (cursor.hasFinalData()) {
                    for (;;) {
                        if (cursor.transfer(this)) {
                            success = true;
                        } else {
                            break;
                        }
                    }
                }

                if (prev != null) {
                    prev.setNext(next);
                }
            } else {
                prev = cursor;
            }

            cursor = next;

        } while (cursor != null && !success);//遍历所有的WeakOrderQueue

        this.prev = prev;
        this.cursor = cursor;
        return success;
    }

    boolean transfer(Stack<?> dst) {
        Link head = this.head.link;
        if (head == null) {
            return false;
        }

        if (head.readIndex == LINK_CAPACITY) {
            if (head.next == null) {
                return false;
            }
            this.head.link = head = head.next;
        }

        final int srcStart = head.readIndex;
        int srcEnd = head.get();
        final int srcSize = srcEnd - srcStart;
        if (srcSize == 0) {
            return false;
        }

        final int dstSize = dst.size;
        final int expectedCapacity = dstSize + srcSize;

        if (expectedCapacity > dst.elements.length) {
            final int actualCapacity = dst.increaseCapacity(expectedCapacity);
            srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
        }

        if (srcStart != srcEnd) {
            final DefaultHandle[] srcElems = head.elements;
            final DefaultHandle[] dstElems = dst.elements;
            int newDstSize = dstSize;
            for (int i = srcStart; i < srcEnd; i++) {//将queue中所有元素按照(1/7规则)保存到stack中
                DefaultHandle element = srcElems[i];
                if (element.recycleId == 0) {
                    element.recycleId = element.lastRecycledId;
                } else if (element.recycleId != element.lastRecycledId) {
                    throw new IllegalStateException("recycled already");
                }
                srcElems[i] = null;

                if (dst.dropHandle(element)) {
                    // Drop the object.
                    continue;
                }
                element.stack = dst;
                dstElems[newDstSize++] = element;
            }

            if (srcEnd == LINK_CAPACITY && head.next != null) {
                // Add capacity back as the Link is GCed.
                this.head.reclaimSpace(LINK_CAPACITY);
                this.head.link = head.next;
            }

            head.readIndex = srcEnd;
            if (dst.size == newDstSize) {
                return false;
            }
            dst.size = newDstSize;
            return true;
        } else {
            // The destination stack is full already.
            return false;
        }
    }

3.  回收对象

  如果当前回收的线程是原始线程(创建对象的线程),那么调用pushNow,如果超过Stack的容量(默认4*1024)或者1/7规则 就drop(由jvm回收释放)
  如果当前回收的线程不是原始线程,那么调用pushLater,如果当前线程是第一次回收原始线程的对象,需要由当前线程创建的WeakOrderQueue(原始线程的stack对象中可以关联其他线程的WeakOrderQueue)。如果创建队列成功,则进入该队列;失败(queue个数大于maxDelayedQueues)则放弃该对象(由jvm回收释放)

注释:1)由本地线程的WeakOrderQueue回收对象,这么做的原因时避免并发(races竞争),而且不是每个对象都有对象回收池来回收,如果超过最大容量限制会放弃,而且本地线程stack采用1/8规则措施,而且将异地线程queue的对象转移到本地线程stack时也采取1/8规则措施

   2)1/7规则是指每7个对象只留取1个回收,剩余部分放弃,如1-7个回收1个,8-15回收2个......
回收对象的相关代码如下:

     (1)DefaultHandle类
        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }
            stack.push(this);
        }

        (2)Stack类
        void push(DefaultHandle<?> item) {
        Thread currentThread = Thread.currentThread();
        if (threadRef.get() == currentThread) {
            // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
            pushNow(item);
        } else {
            // The current Thread is not the one that belongs to the Stack
            // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
            // happens later.
            pushLater(item, currentThread);
        }

       (3)private void pushNow(DefaultHandle<?> item) {
            if ((item.recycleId | item.lastRecycledId) != 0) {
                throw new IllegalStateException("recycled already");
            }
            item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

            int size = this.size;
            if (size >= maxCapacity || dropHandle(item)) {//为了避免回收对象数量太多,占用太多内存,采取了1/8规则措施,参数可调
                // Hit the maximum capacity or should drop - drop the possibly youngest object.
                return;
            }
            if (size == elements.length) {
                elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
            }

            elements[size] = item;
            this.size = size + 1;
        }

        (4)private void pushLater(DefaultHandle<?> item, Thread thread) {
            // we don‘t want to have a ref to the queue as the value in our weak map
            // so we null it out; to ensure there are no races with restoring it later
            // we impose a memory ordering here (no-op on x86)
            Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
            WeakOrderQueue queue = delayedRecycled.get(this);
            if (queue == null) {
                if (delayedRecycled.size() >= maxDelayedQueues) {回收池的异地队列个数有限制,每个队列的容量也有限制
                    // Add a dummy queue so we know we should drop the object
                    delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                    return;
                }
                // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
                if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
                    // drop object
                    return;
                }
                delayedRecycled.put(this, queue);
            } else if (queue == WeakOrderQueue.DUMMY) {
                // drop object
                return;
            }

            queue.add(item);
        }

        (5)WeakOrderQueue类
        void add(DefaultHandle<?> handle) {
            handle.lastRecycledId = id;

            Link tail = this.tail;
            int writeIndex;
            if ((writeIndex = tail.get()) == LINK_CAPACITY) {
                if (!head.reserveSpace(LINK_CAPACITY)) {
                    // Drop it.
                    return;
                }
                // We allocate a Link so reserve the space
                this.tail = tail = tail.next = new Link();

                writeIndex = tail.get();
            }
            tail.elements[writeIndex] = handle;
            handle.stack = null;
            // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
            // this also means we guarantee visibility of an element in the queue if we see the index updated
            tail.lazySet(writeIndex + 1);
        }
 

原文地址:https://www.cnblogs.com/wangshifa666/p/9565237.html

时间: 2024-10-12 12:23:27

netty对象池使用与回收的相关文章

Netty轻量级对象池实现分析

什么是对象池技术?对象池应用在哪些地方? 对象池其实就是缓存一些对象从而避免大量创建同一个类型的对象,类似线程池的概念.对象池缓存了一些已经创建好的对象,避免需要时才创建对象,同时限制了实例的个数.池化技术最终要的就是重复的使用池内已经创建的对象.从上面的内容就可以看出对象池适用于以下几个场景: 创建对象的开销大 会创建大量的实例 限制一些资源的使用 如果创建一个对象的开销特别大,那么提前创建一些可以使用的并且缓存起来(池化技术就是重复使用对象,提前创建并缓存起来重复使用就是池化)可以降低创建对

对象池实现分析

对象池实现分析 什么是对象池技术?对象池应用在哪些地方? 对象池其实就是缓存一些对象从而避免大量创建同一个类型的对象,类似线程池的概念.对象池缓存了一些已经创建好的对象,避免需要时才创建对象,同时限制了实例的个数.池化技术最终要的就是重复的使用池内已经创建的对象.从上面的内容就可以看出对象池适用于以下几个场景: 创建对象的开销大 会创建大量的实例 限制一些资源的使用 如果创建一个对象的开销特别大,那么提前创建一些可以使用的并且缓存起来(池化技术就是重复使用对象,提前创建并缓存起来重复使用就是池化

Netty 高性能之道 - Recycler 对象池的复用

前言 我们知道,Java 创建一个实例的消耗是不小的,如果没有使用栈上分配和 TLAB,那么就需要使用 CAS 在堆中创建对象.所以现在很多框架都使用对象池.Netty 也不例外,通过重用对象,能够避免频繁创建对象和销毁对象带来的损耗. 来看看具体实现. 1. Recycler 抽象类简介 该类 doc: Light-weight object pool based on a thread-local stack. 基于线程局部堆栈的轻量级对象池. 该类是个容器,内部主要是一个 Stack 结构

Netty4底层用对象池和不用对象池实践优化

随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量级的工作.但是对于缓冲区Buffer,情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作.为了尽量重用缓冲区,Netty提供了基于内存池的缓冲区重用机制.性能测试表明,采用内存池的ByteBuf相比于朝生夕灭的ByteBuf,性能高23倍左右(性能数据与使用场景强相关). 在4.x版本中,UnpooledByteBufAllocator是默认的allocator,尽管其存在某些限制.现在PooledByte

[netty4][netty-buffer]netty之池化buffer

PooledByteBufAllocator buffer分配 buffer分配的入口: io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(int, int) netty实际应用时分配调用栈: CLASS_NAME METHOD_NAME LINE_NUM io/netty/buffer/PooledByteBufAllocator newDirectBuffer 339 io/netty/buffer/AbstractByteBufA

jvm如何判断对象是否可以被回收

内容基本来自周志明 深入理解Java虚拟机 第二版 第三章 .这本书还可以,不过好像也没什么其他中文的关于jvm比较好的书了 jvm要做垃圾回收时,首先要判断一个对象是否还有可能被使用.那么如何判断一个对象是否还有可能被用到? 如果我们的程序无法再引用到该对象,那么这个对象就肯定可以被回收,这个状态称为不可达.当对象不可达,该对象就可以作为回收对象被垃圾回收器回收. 那么这个可达还是不可达如何判断呢? 答案就是GC roots ,也就是根对象,如果从一个对象没有到达根对象的路径,或者说从根对象开

屏幕坐标和世界坐标的转换+对象池技术(3D打地鼠小游戏)

游戏中可能经常会遇到需要某个物体跟着鼠标移动,然后又需要把物体放在某个鼠标指定的位置 实现方式 Camera.main.WorldToScreenPoint Camera.main.ScreenToWorldPoint 3D打地鼠实例 我这里用到的素材都比较简陋,几乎全是用Unity做的 首先是锤子 就是两个Cylinder,在把手的位置放一个空物体用于模拟锤子的动作,命名为Hammer,把锤子作为Hammer的子物体,给Hammer添加Animation动画: 在三个关键帧位置设置Hammer

深度剖析C++对象池自动回收技术实现

http://www.tuicool.com/articles/mQBfQfN 对象池可以显著提高性能,如果一个对象的创建非常耗时或非常昂贵,频繁去创建的话会非常低效.对象池通过对象复用的方式来避免重复创建对象,它会事先创建一定数量的对象放到池中,当用户需要创建对象的时候,直接从对象池中获取即可,用完对象之后再放回到对象池中,以便复用.这种方式避免了重复创建耗时或耗资源的大对象,大幅提高了程序性能.本文将探讨对象池的技术特性以及源码实现. 对象池类图 ObjectPool:管理对象实例的pool

线程池的基本思想还是一种对象池的思想

线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理.当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源. 比如:一个应用要和网络打交道,有很多步骤需要访问网络,为了不阻塞主线程,每个步骤都创建个线程,在线程中和网络交互,用线程池就变的简单,线程池是对线程的一种封装,让线程用起来更加简便,只需要创一个线程池,把这些步骤像任务一样放进线程池,在程序销毁时只要调用线程