JDK源码那些事儿之LinkedBlockingDeque

阻塞队列中目前还剩下一个比较特殊的队列实现,相比较前面讲解过的队列,本文中要讲的LinkedBlockingDeque比较容易理解了,但是与之前讲解过的阻塞队列又有些不同,从命名上你应该能看出一些端倪,接下来就一起看看这个特殊的阻塞队列

前言

JDK版本号:1.8.0_171

LinkedBlockingDeque在结构上有别于之前讲解过的阻塞队列,它不是Queue而是Deque,中文翻译成双端队列,双端队列指可以从任意一端入队或者出队元素的队列,实现了在队列头和队列尾的高效插入和移除

LinkedBlockingDeque是链表实现的线程安全的无界的同时支持FIFO、LIFO的双端阻塞队列,可以回顾下之前的LinkedBlockingQueue阻塞队列特点,本质上是类似的,但是又有些不同:

  • 内部是通过Node节点组成的链表来实现的,当然为了支持双端操作,结点结构不同
  • LinkedBlockingQueue通过两个ReentrantLock锁保护竞争资源,实现了多线程对竞争资源的互斥访问,入队和出队互不影响,可同时操作,然而LinkedBlockingDeque只设置了一个全局ReentrantLock锁,两个条件对象实现互斥访问,性能上要比LinkedBlockingQueue差一些
  • 无界,默认链表长度为Integer.MAX_VALUE,本质上还是有界
  • 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待

Queue和Deque的关系有点类似于单链表和双向链表,LinkedBlockingQueue和LinkedBlockingDeque的内部结点实现就是单链表和双向链表的区别,具体可参考源码

在第二点中可能有些人有些疑问,两个互斥锁和一个互斥锁的区别在哪里?我们可以考虑以下场景:

A线程先进行入队操作,B线程随后进行出队操作,如果是LinkedBlockingQueue,A线程入队过程还未结束(已获得锁还未释放),B线程出队操作不会被阻塞等待(锁不同),如果是LinkedBlockingDeque则B线程会被阻塞等待(同一把锁)A线程完成操作才继续执行

LinkedBlockingQueue一般的操作是获取一把锁就可以,但有些操作例如remove操作,则需要同时获取两把锁,之前的LinkedBlockingQueue讲解曾经说明过,这里就不详细讲解了

类定义

实现BlockingDeque接口,其中定义了双端队列应该实现的方法,具体方法不说明了,主要是每个方法都分为了First和Last两种方式,从头部或者尾部进行队列操作

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable

常量/变量

    /**
     * 头结点
     */
    transient Node<E> first;

    /**
     * 尾结点
     */
    transient Node<E> last;

    /** 双端队列实际结点个数 */
    private transient int count;

    /** 双端队列容量 */
    private final int capacity;

    /** 互斥重入锁 */
    final ReentrantLock lock = new ReentrantLock();

    /** 非空条件对象 */
    private final Condition notEmpty = lock.newCondition();

    /** 非满条件对象 */
    private final Condition notFull = lock.newCondition();

内部类

为了实现双端队列,内部使用了双向链表,不像LinkedBlockingQueue使用的是单链表,前驱和后继指针的特殊情况需要注意

    /** 双向链表Node */
    static final class Node<E> {
        /**
         * 节点数据值,移除则为null
         */
        E item;

        /**
         * 下列情况之一:
         * - 前驱节点
         * - 前驱是尾节点,则prev指向当前节点
         * - 无前驱节点则为null
         */
        Node<E> prev;

        /**
         * 下列情况之一:
         * - 后继节点
         * - 后继是头节点,则next指向当前节点
         * - 无后继节点则为null
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

构造方法

构造方法比较简单,可设置容量上限,不设置默认Integer.MAX_VALUE,类似之前的LinkedBlockingQueue

    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.lock;
        // 确保可见性
        lock.lock(); // Never contended, but necessary for visibility
        try {
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (!linkLast(new Node<E>(e)))
                    throw new IllegalStateException("Deque full");
            }
        } finally {
            lock.unlock();
        }
    }

重要方法

offerFirst/offerLast

在入队操作中,将每种方法都分成了队首入队和队尾入队两种,在获取锁之后最终调用方法是linkFirst/linkLast。在putFirst中如果队列已满则通过notFull.await()阻塞操作,比较容易理解

入队方法如下:

类别 失败返回特殊值 失败抛异常 阻塞等待 超时阻塞等待
队首 offerFirst push/addFirst putFirst offerFirst(E e, long timeout, TimeUnit unit)
队尾 offer/offerLast add/addLast put/putLast offer/offerLast(E e, long timeout, TimeUnit unit)
    public boolean offerFirst(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkFirst(node);
        } finally {
            lock.unlock();
        }
    }
    public boolean offerLast(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkLast(node);
        } finally {
            lock.unlock();
        }
    }

linkFirst/linkLast

将结点添加到头部或者尾部,如果队列已满则返回false,在调用方法之前已经获取互斥锁才进行操作

    /**
     * Links node as first element, or returns false if full.
     */
    private boolean linkFirst(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        // 队列已满
        if (count >= capacity)
            return false;
        // 更新first
        Node<E> f = first;
        node.next = f;
        first = node;
        // last为空则表示原队列为空,当前队列仅有node则last更新指向node即可
        if (last == null)
            last = node;
        else
            // 原队列非空,更新原first的前驱指针
            f.prev = node;
        ++count;
        // 通过条件对象唤醒出队操作阻塞线程
        notEmpty.signal();
        return true;
    }

    /**
     * Links node as last element, or returns false if full.
     */
    private boolean linkLast(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false;
        // 更新last
        Node<E> l = last;
        node.prev = l;
        last = node;
        // first为空则表示原队列为空,当前队列仅有node则first更新指向node即可
        if (first == null)
            first = node;
        else
            // 原队列非空,更新原last的后继指针
            l.next = node;
        // 实际节点数量 + 1
        ++count;
        // 通过条件对象唤醒出队操作阻塞线程
        notEmpty.signal();
        return true;
    }

pollFirst/pollLast

在出队操作中,同样将每种方法都分成了队首出队和队尾出队两种,在获取锁之后最终调用方法是unlinkFirst/unlinkLast。在takeFirst中如果队列为空则通过notEmpty.await()阻塞操作,比较容易理解

出队方法如下:

类别 失败返回特殊值 失败抛异常 阻塞等待 超时阻塞等待
队首 poll/pollFirst pop/remove/removeFirst take/takeFirst poll/pollFirst(long timeout, TimeUnit unit)
队尾 pollLast removeLast takeLast pollLast(long timeout, TimeUnit unit)
    public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }

    public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }

unlinkFirst/unlinkLast

移除返回队头元素或队尾元素,假如队列为空则返回null

    /**
     * Removes and returns first element, or null if empty.
     */
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first;
        // 判空
        if (f == null)
            return null;
        // 更新first
        Node<E> n = f.next;
        E item = f.item;
        // item置空,next指向自己,方便gc回收
        f.item = null;
        f.next = f; // help GC
        first = n;
        // 当前队列为空,last指向置空
        if (n == null)
            last = null;
        else
            // 当前队列非空,新的头结点前驱置空
            n.prev = null;
        // 实际结点数量 - 1
        --count;
        // 通过条件对象唤醒入队操作阻塞线程
        notFull.signal();
        return item;
    }

    /**
     * Removes and returns last element, or null if empty.
     */
    private E unlinkLast() {
        // assert lock.isHeldByCurrentThread();
        Node<E> l = last;
        if (l == null)
            return null;
        // 更新last
        Node<E> p = l.prev;
        // item置空,prev指向自己,方便gc回收
        E item = l.item;
        l.item = null;
        l.prev = l; // help GC
        last = p;
        // 当前队列为空,last指向置空
        if (p == null)
            first = null;
        else
            // 当前队列非空,新的尾结点后继置空
            p.next = null;
        --count;
        // 通过条件对象唤醒入队操作阻塞线程
        notFull.signal();
        return item;
    }

unlink

将队列中匹配的结点删除,链表中进行解除前后关联即可,注意下如果x处于队列中间(非头和尾结点),则x本身的前驱和后继指针不会被更新修改,为的是防止迭代器循环到x找不到前后结点,避免迭代器异常

    /**
     * Unlinks x.
     */
    void unlink(Node<E> x) {
        // assert lock.isHeldByCurrentThread();
        // 前驱结点和后继结点
        Node<E> p = x.prev;
        Node<E> n = x.next;
        // 前驱为空,相当于删除头结点
        if (p == null) {
            unlinkFirst();
        } else if (n == null) {
            // 后继为空,相当于删除尾结点
            unlinkLast();
        } else {
            // 前驱后继都不为空,解除删除结点与前后结点的关系,item置空
            // 注意,这里x本身的前驱和后继没有被更新修改,为的是防止迭代器循环到x找不到前后结点,避免迭代器异常
            p.next = n;
            n.prev = p;
            x.item = null;
            --count;
            // 通过条件对象唤醒入队操作阻塞线程
            notFull.signal();
        }
    }

peekFirst/peekLast

peek操作直接通过首尾节点指向获得对应的item即可,不会删除节点

    public E peekFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (first == null) ? null : first.item;
        } finally {
            lock.unlock();
        }
    }

    public E peekLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (last == null) ? null : last.item;
        } finally {
            lock.unlock();
        }
    }

removeFirstOccurrence/removeLastOccurrence

删除第一个/最后一个满足条件的队列结点,removeFirstOccurrence从前向后进行匹配,removeLastOccurrence从后向前进行匹配,找到第一个满足条件的结点进行删除操作

    public boolean removeFirstOccurrence(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 从前向后循环判断相等则通过unlink移除
            for (Node<E> p = first; p != null; p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    public boolean removeLastOccurrence(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 从后向前循环判断相等则通过unlink移除
            for (Node<E> p = last; p != null; p = p.prev) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

drainTo

转移队列操作

    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(maxElements, count);
            for (int i = 0; i < n; i++) {
                // 先添加防止未添加到新队列中时原队列结点出队
                c.add(first.item);   // In this order, in case add() throws.
                // 解除关联
                unlinkFirst();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

clear

清空队列操作,比较简单,很好理解

    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Node<E> f = first; f != null; ) {
                f.item = null;
                Node<E> n = f.next;
                f.prev = null;
                f.next = null;
                f = n;
            }
            first = last = null;
            count = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

迭代器

由于其双向链表的实现,迭代器可分为升序迭代器(Itr)和倒序迭代器(DescendingItr),通过AbstractItr封装公共操作方法,Itr和DescendingItr分别实现对应不同的方法,模板方法设计模式写法可借鉴,一个从头节点开始向后进行遍历,一个从尾节点向后进行遍历

    private abstract class AbstractItr implements Iterator<E> {
        /**
         * next方法返回的node
         */
        Node<E> next;

        /**
         * 保存next的item,防止hasNext为true后节点被删除再调用next获取不到值的情况
         */
        E nextItem;

        /**
         * 最近一次调用next返回的节点,如果通过调用remove删除了此元素,则重置为null
         */
        private Node<E> lastRet;

        // 两个不同迭代器实现不同
        abstract Node<E> firstNode();
        abstract Node<E> nextNode(Node<E> n);

        // 构造方法初始化,设置next和nextItem
        AbstractItr() {
            // set to initial position
            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
            lock.lock();
            try {
                next = firstNode();
                nextItem = (next == null) ? null : next.item;
            } finally {
                lock.unlock();
            }
        }

        /**
         * 返回后继节点
         */
        private Node<E> succ(Node<E> n) {
            for (;;) {
                Node<E> s = nextNode(n);
                if (s == null)
                    return null;
                else if (s.item != null)
                    return s;
                else if (s == n)
                    return firstNode();
                else
                    n = s;
            }
        }

        /**
         * 设置下一次执行next应该返回的值
         */
        void advance() {
            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
            lock.lock();
            try {
                // assert next != null;
                next = succ(next);
                nextItem = (next == null) ? null : next.item;
            } finally {
                lock.unlock();
            }
        }

        public boolean hasNext() {
            return next != null;
        }

        public E next() {
            if (next == null)
                throw new NoSuchElementException();
            lastRet = next;
            E x = nextItem;
            advance();
            return x;
        }

        // 移除操作,注意,这里直接在原队列中移除了lastRet对应的节点
        public void remove() {
            Node<E> n = lastRet;
            if (n == null)
                throw new IllegalStateException();
            lastRet = null;
            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
            lock.lock();
            try {
                if (n.item != null)
                    unlink(n);
            } finally {
                lock.unlock();
            }
        }
    }

    /** Forward iterator */
    // 从头节点向后遍历迭代器
    private class Itr extends AbstractItr {
        Node<E> firstNode() { return first; }
        Node<E> nextNode(Node<E> n) { return n.next; }
    }

    /** Descending iterator */
    // 从尾节点向后遍历迭代器
    private class DescendingItr extends AbstractItr {
        Node<E> firstNode() { return last; }
        Node<E> nextNode(Node<E> n) { return n.prev; }
    }

总结

源码分析完毕,整理LinkedBlockingDeque有如下特点:

  • 链表实现的线程安全的无界的同时支持FIFO、LIFO的双端阻塞队列
  • 通过结点同时持有前驱结点和后继结点的引用支持队列的头和尾的双端操作
  • 迭代器支持正向和反向两种迭代方式
  • 大部分方法都争抢唯一一把可重入互斥锁,在竞争激烈的多线程并发环境下吞吐量比较低
  • 删除中间结点(非头尾)时,保留结点前后前驱指针,防止迭代器异常

以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢

原文地址:https://www.cnblogs.com/freeorange/p/11780959.html

时间: 2024-10-03 17:38:38

JDK源码那些事儿之LinkedBlockingDeque的相关文章

JDK源码那些事儿之并发ConcurrentHashMap上篇

前面前已经说明了HashMap以及红黑树的一些基本知识,对JDK8的HashMap也有了一定的了解,本篇就开始看看并发包下的ConcurrentHashMap,说实话,还是比较复杂的,笔者在这里也不会过多深入,源码层次上了解一些主要流程即可,清楚多线程环境下整个Map的运作过程就算是很大进步了,更细的底层部分需要时间和精力来研究,暂不深入 前言 jdk版本:1.8 JDK7中,ConcurrentHashMap把内部细分成了若干个小的HashMap,称之为段(Segment),默认被分为16个段

JDK源码那些事儿之ConcurrentLinkedDeque

非阻塞队列ConcurrentLinkedQueue我们已经了解过了,既然是Queue,那么是否有其双端队列实现呢?答案是肯定的,今天就继续说一说非阻塞双端队列实现ConcurrentLinkedDeque 前言 JDK版本号:1.8.0_171 ConcurrentLinkedDeque是一个基于链表实现的无界的线程安全的同时支持FIFO.LIFO非阻塞双端队列.操作上可类比ConcurrentLinkedQueue,利用CAS进行无锁操作,同时通过松弛度阈值设置来减少CAS操作,在理解这个类

JDK源码那些事儿之HashMap.TreeNode

前面几篇文章已经讲解过HashMap内部实现以及红黑树的基础知识,今天这篇文章就讲解之前HashMap中未讲解的红黑树操作部分,如果没了解红黑树,请去阅读前面的两篇文章,能更好的理解本章所讲解的红黑树源码操作,全文默认读者已经了解红黑树的相关知识,接下来,就以HashMap.TreeNode来说明红黑树的源码操作. 前言 jdk版本:1.8 以HashMap.TreeNode为例是因为之前HashMap的红黑树操作在文章省略了,这里进行一个解释,其实源码里并不是只有这个地方用红黑树结构,但是总体

JDK源码那些事儿之常用的ArrayList

前面已经讲解集合中的HashMap并且也对其中使用的红黑树结构做了对应的说明,这次就来看下简单一些的另一个集合类,也是日常经常使用到的ArrayList,整体来说,算是比较好理解的集合了,一起来看下 前言 jdk版本:1.8 类定义 public class ArrayList<E> extends AbstractList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable 继承了A

JDK源码那些事儿之LinkedTransferQueue

在JDK8的阻塞队列实现中还有两个未进行说明,今天继续对其中的一个阻塞队列LinkedTransferQueue进行源码分析,如果之前的队列分析已经让你对阻塞队列有了一定的了解,相信本文要讲解的LinkedTransferQueue的源码也能很快被理解,接下来一起学习吧 前言 JDK版本号:1.8.0_171 LinkedTransferQueue是基于链表的FIFO无界阻塞队列,在源码分析前,需要提前对源码实现整体有个印象,便于细节的理解.注释部分对于这个类进行了一些说明和介绍,如果有能力的话

JDK源码那些事儿之ConcurrentLinkedQueue

阻塞队列的实现前面已经讲解完毕,今天我们继续了解源码中非阻塞队列的实现,接下来就看一看ConcurrentLinkedQueue非阻塞队列是怎么完成操作的 前言 JDK版本号:1.8.0_171 ConcurrentLinkedQueue是一个基于链表实现的无界的线程安全的FIFO非阻塞队列.最大的不同之处在于非阻塞特性,不会进行阻塞等待直接返回操作结果.其中head和tail的更新类似之前在LinkedTransferQueue中讲解的slack(松弛度)机制,只有在slack阈值大于等于2时

JDK源码那些事儿之红黑树基础上篇

说到HashMap,就一定要说到红黑树,红黑树作为一种自平衡二叉查找树,是一种用途较广的数据结构,在jdk1.8中使用红黑树提升HashMap的性能,今天就来说一说红黑树. 前言 限于篇幅,本文只对红黑树的基础进行说明,暂不涉及源码部分,大部分摘抄自维基百科,这里也贴出对应链接: 维基百科(中文):https://zh.wikipedia.org/wiki/%E7%BA%A2%E9%BB%91%E6%A0%91 维基百科:https://en.wikipedia.org/wiki/Red%E2%

设置Eclipse可以Debug模式调试JDK源码,并显示局部变量的1

最近突然萌发了研究JDK源码的想法,所以就想到了在自己常用的Eclipse上可以调试JDK源码. 整个设置过程也很简单: 首先你要安装好JDK(我的JDK安装路径根目录是D:\Java\jdk-8u92-windows-x64),JDK安装路径里有个"src.zip"就是JDK的源码文件压缩包: 设置好环境变量的JAVA_HOME变量和PATH变量(JAVA_HOME变量值也是D:\Java\jdk-8u92-windows-x64). 然后打开Eclipse设置可以Debug模式调试

使用Eclipse跟踪JDK源码

首先我们要学会的是将JDK源码加载Eclipse中. 1.点"窗口"-->"首选项",选择左边的"Java"-->"已安装的JRE",然后选择我们安装的JRE,并单击它,然后选择右边的"编辑". 点"编辑"将出现如下的界面: 2.跟踪阅读源码 如上图,在我自己写的代码中包含了StringTokenizer类,我们要看它的具体定义,就只要按住"Ctrl"键,