Netty工具类HashedWheelTimer源码走读(三)

接上一篇( http://my.oschina.net/haogrgr/blog/490266 )

8. Worker代码走读.

//主要负责累加tick, 执行到期任务等.
private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    private long tick;

    @Override
    public void run() {
        //初始化startTime, startTime只是一个起始时间的标记, 任务的deadline是相对这个时间点来的.
        startTime = System.nanoTime();

        //因为nanoTime返回值可能为0, 甚至负数, 所以这时赋值为1, Timer中start方法会判断该值, 直到不为0才跳出循环.
        if (startTime == 0) { 
            startTime = 1;
        }

        //唤醒阻塞在Timer.start()方法上的线程, 表示已经启动完成.
        startTimeInitialized.countDown();

        //只要还是启动状态, 就一直循环
        do {
            //waitForNextTick方法主要是计算下次tick的时间, 然后sleep到下次tick
            //返回值就是System.nanoTime() - startTime, 也就是Timer启动后到这次tick, 所过去的时间
            final long deadline = waitForNextTick();
            if (deadline > 0) {//可能溢出, 所以小于等于0不管
                
                //获取index, 原理见Timer的构造方法注释, 等价于 tick % wheel.length
                int idx = (int) (tick & mask);  
                
                //移除cancel了的task, 具体可以见HashedWheelTimeout.cancel()方法注释
                processCancelledTasks();

                //当前tick对应的wheel
                HashedWheelBucket bucket = wheel[idx];

                //因为添加任务是先加入到timeouts队列中, 而这里就是将任务从队列中取出, 放到对应的bucket中
                transferTimeoutsToBuckets();

                //见上篇HashedWheelBucket.expireTimeouts()方法的注释
                //具体是根据当前的deadline, 判断bucket中的人物是否到期, 到期的任务就执行, 没到期的, 就将人物轮数减一.
                //正常情况下, 一个bucket在一轮中, 只会执行一次expireTimeouts方法.
                bucket.expireTimeouts(deadline);

                //累加tick
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        //返回调用stop()时, 还未处理的任务.
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }

        //加上还没来得及放入bucket中的任务
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }

        //最好移除下cancel了的task
        processCancelledTasks();
    }

    //将Timer.newTimeout()调用放入到timeouts时的任务放入到对应的bucket中
    private void transferTimeoutsToBuckets() {
        //一次tick, 最多放入10w任务, 防止太多了, 造成worker线程在这里停留太久.
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                //全部处理完了, 退出循环
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                //还没加入到bucket中, 就取消了, 继续.
                continue;
            }

            //calculated 表示任务要经过多少个tick
            long calculated = timeout.deadline / tickDuration;

            //设置任务要经过的轮数
            timeout.remainingRounds = (calculated - tick) / wheel.length;

            //如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 于是方法调用完后就会执行.
            final long ticks = Math.max(calculated, tick);
            int stopIndex = (int) (ticks & mask);//同样, 类似于ticks % wheel.length
            
            //这时任务所在的bucket在wheel中的位置就表示, 经过n轮后, 还需要多少次tick才执行.
            HashedWheelBucket bucket = wheel[stopIndex];
            bucket.addTimeout(timeout);//将timeout加入到链表
        }
    }

    //将cancel任务从队列中取出, 并执行cancel操作, 具体可以见HashedWheelTimeout.cancel()方法注释.
    private void processCancelledTasks() {
        for (;;) {
            Runnable task = cancelledTimeouts.poll();
            if (task == null) {
                // all processed
                break;
            }
            try {
                task.run();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while process a cancellation task", t);
                }
            }
        }
    }

    /**
     * calculate goal nanoTime from startTime and current tick number,
     * then wait until that goal has been reached.
     * @return Long.MIN_VALUE if received a shutdown request,
     * current time otherwise (with Long.MIN_VALUE changed by +1)
     */
    //sleep, 直到下次tick到来, 然后返回该次tick和启动时间之间的时长
    private long waitForNextTick() {

        //下次tick的时间点, 用于计算需要sleep的时间
        long deadline = tickDuration * (tick + 1);

        //循环, 直到HashedWheelTimer被stop, 或者到了下个tick
        for (;;) {

            //计算需要sleep的时间, 之所以加9999999后再除10000000, 是因为保证为10毫秒的倍数.
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            if (sleepTimeMs <= 0) {//小于等于0, 表示本次tick已经到了, 返回.
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE; //不懂不懂, 我不懂...估计又是nanoTime的问题.
                } else {
                    return currentTime; //返回过去的时间.
                }
            }

            // Check if we run on windows, as if thats the case we will need
            // to round the sleepTime as workaround for a bug that only affect
            // the JVM if it runs on windows.
            //
            // See https://github.com/netty/netty/issues/356
            if (PlatformDependent.isWindows()) {//不多说, 一个字, 屌
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
                Thread.sleep(sleepTimeMs);//睡吧
            } catch (InterruptedException ignored) {
                //当调用Timer.stop时, 退出
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

    public Set<Timeout> unprocessedTimeouts() {
        return Collections.unmodifiableSet(unprocessedTimeouts);
    }
}

终于搞完了, 具体看注释吧, 很详细的注释.

9. 备注.

在看代码的时候, 大部分都好, 但是有些代码看的很困惑, 比如说

startTime = System.nanoTime();
if (startTime == 0) {
    // We use 0 as an indicator for the uninitialized value here, so make sure it‘s not 0 when initialized.
    startTime = 1;
}

//startTime 是volatile的, 然后没有其他地方修改startTime, 为什么这里还要判断下是否为0...

然后我就去群里问了问, 最后定位到是nanoTime的问题, API文档说, 它连负数都可能返回~~~~

大神(北京-菜鸟多年)贴的nanoTime实现.

jlong os::javaTimeNanos() {
  if (Linux::supports_monotonic_clock()) {
    struct timespec tp;
    int status = Linux::clock_gettime(CLOCK_MONOTONIC, &tp);
    assert(status == 0, "gettime error");
    jlong result = jlong(tp.tv_sec) * (1000 * 1000 * 1000) + jlong(tp.tv_nsec);
    return result;
  } else {
    timeval time;
    int status = gettimeofday(&time, NULL);
    assert(status != -1, "linux error");
    jlong usecs = jlong(time.tv_sec) * (1000 * 1000) + jlong(time.tv_usec);
    return 1000 * usecs;
  }
}

北京-菜鸟多年 : 看了下 代码  原来这个 clock_gettime函数 可能会发生时间回绕

北京-菜鸟多年 : 然后  获得的纳秒 就变成0了

北京-菜鸟多年 : 不过, 需要很长时间

北京-菜鸟多年 : 时间是递增的  递增到一定地步就溢出了  然后就从0开始

北京-菜鸟多年 : timespec 这里面  秒和纳秒分开存储的

北京-菜鸟多年 : 就是为了延长回绕出现的几率

北京-菜鸟多年 : else那个分支  就是  currentTimeMillis() 的实现

北京-菜鸟多年 : 不过 是  纳秒级别

北京-菜鸟多年 : 和currentTimeMillis算法一样的, 性能要慢些

10. 总结.

任务里不要有太耗时的操作, 否则会阻塞Worker线程, 导致tick不准.

Wheel Timer, 确实是很精巧的算法, Netty实现的HashedWheelTimer也是经过大神们极致的优化而来的.

时间: 2024-08-28 20:57:48

Netty工具类HashedWheelTimer源码走读(三)的相关文章

Netty工具类HashedWheelTimer源码走读

1. 简单介绍. A Timer optimized for approximated I/O timeout scheduling. 关于Timer的介绍可以看看这篇文章, 写得不错 :  http://novoland.github.io/%E5%B9%B6%E5%8F%91/2014/07/26/%E5%AE%9A%E6%97%B6%E5%99%A8%EF%BC%88Timer%EF%BC%89%E7%9A%84%E5%AE%9E%E7%8E%B0.html 可以看到, HashedWhe

Netty工具类HashedWheelTimer源码走读(二)

接上一篇( http://my.oschina.net/haogrgr/blog/489320 ) 6. HashedWheelTimeout源码走读. //任务的包装类, 链表结构, 负责保存deadline, 轮数, 等 //继承MpscLinkedQueueNode, 是因为timeous队列是MpscLinkedQueue, 里面对MpscLinkedQueueNode有特殊处理(并发优化) private static final class HashedWheelTimeout ex

Apache Spark源码走读之21 -- 浅谈mllib中线性回归的算法实现

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化

Apache Spark源码走读之14 -- Graphx实现剖析

欢迎转载,转载请注明出处,徽沪一郎. 概要 图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架.Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情. Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口.本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习. Google为什么赢得了搜索引擎大战 当Google还在起步的

baksmali和smali源码分析(三)

baksmali 的源码分析 在baksmali进行源码分析之前,需要读者掌握一条主线,因为本身笔者只是由于项目需要用到这套源码,在工作之余的时间里面来进行学习也没有时间和精力熟读源码的每个文件每个方法,但是依据这条主线,至少能够猜出并且猜对baksmali里面的源码的文件大概的作用是什么,这样在修改问题和移植的时候才能做到游刃有余. 这条主线是,baksmali其实只是利用了dexlib2提供的接口,将dex文件读入到一块内存中,这块内存或者说数据结构开辟的大小是跟输入的dex文件相关的,而这

Apache Spark源码走读之16 -- spark repl实现详解

欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码的实时交互式执行,这是为什么呢? 既然scala已经提供了repl,为什么spark还要自己单独搞一套spark repl,这其中的缘由到底何在? 显然,这些都是问题,要解开这些谜团,只有再次开启一段源码分析之旅了. 全局视图 上图显示了java源文件从编译到加载执行的全局视图,整个过程中最主要的步

关于java中ReentrantLock类的源码分析以及总结与例子

一,官方描述 关于ReentrantLock的官方描述,英文的就不贴出来了,这里我只贴出我自己翻译的描述: reentrant是一个跟synchronized具有相同行为和语义的持有锁来访问方法和语句的互斥锁,但是reentrant还拥有被扩展的能力. ReentrantLock会被线程拥有并且持续锁定,不会解锁.线程调用lock()方法返回后,则成功持有锁,否则这个锁正在被另一个线程所持有,只能等待另一个线程释放锁,如果当前线程拥有了锁,则调用lock()方法会立即返回,这个状态可以通过isH

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

欢迎转载,转载请注明出处,徽沪一郎. 楔子 Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析.由于这一特性而收到广泛的欢迎. Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意.由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就是已经非常

Nouveau源码分析(三):NVIDIA设备初始化之nouveau_drm_probe

Nouveau源码分析(三) 向DRM注册了Nouveau驱动之后,内核中的PCI模块就会扫描所有没有对应驱动的设备,然后和nouveau_drm_pci_table对照. 对于匹配的设备,PCI模块就调用对应的probe函数,也就是nouveau_drm_probe. // /drivers/gpu/drm/nouveau/nouveau_drm.c 281 static int nouveau_drm_probe(struct pci_dev *pdev, 282 const struct