Netty工具类HashedWheelTimer源码走读(二)

接上一篇( http://my.oschina.net/haogrgr/blog/489320 )

6. HashedWheelTimeout源码走读.

//任务的包装类, 链表结构, 负责保存deadline, 轮数, 等
//继承MpscLinkedQueueNode, 是因为timeous队列是MpscLinkedQueue, 里面对MpscLinkedQueueNode有特殊处理(并发优化)
private static final class HashedWheelTimeout extends MpscLinkedQueueNode<Timeout>
        implements Timeout {

    private static final int ST_INIT = 0;
    private static final int ST_CANCELLED = 1;
    private static final int ST_EXPIRED = 2;
    private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER;

    static {
        AtomicIntegerFieldUpdater<HashedWheelTimeout> updater =
                PlatformDependent.newAtomicIntegerFieldUpdater(HashedWheelTimeout.class, "state");
        if (updater == null) {
            updater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
        }
        STATE_UPDATER = updater;
    }

    private final HashedWheelTimer timer; //timer引用
    private final TimerTask task; //要执行的任务引用
    private final long deadline; //Timer启动时间 - 任务执行时间(任务加入时间+任务延迟时间)

    @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
    private volatile int state = ST_INIT;

    //离任务执行还要等待的轮数, 当任务加入到wheel中时计算该值, 并在Worker中, 每过一轮, 该值减一.
    long remainingRounds;

    //双链表, 因为只有Worker这一个线程访问, 所以不需要synchronization / volatile.
    HashedWheelTimeout next;
    HashedWheelTimeout prev;

    //HashedWheelTimeout 所在的 wheel
    HashedWheelBucket bucket;

    HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
        this.timer = timer;
        this.task = task;
        this.deadline = deadline;
    }

    @Override
    public Timer timer() {
        return timer;
    }

    @Override
    public TimerTask task() {
        return task;
    }

    @Override
    public boolean cancel() {
        // only update the state it will be removed from HashedWheelBucket on next tick.
        //这里只修改状态从ST_INIT到ST_CANCELLED
        if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
            return false;
        }

        //如果状态修改成功, 则表示第一次调用cancel方法, 将HashedWheelTimeout从bucked中移除的操作封装,
        //加入到cancelled队列, 等待下一次tick再移除, 跟踪下了源码历史发现之所以这么做, 是为了对GC友好, 以前取消任务要等到下一轮才会被处理,
        //于是, 改成将cancel的任务放在timeous队列里, 然后统一处理, timeous队列是MpscLinkedQueue, 里面对MpscLinkedQueueNode有特殊处理,
        //然而, 后面又发现有锁的问题, 因为timeous这个队列可能被多个线程操作(HashedWheelTimer.newTimeout()), 开始是加锁的, 
        //于是, 将cancel任务另外存一个队列, 这样, 就不需要使用锁了, 具体见:
        //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1
        //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1
        timer.cancelledTimeouts.add(new Runnable() {
            @Override
            public void run() {
                HashedWheelBucket bucket = HashedWheelTimeout.this.bucket;
                if (bucket != null) {
                    bucket.remove(HashedWheelTimeout.this);
                }
            }
        });
        return true;
    }

    public boolean compareAndSetState(int expected, int state) {
        return STATE_UPDATER.compareAndSet(this, expected, state);
    }

    public int state() {
        return state;
    }

    @Override
    public boolean isCancelled() {
        return state() == ST_CANCELLED;
    }

    @Override
    public boolean isExpired() {
        return state() == ST_EXPIRED;
    }

    @Override
    public HashedWheelTimeout value() {
        return this;
    }

    //到期, 执行任务
    public void expire() {
        if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
            return;
        }

        try {
            task.run(this);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ‘.‘, t);
            }
        }
    }
}

果然是大牛, 各种优化, 看了下源码的提交记录, 截取几段:

1) https://github.com/netty/netty/commit/1f68479e3cd94deb3172edd3c01aa74f35032b9b   (以前wheel用的HashSet, 改成了数组)

Motivation:
At the moment there are two issues with HashedWheelTimer:
* the memory footprint of it is pretty heavy (250kb fon an empty instance)
* the way how added Timeouts are handled is inefficient in terms of how locks etc are used and so a lot of context-switching / condition can happen.

Modification:
Rewrite HashedWheelTimer to use an optimized bucket implementation to store the submitted Timeouts and a MPSC queue to handover the timeouts.  So volatile writes are reduced to a minimum and also the memory foot-print of the buckets itself is reduced a lot as the bucket uses a double-linked-list. Beside this we use Atomic*FieldUpdater where-ever possible to improve the memory foot-print and performance.

Result:
Lower memory-footprint and better performance

2) https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1

Motivation:
At the moment the HashedWheelTimer will only remove the cancelled Timeouts once the HashedWheelBucket is processed again. Until this the instance will not be able to be GC‘ed as there are still strong referenced to it even if the user not reference it by himself/herself. This can cause to waste a lot of memory even if the Timeout was cancelled before.

Modification:
Add a new queue which holds CancelTasks that will be processed on each tick to remove cancelled Timeouts. Because all of this is done only by the WorkerThread there is no need for synchronization and only one extra object creation is needed when cancel() is executed. For addTimeout(...) no new overhead is introduced.

Result:
Less memory usage for cancelled Timeouts.

3) https://github.com/netty/netty/commit/44ea769f537bf16b833d03db844b1f3067b3acd7

Motivation:
Due some race-condition while handling canellation of TimerTasks it was possibleto corrupt the linked-list structure that is represent by HashedWheelBucket and so produce a NPE.

Modification:
Fix the problem by adding another MpscLinkedQueue which holds the cancellation tasks and process them on each tick. This allows to use no synchronization / locking at all while introduce a latency of max 1 tick before the TimerTask can be GC‘ed.

Result:
No more NPE

回到主题, 代码并不复杂, 开始看的时候, 发现继承了MpscLinkedQueueNode, 但是又没有地方用到, 后面看了下, 发现MpscLinkedQueue对其有特殊处理.

可以看到HashedWheelTimeout就是对Timeout任务的包装, 链表结构方便加入wheel, 记录deadline, remainingRounds, state等信息,

7. HashedWheelBucket 源码走读.

//用来存放HashedWheelTimeout, 结构有点像linked-list, 方便移除操作.
private static final class HashedWheelBucket {

    //链表结构
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;

    //添加HashedWheelTimeout, 链表操作, 不多说~~~
    public void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;
        }
    }

    //当tick到该wheel的时候, Worker会调用这个方法, 根据deadline来判断任务是否过期(remainingRounds是否为0), 
    //任务到期就执行, 没到期, 就timeout.remainingRounds--, 因为走到这里, 表示改wheel里的任务又过了一轮了.
    public void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;

        //遍历链表
        while (timeout != null) {
            boolean remove = false;
            if (timeout.remainingRounds <= 0) {//任务已到执行点
                if (timeout.deadline <= deadline) {
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
                remove = true;
            } else if (timeout.isCancelled()) {
                remove = true;
            } else {//没到期, 剩余轮数减一
                timeout.remainingRounds --;
            }

            //先保存next, 因为移除后, 再获取timeout.next会为空.
            HashedWheelTimeout next = timeout.next;
            if (remove) {//当以到期, 或者被取消, 就将timeou从链表中移除
                remove(timeout);
            }
            timeout = next;
        }
    }

    //链表移除, 不多说
    public void remove(HashedWheelTimeout timeout) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.prev != null) {
            timeout.prev.next = next;
        }
        if (timeout.next != null) {
            timeout.next.prev = timeout.prev;
        }

        if (timeout == head) {
            if (timeout == tail) {
                tail = null;
                head = null;
            } else {
                head = next;
            }
        } else if (timeout == tail) {
            tail = timeout.prev;
        }
        timeout.prev = null;
        timeout.next = null;
        timeout.bucket = null;
    }

    //Clear this bucket and return all not expired / cancelled {@link Timeout}s.
    public void clearTimeouts(Set<Timeout> set) {
        for (;;) {
            HashedWheelTimeout timeout = pollTimeout();
            if (timeout == null) {
                return;
            }
            if (timeout.isExpired() || timeout.isCancelled()) {
                continue;
            }
            set.add(timeout);
        }
    }

    //链表的poll
    private HashedWheelTimeout pollTimeout() {
        HashedWheelTimeout head = this.head;
        if (head == null) {
            return null;
        }
        HashedWheelTimeout next = head.next;
        if (next == null) {
            tail = this.head =  null;
        } else {
            this.head = next;
            next.prev = null;
        }

        head.next = null;
        head.prev = null;
        head.bucket = null;
        return head;
    }
}

可以看到, 代码也不复杂, 主要是提供一个类似于LinkedList的容器, 用来存放HashedWheelTimeout, 并提供expireTimeouts(long deadline) 方法来处理该wheel中的任务. 具体处理看注释.

字数限制... 接第二篇..., 还剩最后的Worker的代码.

时间: 2024-10-11 00:22:55

Netty工具类HashedWheelTimer源码走读(二)的相关文章

Netty工具类HashedWheelTimer源码走读

1. 简单介绍. A Timer optimized for approximated I/O timeout scheduling. 关于Timer的介绍可以看看这篇文章, 写得不错 :  http://novoland.github.io/%E5%B9%B6%E5%8F%91/2014/07/26/%E5%AE%9A%E6%97%B6%E5%99%A8%EF%BC%88Timer%EF%BC%89%E7%9A%84%E5%AE%9E%E7%8E%B0.html 可以看到, HashedWhe

Netty工具类HashedWheelTimer源码走读(三)

接上一篇( http://my.oschina.net/haogrgr/blog/490266 ) 8. Worker代码走读. //主要负责累加tick, 执行到期任务等. private final class Worker implements Runnable {     private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();     private long tick;     @O

JDK中String类的源码分析(二)

1.startsWith(String prefix, int toffset)方法 包括startsWith(*),endsWith(*)方法,都是调用上述一个方法 1 public boolean startsWith(String prefix, int toffset) { 2 char ta[] = value; 3 int to = toffset; 4 char pa[] = prefix.value; 5 int po = 0; 6 int pc = prefix.value.l

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

可视化工具gephi源码探秘(二)---导入netbeans

在上篇<可视化工具gephi源码探秘(一)>中主要介绍了如何将gephi的源码导入myeclipse中遇到的一些问题,此篇接着上篇而来,主要讲解当下通过myeclipse导入gephi源码的可行性不高以及熟悉netbeans,并把原本基于netbeans平台开发的gephi源码导入进netbeans后启动正常运行的过程,其中有遇到的不少问题和相应的解决方法. 前日工作梗概(还是沿着想把源码导入myeclipse的思路): 经过从各大子模块的pom.xml中筛选出符合条件的jar包写入项目下的p

Apache Spark源码走读之21 -- 浅谈mllib中线性回归的算法实现

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化

Apache Spark源码走读之14 -- Graphx实现剖析

欢迎转载,转载请注明出处,徽沪一郎. 概要 图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架.Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情. Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口.本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习. Google为什么赢得了搜索引擎大战 当Google还在起步的

Apache Spark源码走读之16 -- spark repl实现详解

欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码的实时交互式执行,这是为什么呢? 既然scala已经提供了repl,为什么spark还要自己单独搞一套spark repl,这其中的缘由到底何在? 显然,这些都是问题,要解开这些谜团,只有再次开启一段源码分析之旅了. 全局视图 上图显示了java源文件从编译到加载执行的全局视图,整个过程中最主要的步

关于java中ReentrantLock类的源码分析以及总结与例子

一,官方描述 关于ReentrantLock的官方描述,英文的就不贴出来了,这里我只贴出我自己翻译的描述: reentrant是一个跟synchronized具有相同行为和语义的持有锁来访问方法和语句的互斥锁,但是reentrant还拥有被扩展的能力. ReentrantLock会被线程拥有并且持续锁定,不会解锁.线程调用lock()方法返回后,则成功持有锁,否则这个锁正在被另一个线程所持有,只能等待另一个线程释放锁,如果当前线程拥有了锁,则调用lock()方法会立即返回,这个状态可以通过isH