【Java并发编程】19、DelayQueue源码分析

DelayQueue,带有延迟元素的线程安全队列,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间)。DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。

领导者/追随者模式是多个工作线程轮流获得事件源集合,轮流监听、分发并处理事件的一种模式。在任意时间点,程序都仅有一个领导者线程,它负责监听IO事件。而其他线程都是追随者,它们休眠在线程池中等待成为新的领导者。当前的领导者如果检测到IO事件,首先要从线程池中推选出新的领导者线程,然后处理IO事件。此时,新的领导者等待新的IO事件,而原来的领导者则处理IO事件,二者实现了并发。

 简单理解,就是最多只有一个线程在处理,其他线程在睡眠。在DelayQueue的实现中,Leader/Followers模式用于等待队首的第一个元素。

类定义及参数:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    /** 重入锁,实现线程安全 */
    private final transient ReentrantLock lock = new ReentrantLock();
    /** 使用优先队列实现 */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /** Leader/Followers模式 */
    private Thread leader = null;

    /** 条件对象,当新元素到达,或新线程可能需要成为leader时被通知 */
    private final Condition available = lock.newCondition();

  构造函数:

    /**
     * 默认构造,得到空的延迟队列
     */
    public DelayQueue() {}

    /**
     * 构造延迟队列,初始包含c中的元素
     *
     * @param c 初始包含的元素集合
     * @throws NullPointerException 当集合或集合任一元素为空时抛出空指针错误
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

  add方法:

    /**
     * 向延迟队列插入元素
     *
     * @param e 要插入的元素
     * @return true
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public boolean add(E e) {
        // 直接调用offer并返回
        return offer(e);
    }

  offer方法:

    /**
     * 向延迟队列插入元素
     *
     * @param e 要插入的元素
     * @return true
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lock();
        try {
            // 向优先队列插入元素
            q.offer(e);
            // 若在此之前队列为空,则置空leader,并通知条件对象,需要结合take方法看
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

  put方法:

    /**
     * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞。
     *
     * @param e 要插入的元素
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public void put(E e) {
        offer(e);
    }

  带超时的offer方法:

    /**
     * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞,因此,直接调用offer方法并返回
     *
     * @param e 要插入的元素
     * @param timeout 不会阻塞,忽略
     * @param unit 不会阻塞,忽略
     * @return true
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        // 直接调用offer方法并返回
        return offer(e);
    }

  poll方法:

    /**
     * 获取并移除队首的元素, 或者返回null(如果队列不包含到达延迟时间的元素)
     *
     * @return 队首的元素, 或者null(如果队列不包含到达延迟时间的元素)
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lock();
        try {
            // 获取优先队列队首元素
            E first = q.peek();
            // 若优先队列队首元素为空,或者还没达到延迟时间,返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            // 否则,返回并移除队首元素
            else
                return q.poll();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

  take方法(重要):

    /**
     * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素
     *
     * @return 队首元素
     * @throws InterruptedException 阻塞时被打断,抛出打断异常
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 获得锁,该锁可被打断
        lock.lockInterruptibly();
        try {
            // 循环处理
            for (;;) {
                // 获取队首元素
                E first = q.peek();
                // 若元素为空,等待条件,在offer方法中会调用条件对象的通知方法
                // 并重新进入循环
                if (first == null)
                    available.await();
                // 若元素不为空
                else {
                    // 获取延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 若达到延迟时间,返回并移除队首元素
                    if (delay <= 0)
                        return q.poll();
                    // 否则,需要进入等待
                    first = null; // 在等待时,不持有引用
                    // 若leader不为空,等待条件
                    if (leader != null)
                        available.await();
                    // 否则,设置leader为当前线程,并超时等待延迟时间
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 通知其他线程条件得到满足
            if (leader == null && q.peek() != null)
                available.signal();
             // 释放锁
            lock.unlock();
        }
    }

  带超时的poll方法(重要):

    /**
     * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素或超时
     *
     * @return 队首元素,或者null
     * @throws InterruptedException 阻塞等待时被打断,抛出打断异常*/
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don‘t retain ref while waiting
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

  peek方法:

    /**
     * 获取但不移除队首元素,或返回null(如果队列为空)。和poll方法不同,
     * 若队列不为空,该方法换回队首元素,不论是否达到延迟时间
     *
     * @return 队首元素,或null(如果队列为空)
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

出处:

https://www.cnblogs.com/enumhack/p/7472873.html

https://www.cnblogs.com/wanly3643/p/3944661.html

jdk源码

原文地址:https://www.cnblogs.com/wangzhongqiu/p/8529001.html

时间: 2024-08-30 10:28:08

【Java并发编程】19、DelayQueue源码分析的相关文章

Java并发系列[2]----AbstractQueuedSynchronizer源码分析之独占模式

在上一篇<Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析>中我们介绍了AbstractQueuedSynchronizer基本的一些概念,主要讲了AQS的排队区是怎样实现的,什么是独占模式和共享模式以及如何理解结点的等待状态.理解并掌握这些内容是后续阅读AQS源码的关键,所以建议读者先看完我的上一篇文章再回过头来看这篇就比较容易理解.在本篇中会介绍在独占模式下结点是怎样进入同步队列排队的,以及离开同步队列之前会进行哪些操作.AQS为在独占模

Java并发系列[5]----ReentrantLock源码分析

在Java5.0之前,协调对共享对象的访问可以使用的机制只有synchronized和volatile.我们知道synchronized关键字实现了内置锁,而volatile关键字保证了多线程的内存可见性.在大多数情况下,这些机制都能很好地完成工作,但却无法实现一些更高级的功能,例如,无法中断一个正在等待获取锁的线程,无法实现限定时间的获取锁机制,无法实现非阻塞结构的加锁规则等.而这些更灵活的加锁机制通常都能够提供更好的活跃性或性能.因此,在Java5.0中增加了一种新的机制:Reentrant

Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析

学习Java并发编程不得不去了解一下java.util.concurrent这个包,这个包下面有许多我们经常用到的并发工具类,例如:ReentrantLock, CountDownLatch, CyclicBarrier, Semaphore等.而这些类的底层实现都依赖于AbstractQueuedSynchronizer这个类,由此可见这个类的重要性.所以在Java并发系列文章中我首先对AbstractQueuedSynchronizer这个类进行分析,由于这个类比较重要,而且代码比较长,为了

Java并发编程之ReentrantLock源码分析

ReentrantLock介绍 从JDK1.5之前,我们都是使用synchronized关键字来对代码块加锁,在JDK1.5引入了ReentrantLock锁.synchronized关键字性能比ReentrantLock锁要差,而且ReentrantLock锁功能要比synchronized关键字功能强大. 特点 synchronized关键字和ReentrantLock锁都是重入锁,可重入锁是指当一个线程获取到锁后,此线程还可继续获得这把锁,在此线程释放这把锁前其他线程则不可获得这边锁.相比

关于java中ReentrantLock类的源码分析以及总结与例子

一,官方描述 关于ReentrantLock的官方描述,英文的就不贴出来了,这里我只贴出我自己翻译的描述: reentrant是一个跟synchronized具有相同行为和语义的持有锁来访问方法和语句的互斥锁,但是reentrant还拥有被扩展的能力. ReentrantLock会被线程拥有并且持续锁定,不会解锁.线程调用lock()方法返回后,则成功持有锁,否则这个锁正在被另一个线程所持有,只能等待另一个线程释放锁,如果当前线程拥有了锁,则调用lock()方法会立即返回,这个状态可以通过isH

Java中arraylist和linkedlist源码分析与性能比较

Java中arraylist和linkedlist源码分析与性能比较 1,简介 在java开发中比较常用的数据结构是arraylist和linkedlist,本文主要从源码角度分析arraylist和linkedlist的性能. 2,arraylist源码分析 Arraylist底层的数据结构是一个对象数组,有一个size的成员变量标记数组中元素的个数,如下图: * The array buffer into which the elements of the ArrayList are sto

Java并发(四):并发集合ConcurrentHashMap的源码分析

之前介绍了Java并发的基础知识和使用案例分析,接下来我们正式地进入Java并发的源码分析阶段,本文作为源码分析地开篇,源码参考JDK1.8 OverView: JDK1.8源码中的注释提到:ConcurrentHashMap是一种提供完整的并发检索和对于并发更新有高预测性的散列表,遵循了与HashMap相同的功能性规格,并包含与HashTable每个方法都对应的方法.虽然所有操作都是线程安全的,但检索操作并不牵涉任何锁,不支持任何锁住整个散列表来保护所有的访问. ConcurrentHashM

死磕 java集合之DelayQueue源码分析

问题 (1)DelayQueue是阻塞队列吗? (2)DelayQueue的实现方式? (3)DelayQueue主要用于什么场景? 简介 DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务. 继承体系 从继承体系可以看到,DelayQueue实现了BlockingQueue,所以它是一个阻塞队列. 另外,DelayQueue还组合了一个叫做Delayed的接口,DelayQueue中存储的所有元素必须实现Delayed接口. 那么,Delayed是什么呢? public

Java并发编程之CountDownLatch源码解析

一.导语 最近在学习并发编程原理,所以准备整理一下自己学到的知识,先写一篇CountDownLatch的源码分析,之后希望可以慢慢写完整个并发编程. 二.什么是CountDownLatch CountDownLatch是java的JUC并发包里的一个工具类,可以理解为一个倒计时器,主要是用来控制多个线程之间的通信.比如有一个主线程A,它要等待其他4个子线程执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了. 三.简单使用 public static void mai

Java线程池ThreadPoolExector的源码分析

前言:线程是我们在学习java过程中非常重要的也是绕不开的一个知识点,它的重要程度可以说是java的核心之一,线程具有不可轻视的作用,对于我们提高程序的运行效率.压榨CPU处理能力.多条线路同时运行等都是强有力的杀手锏工具.线程是如此的重要,那么我们来思考这样一个问题.假设我们有一个高并发,多线程的项目,多条线程在运行的时候,来一个任务我们new一个线程,任务结束了,再把它销毁结束,这样看似没有问题,适合于低并发的场景,可是当我们的项目投入到生产环境,一下涌入千条任务的时候,线程不断的new执行