ConcurrentLinkedQueue原码解析

描述

ConcurrentLinkedQueue是一个基于单链表的无界线程安全队列,该队列是FIFO的。ConcurrentLinkedQueue/ConcurrentLinkedDeue和LinkedBlockingQueue/LinkedBlockingDeue

相比,不同点在于它们不提供阻塞功能,并且是Lock-Free的,而后者则是利用ReentrantLock实现的,所以他们具有更高的吞吐量。

源码解析(基于jdk1.7.0_76)

数据结构

类图:

head,tail结点定义和描述:

    /**
     * A node from which the first live (non-deleted) node (if any)
     * can be reached in O(1) time.
     * Invariants:
     * - all live nodes are reachable from head via succ()
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * Non-invariants:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    private transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * Invariants:
     * - the last node is always reachable from tail via succ()
     * - tail != null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-pointing to tail.
     */
    private transient volatile Node<E> tail;

链表结构

构造函数,初始化时让head和tail同时指向一个dummy结点(否则入队时需要同时修改head,tail 2个指针,需要加锁。而有了dummy结点,入队/出队只需各自竞争一个资源即可--入队CAS竞争修改头结点item属性:p.casItem(item, null),出队CAS竞争修改尾结点next指针:p.casNext(null, newNode)):

    /**
     * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
     */
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

ConcurrentLinkedQueue中单链表结构示意图:

     *
     *         head                                   tail
     *           |                                     |
     *           v                                     v
     *
     *        +------+       +------+               +------+
     *        |dummy |------>|   a  |   ...   ----->|   n  |----->null
     *        +------+       +------+               +------+
     *

注意head和tail指针不是一定要每次更新的,事实上它们的更新是迟钝的,有滞后的(这样有助于减少CAS操作,jdk7中,head/tail指针在滞后实际的头/尾结点2步以上时才会更新,甚至会发生tail指针滞后于head指针)。真正的尾结点可以通过tail指针向后找,直到node.next==null。同样真正的队列头结点可以通过head指针向后找,直到node.item!=null。下面的描述来自ConcurrentLinkedQueue Node类的doc文档:

     * Both head and tail are permitted to lag.  In fact, failing to
     * update them every time one could is a significant optimization
     * (fewer CASes). As with LinkedTransferQueue (see the internal
     * documentation for that class), we use a slack threshold of two;
     * that is, we update head/tail when the current pointer appears
     * to be two or more steps away from the first/last node.
     *
     * Since head and tail are updated concurrently and independently,
     * it is possible for tail to lag behind head (why not)?

入队offer方法

从队尾入队,找到队尾结点p后,cas竞争更新p.next指针,选择性的更新tail指针(前面提到的滞后更新)。

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

出队poll方法

从队列头部出队,找到头部结点p后,cas竞争把p.item值置null,选择性的更新head指针(前面提到的滞后更新)。注意head,tail指针始终不会为null,即使出队后队列空了,head和tail也会指向dummy结点。

    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

updateHead方法,cas更新head指针,并通过h.lazySetNext(h),把出队的结点h的next指针指向h自己,标示出已经出队(虽然已经通过设置node.item为null标示,但这里修改next指针是为了帮助GC):

    /**
     * Try to CAS head to p. If successful, repoint old head to itself
     * as sentinel for succ(), below.
     */
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

size方法

size(), contains(), toArray()方法都需要遍历整个链表。

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     * Additionally, if elements are added or removed during execution
     * of this method, the returned result may be inaccurate.  Thus,
     * this method is typically not very useful in concurrent
     * applications.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

succ方法,寻找后继结点(通过判断p==p.next来跳过已经出队的结点):

    /**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

相关内容链接

聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析 (基于jdk1.6的源码解析)

时间: 2024-10-19 03:29:25

ConcurrentLinkedQueue原码解析的相关文章

ConcurrentSkipListMap原码解析

SkipList介绍 1. SkipList(跳表),在理论上能够在O(log(n))时间内完成查找.插入.删除操作.SkipList是一种红黑树的替代方案,由于SkipList与红黑树相比无论从理论和实现都简单许多,所以得到了很好的推广.SkipList是基于一种统计学原理实现的,有可能出现最坏情况,即查找和更新操作都是O(n)时间复杂度,但从统计学角度分析这种概率极小.使用SkipList类型的数据结构更容易控制多线程对集合访问的处理,因为链表的局部处理性比较好,当多个线程对SkipList

Java之&amp;0xff用法解析以及原码、反码、补码相关知识

char强转至int为什么使用0xff? 备注:在Java中采用补码形式表示二进制 如果不希望进行符号扩展,可以采用与操作.例如char c:int i = c & 0xffff:其中,char有8位,int类型有32位,采用32/8=4个f(即0xffff)做与操作,即可屏蔽符号扩展. //负整数时,前面输入了多余的 FF ,没有去掉前面多余的 FF,按并双字节形式输出System.out.println(Integer.toHexString(-2).toUpperCase());//FFF

【原】Android热更新开源项目Tinker源码解析系列之三:so热更新

本系列将从以下三个方面对Tinker进行源码解析: Android热更新开源项目Tinker源码解析系列之一:Dex热更新 Android热更新开源项目Tinker源码解析系列之二:资源文件热更新 Android热更新开源项目Tinker源码解析系类之三:so文件热更新 转载请标明本文来源:http://www.cnblogs.com/yyangblog/p/6252855.html更多内容欢迎star作者的github:https://github.com/LaurenceYang/artic

【原】Android热更新开源项目Tinker源码解析系列之二:资源文件热更新

上一篇文章介绍了Dex文件的热更新流程,本文将会分析Tinker中对资源文件的热更新流程. 同Dex,资源文件的热更新同样包括三个部分:资源补丁生成,资源补丁合成及资源补丁加载. 本系列将从以下三个方面对Tinker进行源码解析: Android热更新开源项目Tinker源码解析系列之一:Dex热更新 Android热更新开源项目Tinker源码解析系列之二:资源热更新 Android热更新开源项目Tinker源码解析系类之三:so热更新 转载请标明本文来源:http://www.cnblogs

【转】Java 集合系列12之 TreeMap详细介绍(源码解析)和使用示例

概要 这一章,我们对TreeMap进行学习.我们先对TreeMap有个整体认识,然后再学习它的源码,最后再通过实例来学会使用TreeMap.内容包括:第1部分 TreeMap介绍第2部分 TreeMap数据结构第3部分 TreeMap源码解析(基于JDK1.6.0_45)第4部分 TreeMap遍历方式第5部分 TreeMap示例 转载请注明出处:http://www.cnblogs.com/skywang12345/admin/EditPosts.aspx?postid=3310928 第1部

给jdk写注释系列之jdk1.6容器(2)-LinkedList源码解析

LinkedList是基于链表结构的一种List,在分析LinkedList源码前有必要对链表结构进行说明. 1.链表的概念 链表是由一系列非连续的节点组成的存储结构,简单分下类的话,链表又分为单向链表和双向链表,而单向/双向链表又可以分为循环链表和非循环链表,下面简单就这四种链表进行图解说明.           1.1.单向链表 单向链表就是通过每个结点的指针指向下一个结点从而链接起来的结构,最后一个节点的next指向null.      1. 2.单向循环链表           单向循环

【C/C++学院】0725-内存补码分析/补码原码实战/打印整数二进制数据/静态库说明

[送给在路上的程序员] 对于一个开发者而言,能够胜任系统中任意一个模块的开发是其核心价值的体现. 对于一个架构师而言,掌握各种语言的优势并可以运用到系统中,由此简化系统的开发,是其架构生涯的第一步. 对于一个开发团队而言,能在短期内开发出用户满意的软件系统是起核心竞争力的体现. 每一个程序员都不能固步自封,要多接触新的行业,新的技术领域,突破自我. 内存补码分析 #include<stdio.h> #include<stdlib.h> void main3() { //printf

计算机中的原码,反码,补码与移码

在计算机内,定点数有3种表示法:原码.反码和补码. 原码:就是二进制定点表示法,即最高位为符号位,0表示正,1表示负,其余位表示数值的大小 反码:正数的反码与其原码相同:负数的反码是对其原码逐位取反,但符号位除外.       原码10010=反码11101(10010,1为符号位,故为负) 补码:正数的补码与原码相同,负数的补码是对其原码逐位取反,但符号位除外,然后整个数加1 如果补码的符号位为0,则表示一个正数,其原码就是补码如果补码的符号位为1,则表示一个负数 移码:移码与补码的关系: [

给jdk写注释系列之jdk1.6容器(12)-PriorityQueue源码解析

PriorityQueue是一种什么样的容器呢?看过前面的几个jdk容器分析的话,看到Queue这个单词你一定会,哦~这是一种队列.是的,PriorityQueue是一种队列,但是它又是一种什么样的队列呢?它具有着什么样的特点呢?它的底层实现方式又是怎么样的呢?我们一起来看一下. PriorityQueue其实是一个优先队列,什么是优先队列呢?这和我们前面讲的先进先出(First In First Out )的队列的区别在于,优先队列每次出队的元素都是优先级最高的元素.那么怎么确定哪一个元素的优