pool(三)——Timer

1.关于Timer的三个维度

首先是 {@link java.util.Timer},这个是最外层的类,其中包含了{@link java.util.TaskQueue},这个是存放{@link java.util.TimerTask}的队列——a priority queue of TimerTasks。

第二层是 {@link java.util.TimerThread},这个是{@link java.util.Timer}在初始化的时候创建并启动的一个线程,这个线程取任务并且执行。

 /**
     * Creates a new timer whose associated thread has the specified name.
     * The associated thread does <i>not</i>
     * {@linkplain Thread#setDaemon run as a daemon}.
     *
     * @param name the name of the associated thread
     * @throws NullPointerException if {@code name} is null
     * @since 1.5
     */
    public Timer(String name) {
        thread.setName(name);
        thread.start();
    }

2.TimerThread

Thread的子类,在run方法中循环取任务。

public void run() {
        try {
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }
/**
     * The main timer loop.  (See class comment.)
     */
    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }

3.task的创建——schedule方法

public void schedule(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis()+delay, -period);
    }
/**
     * Schedule the specified timer task for execution at the specified
     * time with the specified period, in milliseconds.  If period is
     * positive, the task is scheduled for repeated execution; if period is
     * zero, the task is scheduled for one-time execution. Time is specified
     * in Date.getTime() format.  This method checks timer state, task state,
     * and initial execution time, but not period.
     *
     * @throws IllegalArgumentException if <tt>time</tt> is negative.
     * @throws IllegalStateException if task was already scheduled or
     *         cancelled, timer was cancelled, or timer thread terminated.
     * @throws NullPointerException if {@code task} is null
     */
    private void sched(TimerTask task, long time, long period) {
        if (time < 0)
            throw new IllegalArgumentException("Illegal execution time.");

        // Constrain value of period sufficiently to prevent numeric
        // overflow while still being effectively infinitely large.
        if (Math.abs(period) > (Long.MAX_VALUE >> 1))
            period >>= 1;

        synchronized(queue) {
            if (!thread.newTasksMayBeScheduled)
                throw new IllegalStateException("Timer already cancelled.");

            synchronized(task.lock) {
                if (task.state != TimerTask.VIRGIN)
                    throw new IllegalStateException(
                        "Task already scheduled or cancelled");
                task.nextExecutionTime = time;
                task.period = period;
                task.state = TimerTask.SCHEDULED;
            }

            queue.add(task);
            if (queue.getMin() == task)
                queue.notify();
        }
    }

1.两重锁,先锁队列queue,再锁task,task中有一个Object对象作为锁

2.设置TimerTask的下次执行时间

{@link java.util.TimerTask#nextExecutionTime} = System.currentTimeMillis()+delay

3.将任务添加到队列中

{@link java.util.Timer#queue}

4.当前任务如果是队列的一个任务,就执行

task == {@link java.util.TaskQueue#getMin},调用queue的notify方法。如果不是,说明前面还有等待执行的task,只入队列,不用调用notify方法。

5.{@link java.util.TimerThread#mainLoop}

{@link java.util.TimerThread}会在{@link java.util.Timer}

构造的时候启动,进而调用mainLoop方法。

最开始queue是空的,所以queue.await(),当前线程挂起,当Timer中添加TimerTask任务时,

就会调用queue.notify()方法唤醒mainLoop线程。

4.根据优先级对堆进行重排序

1.TimerTask的入堆(queue)操作

delay时间设置的很长,就是为了让任务不执行,看看入队列的比较操作,period这里完全用作标志位,在debug的时候作为标记区分不同的Task,看看排序状况。

这里以3个为例,第一个入队列,index是1,第二个入队列,index是2,2>>1=1,然后拿queue[2]和queue[1]比较下次执行时间,queue[2]比queue[1]早,所以交换顺序。

第三个入队列,index是3,3>>1=1,和第一个比,queue[3]比queue[1]要早,所以交换顺序,所以现在queue[1]最早执行,queue[2]和queue[3]的顺序没有考虑。每次入队列的重排序操作在 {@link java.util.TaskQueue#fixUp} 方法中进行

/**
     * Establishes the heap invariant (described above) assuming the heap
     * satisfies the invariant except possibly for the leaf-node indexed by k
     * (which may have a nextExecutionTime less than its parent's).
     *
     * This method functions by "promoting" queue[k] up the hierarchy
     * (by swapping it with its parent) repeatedly until queue[k]'s
     * nextExecutionTime is greater than or equal to that of its parent.
     */
    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }
2.mainLoop执行操作:

每次取queue的第一个task,如果该task还没到执行时间,就等待对应的时间queue.wait(executionTime - currentTime)。

if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);

如果这期间又来了一个优先级更高(执行顺序更靠前)的task,入队列时调用fixUp把当前task排到队列头(优先级更高),然后notify这个queue打断这个wait,重新去取优先级更高的task。

/**
     * Adds a new task to the priority queue.
     */
    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);

        queue[++size] = task;
        fixUp(size);
    }

如果到了执行时间(wait结束),在下次循环的时候,就执行该task。

如果是非重复任务,调用removeMin移除当前任务,在removeMin中fixDown,进行堆重排序。

如果是重复任务的话,还要调用rescheduleMin设置下次执行的时间,在rescheduleMin中调用fixDown,进行堆重排序。

3.{@link java.util.TaskQueue#fixDown}
/**
     * Establishes the heap invariant (described above) in the subtree
     * rooted at k, which is assumed to satisfy the heap invariant except
     * possibly for node k itself (which may have a nextExecutionTime greater
     * than its children's).
     *
     * This method functions by "demoting" queue[k] down the hierarchy
     * (by swapping it with its smaller child) repeatedly until queue[k]'s
     * nextExecutionTime is less than or equal to those of its children.
     */
    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

对剩下的Task进行重新排序,把下次执行时间最小的转移到第一个任务的位置。

测试用例:
public static void test1() {
        timer.schedule(new TimerTask() {
            @Override public void run() {
                System.out.println("Time's up 1!---" + new Date().toString());
                //                    SleepUtil.sleep(30000);
            }
        }, 2000 * 1000, 11 * 1000);
        timer.schedule(new TimerTask() {
            @Override public void run() {
                System.out.println("Time's up 2!---" + new Date().toString());
                //                    SleepUtil.sleep(30000);
            }
        }, 1500 * 1000, 22 * 1000);
        timer.schedule(new TimerTask() {
            @Override public void run() {
                System.out.println("Time's up 3!---" + new Date().toString());
                //                    SleepUtil.sleep(30000);
            }
        }, 5 * 1000, 333333 * 1000);
    }

5.TimerTask的执行顺序

可以得知Timer内部是单线程执行task的,一个timer对象只会启用一个TimerThread的。

当一个timer执行多个任务时,如果一个任务执行的时间过长,后面任务执行的时间可能就不是你预期执行的时间了,因为一个任务执行完了才会执行下个任务。

测试用例:

public static void test3() {
        timer.schedule(new TimerTask() {
            @Override public void run() {
                System.out.println("Time's up 1!---" + new Date().toString());
                SleepUtil.sleep(0);
            }
        }, 0, 2 * 1000);
        timer.schedule(new TimerTask() {
            @Override public void run() {
                System.out.println("Time's up 2!---" + new Date().toString());
                SleepUtil.sleep(5000);
            }
        }, 0, 2 * 1000);
        // EvictionTimer.schedule(evictor, delay, delay);
    }

TimerTask执行时间过长,超过了period,执行5s,period是2s,这样period相当于失效了。因为下次执行的时间是这样计算的,

{@link java.util.TimerTask#nextExecutionTime} = System.currentTimeMillis()+delay

所以,当本次任务执行结束(过了超过5s),到下次任务取出来判断的执行时间的时候,肯定已经超过了原本应该执行的时间。

根据mainLoop中的逻辑,当判断执行时间比当前时间要早的话,直接执行本次任务。

时间: 2024-11-09 02:33:20

pool(三)——Timer的相关文章

任务调度(三)——Timer的替代品ScheduledExecutorService简介

先前的两篇博文<任务调度(一)--jdk自带的Timer>和<任务调度(二)--jdk自带的Timer 动态修改任务执行计划>中,简单介绍了一下Timer,可以实现几本的功能,但是在多线程方面却略显不足. 根据Timer源码,可以看到Timer是单线程的.所以task都是串行执行.假如其中一个task执行需要很长的时间,那其他的task只能干巴巴的等着.怎么办! 现在就遇到了这样的问题.总不能因为这个小小的问题就去换别的任务调度框架吧,还是想用最简单的方案去解决一下.所以Sched

任务调度(三)——Timer的替代品ScheduledExecutorService简单介绍

先前的两篇博文<任务调度(一)--jdk自带的Timer>和<任务调度(二)--jdk自带的Timer 动态改动任务运行计划>中,简介了一下Timer,能够实现几本的功能.可是在多线程方面却略显不足. 依据Timer源代码,能够看到Timer是单线程的. 所以task都是串行运行. 假如当中一个task运行须要非常长的时间,那其它的task仅仅能干巴巴的等着.怎么办! 如今就遇到了这种问题. 总不能由于这个小小的问题就去换别的任务调度框架吧,还是想用最简单的方案去解决一下.所以Sc

Swoole 实例三(Timer定时器)

server.php <?php  /*  Swoole已经内置了心跳检测功能,能自动close掉长时间没有数据来往的连接. 而开启心跳检测功能,只需要设置heartbeat_check_interval和heartbeat_idle_time即可.如下: $this->serv->set(     array(         'heartbeat_check_interval' => 60,         'heartbeat_idle_time' => 600,   

任务调度(四)——ScheduledExecutorService替代Timer,实现多线程任务调度

上篇博文<任务调度(三)--Timer的替代品ScheduledExecutorService简介>已经对ScheduledExecutorService做了简单介绍,其实使用ScheduledExecutorService来替代Timer也是迫不得已的事情.主要原因如下: Timer不支持多线程,所有挂在Timer下的任务都是单线程的,任务只能串行执行,如果其中一个任务执行时间过长,会影响到其他任务的执行,然后就可能会有各种接踵而来的问题. Timer的线程不捕获异常,TimerTask如果

Samsung_tiny4412(笔记)--&gt;jiffies,timer,kthread,workqueue,tasklet

/*********************************************************************************** * * Samsung_tiny4412(笔记)-->jiffies,timer,kthread,workqueue,tasklet * * 声明: * 1. 本文中有些源代码没有全部帖出来,主要是因为篇幅太大的原因; * 2. 基于1中的原因,本文借鉴了python中的缩进代码风格进行代码的体现: * 1. 有些代码中的"

host字段变复杂了

讨论的是openstack中卷的host属性. 印象中,社区H版本对于volume的host值表示的就是对应cinder-volume服务的host配置项,默认为GuestOS的hostname.比如单板A上的cinder-volume创建了卷V,那么V的host就是A.同时,把该host值作为rpc转发的topic,即cinder-scheduler组件已不同的host为单位进行区分,调度确定到具体哪个host后,再下发消息.这种方式跟nova模块很类似,nova-scheduler也是以底下

推荐一个很好用的脚本session snapper

源网址http://tech.e2sn.com/oracle-scripts-and-tools/session-snapper 内容: If you want to just download Snapper, you can get it from here: http://blog.tanelpoder.com/files/scripts/snapper.sql(please right click on the file and use Save as... instead of cop

兄弟连区块链教程btcpool矿池源码分析核心机制总结及优化思考

btcpool矿池-核心机制总结及优化思考 核心机制总结 ①gbtmaker 监听Bitcoind ZMQ中BITCOIND_ZMQ_HASHBLOCK消息,一有新块产生,将立即向kafka发送新Gbt 另默认每5秒间隔(可从配置文件中指定)主动RPC请求Bitcoind,获取Gbt发送给kafka Gbt消息大小约2M,含交易列表 ②jobmaker 同时监听kafka KAFKA_TOPIC_RAWGBT和KAFKA_TOPIC_NMC_AUXBLOCK,以支持混合挖矿 接收的Gbt消息,如

Java描述设计模式(18):享元模式

本文源码:GitHub·点这里 || GitEE·点这里 一.使用场景 应用代码 public class C01_InScene { public static void main(String[] args) { String c0 = "cicada" ; String c1 = "cicada" ; System.out.println(c0 == c1); // true } } String类型就是使用享元模式.String对象是final类型,对象一旦