JUC阻塞队列之DelayQueue源码分析

DelayQueue是一个支持延时获取元素的***阻塞队列。并且队列中的元素必须实现Delayed接口。在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中获取到元素。DelayQueue的应用范围非常广阔,如可以用它来保存缓存中元素的有效期,也可用它来实现定时任务。

Delayed接口

在分析DelayQueue源码之前,我们先来看看Delayd接口,其源码定义如下:

public interface Delayed extends Comparable < Delayed > {

    /**
     * 指定返回对象的延时时间
     * @param  unit [时间单位]
     * @return      [延时的剩余,0或者-1表示延时已经过期]
     */
    long getDelay(TimeUnit unit);
}

我们看到,Delayed接口继承了Comparable接口,即实现Delayed接口的对象必须实现getDelay(TimeUnit unit)方法和compareTo(T o)方法。这里compareTo(T o)方法可以用来实现元素的排序,可以将延时时间长的放到队列的末尾。

DelayQueue构造函数

上面分析了Delayed接口,接下来我们分析DelayQueue的构造函数。DelayQueue提供了2种构造函数,一个是无参构造函数,一个是给定集合为参数的构造函数。其源码如下:

/**
 * 构建一个空的DelayQueue
 */
public DelayQueue() {}

/**
 * 给定集合c为参数的构造函数
 * 将集合c中的元素全部放入到DelayQueue中
 */
public DelayQueue(Collection < ? extends E > c) {
    this.addAll(c);
}

addAll方法是AbstractQueue抽象类中的方法,其源码如下:

public boolean addAll(Collection < ? extends E > c) {
    // 参数检测
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    boolean modified = false;
    //遍历集合c中的元素
    for (E e: c)
        // 调用DelayQueue中的add方法
        if (add(e))
            modified = true;
    return modified;
}

从上面的源码中,我们可以看到,AbstractQueue抽象类中addAll方法实际是调用DelayQueue类中的add方法来实现的。

DelayQueue 入列操作

DelayQueue提供了4中入列操作,分别是:

  • add(E e):阻塞的将制定元素添加到延时队列中去,因为队列是***的因此此方法永不阻塞。
  • offer(E e):阻塞的将制定元素添加到延时队列中去,因为队列是***的因此此方法永不阻塞。
  • put(E e):阻塞的将制定元素添加到延时队列中去,因为队列是***的因此此方法永不阻塞。
  • offer(E e, long timeout, TimeUnit unit):阻塞的将制定元素添加到延时队列中去,因为队列是***的因此此方法永不阻塞。

这里大家可能会奇怪,为什么这些入列方法的解释都是一样的?这个问题先等下回答,我们先来看看这几个入列方法的源码定义:

public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    //获取可重入锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        //调用PriorityQueue中的offer方法
        q.offer(e);
        //调用PriorityQueue中的peek方法
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        //释放锁
        lock.unlock();
    }
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

这里我们从源码中可以看到,add(E e)方法、put(E e)方法和offer(E e,long timeout,TimeUnit unit)方法都是调用offer(E e)方法来实现的,这也是为什么这几个方法的解释都是一样的原因。其中offer(E e)方法的核心又是调用了PriorityQueue中的offer(E e)方法,PriorityQueue和PriorityBlockingQueue都是以二叉堆的***队列,只不过PriorityQueue不是阻塞的而PriorityBlockingQueue是阻塞的。

DelayQueue出列操作

DelayQueue提供了3中出列操作方法,它们分别是:

  • poll():检索并删除此队列的开头,如果此队列没有延迟延迟的元素,则返回null
  • take():检索并除去此队列的头,如有必要,请等待直到该队列上具有过期延迟的元素可用。
  • poll(long timeout, TimeUnit unit):检索并删除此队列的头,如有必要,请等待直到该队列上具有过期延迟的元素可用,或者或指定的等待时间到期。

下面我们来一个一个分析出列操作的原来。

poll():

poll操作的源码定义如下:

public E poll() {
   //获取可重入锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
       //获取队列中的第一个元素
        E first = q.peek();
        //若果元素为null,或者头元素还未过期,则返回false
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
           //调用PriorityQueue中的出列方法
            return q.poll();
    } finally {
        lock.unlock();
    }
}

该方法与PriorityQueue的poll方法唯一的区别就是多了if (first == null || first.getDelay(NANOSECONDS) > 0)这个条件判断,该条件是表示如果队列中没有元素或者队列中的元素未过期,则返回null。

take

take操作源码定义如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lockInterruptibly();
    try {
        //西循环
        for (;;) {
            //查看队列头元素
            E first = q.peek();
            //如果队列头元素为null,则表示队列中没有数据,线程进入等待队列
            if (first == null)
                available.await();
            else {
                // 获取first元素剩余的延时时间
                long delay = first.getDelay(NANOSECONDS);
                //若果剩余延时时间<=0 表示元素已经过期,可以从队列中获取元素
                if (delay <= 0)
                    //直接返回头部元素
                    return q.poll();
                //若果剩余延时时间>0,表示元素还未过期,则将first置为null,防止内存溢出
                first = null; // don‘t retain ref while waiting
                //如果leader不为null,则直接进入等待队列中等待
                if (leader != null)
                    available.await();
                else {
                    //若果leader为null,则把当前线程赋值给leader,并超时等待delay纳秒
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            //唤醒线程
            available.signal();
        lock.unlock();
    }
}

take操作比poll操作稍微要复杂些,但是逻辑还是相对比较简单。只是在获取元素的时候先检查元素的剩余延时时间,如果剩余延时时间<=0,则直接返回队列头元素。如果剩余延时时间>0,则判断leader是否为null,若果leader不为null,则表示已经有线程在等待获取队列的头部元素,因此直接进入等待队列中等待。若果leader为null,则表示这是第一个获取头部元素的线程,把当前线程赋值给leader,然后超时等待剩余延时时间。在take操作中需要注意的一点是fist=null,因为如果first不置为null的话会引起内存溢出的异常,这是因为在并发的时候,每个线程都会持有一份first,因此first不会被释放,若果线程数过多,就会导致内存溢出的异常。

poll(long timeout, TimeUnit unit)

超时等待获取队列元素的源码如下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                first = null; // don‘t retain ref while waiting
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

这个出列操作的逻辑和take出列操作的逻辑几乎一样,唯一不同的在于take是无时间限制等待,而改操作是超时等待。

总结

DelayQueue的入列和出列操作逻辑相对比较简单,就是在获取元素的时候,判断元素是否已经过期,若果过期就可以直接获取,没有过期的话poll操作是直接返回null,take操作是进入等待队列中等待。

原文地址:https://blog.51cto.com/14570694/2443986

时间: 2024-10-08 01:51:16

JUC阻塞队列之DelayQueue源码分析的相关文章

死磕 java集合之DelayQueue源码分析

问题 (1)DelayQueue是阻塞队列吗? (2)DelayQueue的实现方式? (3)DelayQueue主要用于什么场景? 简介 DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务. 继承体系 从继承体系可以看到,DelayQueue实现了BlockingQueue,所以它是一个阻塞队列. 另外,DelayQueue还组合了一个叫做Delayed的接口,DelayQueue中存储的所有元素必须实现Delayed接口. 那么,Delayed是什么呢? public

【Java并发编程】19、DelayQueue源码分析

DelayQueue,带有延迟元素的线程安全队列,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间).DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间.DelayQueue不允许包含null元素. 领导者/追随者模式是多个工作线程轮流获得事件源集合,轮流监听.

可阻塞队列-原理及源码解析

阻塞原理:比如,一个队列中有8个格子,代表可放入8条数据,当一条信息到来就放入一个格子中,然后就进行处理.但是这个时候一次性来了8条数据,格子满了,数据还没有处理完,就来个一条数据.这个时候就把这条数据进行阻塞. 示例:假定有一个绑定的缓冲区,它支持 put 和 take 方法.如果试图在空的缓冲区上执行 take 操作,则在某一个项变得可用之前,线程将一直阻塞:如果试图在满的缓冲区上执行 put 操作,则在有空间变得可用之前,线程将一直阻塞.我们喜欢在单独的等待 set 中保存 put 线程和

ucos队列的实现--源码分析

之前说到事件,讲了事件,信号量和互斥信号量,还有一个队列没说,今天说说队列. 队列是用在任务之间传送多个消息的时候,a任务发送消息,b任务发送消息,然后c任务可以依次去提取出b和a传递的消息,不会造成系统的阻塞,他的实现结构如下 在队列的实现中,也是使用事件ecb,OSEventType为OS_EVENT_TYPE_Q类型,而其OSEventPtr指向一个QS_Q结构的指针,该结构的定义如下 typedef struct os_q {                   /* QUEUE CON

DelayQueue源码分析

DelayQueue<E>继承于AbstractQueue<E>实现BlockingQueue<E> 内部变量包括ReentrantLock 类型的lock以及条件Condition类型的available 同时内部维护一个优先级队列q. 内部的方法offer(E e): public boolean offer(E e) {     final ReentrantLock lock = this.lock;     lock.lock();     try {    

jQuery.queue源码分析

作者:禅楼望月(http://www.cnblogs.com/yaoyinglong ) 队列是一种特殊的线性表,它的特殊之处在于他只允许在头部进行删除,在尾部进行插入.常用来表示先进先出的操作(FIFO)--先进队列的元素先出队.搜索整个jQuery库会发现,queue在jQuery内部仅供给animate动画来使用.它提供了对外的接口,因此程序员也可以使用队列来完成一些特殊需求. queue模块对外开放的API:工具方法:queue,dequeue,_queueHooks(仅内部使用)实例方

[转]jQuery源码分析系列

文章转自:jQuery源码分析系列-Aaron 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://github.com/JsAaron/jQuery 正在编写的书 - jQuery架构设计与实现 本人在慕课网的教程(完结) jQuery源码解析(架构与依赖模块) 64课时 jQuery源码解析(DOM与核心模块)64课时 jQuery源码分析目录(完结) jQuery源码分析系列(01) : 整体架构 jQuery源码分析系列(

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理 以 LinkedBlockingQueue 分析 BlockingQueue 阻塞式队列的实现原理. 1. 数据结构 LinkedBlockingQueue 和 ConcurrentLinkedQueue 一样都是由 head 节点和 last 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列.默认情况下

Java多线程 -- JUC包源码分析11 -- ThreadPoolExecutor源码分析

在JUC包中,线程池部分本身有很多组件,可以说是前面所分析的各种技术的一个综合应用.从本文开始,将综合前面的知识,逐个分析线程池的各个组件. -Executor/Executors -ThreadPoolExecutor使用介绍 -ThreadPoolExecutor实现原理 –ThreadPoolExecutor的中断与优雅关闭 shutdown + awaitTermination –shutdown的一个误区 Executor/Executors Executor是线程池框架最基本的几个接