java.util.concurrent.DelayQueue 源码学习

  jdk1.8

  DelayQueue,带有延迟元素的线程安全队列,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间)。DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,其内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。

  Leader/Followers模式,借用其他博客的话来解释下:

打个比方:

  1. 话说一个地方有一群有组织无纪律的人从事山贼这个很有前途的职业。
  2. 一般就是有一个山贼在山路口察看,其他人在林子里面睡觉。
  3. 假如发现有落单的过往客商,望风的山贼就会弄醒一个睡觉的山贼,然后自己去打劫。
  4. 醒来的山贼接替作望风的事情。
  5. 打劫的山贼搞定以后,就会去睡觉,直到被其他望风的山贼叫醒来望风为止。
  6. 有时候过往客商太多,而山贼数量不够,有些客商就能侥幸平安通过山岭(所有山贼都去打劫其他客商了)。

下面是这个模式的计算机版本:

  1. 有若干个线程(一般组成线程池)用来处理大量的事件
  2. 有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠。
  3. 假如有事件需要处理,领导者会从追随者中指定一个新的领导者,自己去处理事件。
  4. 唤醒的追随者作为新的领导者等待事件的发生。
  5. 处理事件的线程处理完毕以后,就会成为追随者的一员,直到被唤醒成为领导者。
  6. 假如需要处理的事件太多,而线程数量不够(能够动态创建线程处理另当别论),则有的事件可能会得不到处理。

  简单理解,就是最多只有一个线程在处理,其他线程在睡眠。在DelayQueue的实现中,Leader/Followers模式用于等待队首的第一个元素。源码注释:

Thread designated to wait for the element at the head of the queue. This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize unnecessary timed waiting. When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely. The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim. Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled. So waiting threads must be prepared to acquire and lose leadership while waiting.

  类定义及参数:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    /** 重入锁,实现线程安全 */
    private final transient ReentrantLock lock = new ReentrantLock();
    /** 使用优先队列实现 */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /** Leader/Followers模式 */
    private Thread leader = null;

    /** 条件对象,当新元素到达,或新线程可能需要成为leader时被通知 */
    private final Condition available = lock.newCondition();

  构造函数:

    /**
     * 默认构造,得到空的延迟队列
     */
    public DelayQueue() {}

    /**
     * 构造延迟队列,初始包含c中的元素
     *
     * @param c 初始包含的元素集合
     * @throws NullPointerException 当集合或集合任一元素为空时抛出空指针错误
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

  add方法:

    /**
     * 向延迟队列插入元素
     *
     * @param e 要插入的元素
     * @return true
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public boolean add(E e) {
        // 直接调用offer并返回
        return offer(e);
    }

  offer方法:

    /**
     * 向延迟队列插入元素
     *
     * @param e 要插入的元素
     * @return true
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lock();
        try {
            // 向优先队列插入元素
            q.offer(e);
            // 若在此之前队列为空,则置空leader,并通知条件对象,需要结合take方法看
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

  put方法:

    /**
     * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞。
     *
     * @param e 要插入的元素
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public void put(E e) {
        offer(e);
    }

  带超时的offer方法:

    /**
     * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞,因此,直接调用offer方法并返回
     *
     * @param e 要插入的元素
     * @param timeout 不会阻塞,忽略
     * @param unit 不会阻塞,忽略
     * @return true
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        // 直接调用offer方法并返回
        return offer(e);
    }

  poll方法:

    /**
     * 获取并移除队首的元素, 或者返回null(如果队列不包含到达延迟时间的元素)
     *
     * @return 队首的元素, 或者null(如果队列不包含到达延迟时间的元素)
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lock();
        try {
            // 获取优先队列队首元素
            E first = q.peek();
            // 若优先队列队首元素为空,或者还没达到延迟时间,返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            // 否则,返回并移除队首元素
            else
                return q.poll();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

  take方法(重要):

    /**
     * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素
     *
     * @return 队首元素
     * @throws InterruptedException 阻塞时被打断,抛出打断异常
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 获得锁,该锁可被打断
        lock.lockInterruptibly();
        try {
            // 循环处理
            for (;;) {
                // 获取队首元素
                E first = q.peek();
                // 若元素为空,等待条件,在offer方法中会调用条件对象的通知方法
                // 并重新进入循环
                if (first == null)
                    available.await();
                // 若元素不为空
                else {
                    // 获取延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 若达到延迟时间,返回并移除队首元素
                    if (delay <= 0)
                        return q.poll();
                    // 否则,需要进入等待
                    first = null; // 在等待时,不持有引用
                    // 若leader不为空,等待条件
                    if (leader != null)
                        available.await();
                    // 否则,设置leader为当前线程,并超时等待延迟时间
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 通知其他线程条件得到满足
            if (leader == null && q.peek() != null)
                available.signal();
             // 释放锁
            lock.unlock();
        }
    }

  带超时的poll方法(重要):

    /**
     * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素或超时
     *
     * @return 队首元素,或者null
     * @throws InterruptedException 阻塞等待时被打断,抛出打断异常*/
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don‘t retain ref while waiting
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

  peek方法:

    /**
     * 获取但不移除队首元素,或返回null(如果队列为空)。和poll方法不同,
     * 若队列不为空,该方法换回队首元素,不论是否达到延迟时间
     *
     * @return 队首元素,或null(如果队列为空)
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

  size方法:

    /**
     * 获取队列大小(包括未达到延迟时间的元素)
     *
     * @return 队列大小
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

  其他方法...

时间: 2024-10-20 17:57:33

java.util.concurrent.DelayQueue 源码学习的相关文章

《java.util.concurrent 包源码阅读》 结束语

<java.util.concurrent 包源码阅读>系列文章已经全部写完了.开始的几篇文章是根据自己的读书笔记整理出来的(当时只阅读了部分的源代码),后面的大部分都是一边读源代码代码,一边写文章. 由于水平有限,在阅读源代码的时候,分析得也比较浅显,也有很多地方自己也没有研究明白,文章有的地方显得语焉不详,只能请各位多多见谅了. 后面会继续写一些关于Java并发编程的文章,希望各位多多指教. 这里整理了一个简单的目录,包含了本系列所有文章的链接: <java.util.concurr

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue

对于BlockingQueue的具体实现,主要关注的有两点:线程安全的实现和阻塞操作的实现.所以分析ArrayBlockingQueue也是基于这两点. 对于线程安全来说,所有的添加元素的方法和拿走元素的方法都会涉及到,我们通过分析offer方法和poll()方法就能看出线程安全是如何实现的. 首先来看offer方法 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lo

《java.util.concurrent 包源码阅读》05 BlockingQueue

想必大家都很熟悉生产者-消费者队列,生产者负责添加元素到队列,如果队列已满则会进入阻塞状态直到有消费者拿走元素.相反,消费者负责从队列中拿走元素,如果队列为空则会进入阻塞状态直到有生产者添加元素到队列.BlockingQueue就是这么一个生产者-消费者队列. BlockingQueue是Queue的子接口 public interface BlockingQueue<E> extends Queue<E> BlockingQueue拿走元素时,如果队列为空,阻塞等待会有两种情况:

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行.

《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现.这些方法包括submit,invokeAny和InvokeAll. 注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略.这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来. 首先来看su

《java.util.concurrent 包源码阅读》03 锁

Condition接口 应用场景:一个线程因为某个condition不满足被挂起,直到该Condition被满足了. 类似与Object的wait/notify,因此Condition对象应该是被多线程共享的,需要使用锁保护其状态的一致性 示例代码: class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition

《java.util.concurrent 包源码阅读》02 关于java.util.concurrent.atomic包

Aomic数据类型有四种类型:AomicBoolean, AomicInteger, AomicLong, 和AomicReferrence(针对Object的)以及它们的数组类型, 还有一个特殊的AomicStampedReferrence,它不是AomicReferrence的子类,而是利用AomicReferrence实现的一个储存引用和Integer组的扩展类 首先,所有原子操作都是依赖于sun.misc.Unsafe这个类,这个类底层是由C++实现的,利用指针来实现数据操作 关于CAS

《java.util.concurrent 包源码阅读》11 线程池系列之ThreadPoolExecutor 第一部分

先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */ int c = ctl.get(); if (worker