JUC源码分析-集合篇(三)ConcurrentLinkedQueue

JUC源码分析-集合篇(三)ConcurrentLinkedQueue

在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方
式则可以使用循环 CAS 的方式来实现。本节让我们一起来研究一下 Doug Lea 是如何使用非阻塞的方式来实现线程安全队列 ConcurrentLinkedQueue 的,相信从大师身上我们能学到不少并发编程的技巧。

  • ConcurrentLinkedQueue 先进先出(FIFO)单向队列
  • ConcurrentLinkedDeque 双向队列

下面以 ConcurrentLinkedQueue 为例看使用非阻塞算法(CAS) 保证线程安全。

1. 数据结构

ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点(Node)由节点元素(item)和
指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联起来,从而组成一
张链表结构的队列。默认情况下 head 节点存储的元素为空,tail 节点等于 head 节点。head、tail 以及 Node.item、Node.next 都是 volatile 修辞。

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
}

默认情况下 head、tail 都是空节点。

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

获取一个节点的后继节点

// 遇到哨兵节点,从 head 开始遍历
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

2. 入队 offer

入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及 head 节点和 tail 节点的变化,每添加一个节点我就做了一个队列的快照图(注意这是单线程入队情况)。

第一步添加元素 e1。队列更新 head 节点的 next 节点为元素 e1 节点。又因为 tail 节点默认情况下等于 head 节点,所以它们的 next 节点都指向元素 e1 节点。
第二步添加元素 e2。队列首先设置元素 e1 节点的 next 节点为元素 e2 节点,然后更新 tail 节点指向元素 e2 节点。
第三步添加元素 e3,设置 tail 节点的next节点为元素 e3 节点。
第四步添加元素 e4,设置元素 e3 的 next 节点为元素 e4 节点,然后将 tail 节点指向元素 e4 节点。

通过 debug 入队过程并观察 head 节点和 tail 节点的变化,发现入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点,如果 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点,所以 tail 节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助。

上面的分析让我们从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再通过源码来详细分析下它是如何使用CAS算法来入队的。

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // 1. p is last node
        if (q == null) {
            // 1.1 通过自旋保证节点一定添加到数据链中
            if (p.casNext(null, newNode)) {
                // 1.2 p代表当前结点,当前节点不是尾节点时更新
                //     也就是说tail不一定是尾节点,尾节点为tail或tail.next
                //     更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // 2. 遇到哨兵节点,从 head 开始遍历
        //    但是如果 tail 被修改,则使用 tail(因为可能被修改正确了)
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        // 3. 尾节点只可能是tail或tail.next。如果tail发生变化则直接从tail开始遍历
        else
            // Check for tail updates after two hops.
            // 其实我认为这里一直取p.next节点遍历最终可以遍历到尾节点,可以不必取重新tail
            // 可能重新取tail会遍历更快
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

3. hops 设计意图

上面分析我们知道真正的尾节点可能 tail 或 tail.next,doug lea 写的代码和逻辑还是稍微有点复杂。那么可不可以让 tail 永远指向尾节点呢?代码如下

public boolean offer(E e) {
  Node n = new Node(e);
  for (;;) {
     Node</e><e> t = tail;
     if (t.casNext(null, n) && casTail(t, n)) {
        return true;
     }
  }
}

让 tail 节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑非常清楚和易懂。但是这么做有个缺点就是每次都需要使用循环 CAS 更新 tail 节点,如果能减少 CAS 更新 tail 节点的次数,就能提高入队的效率。

所以 doug lea 使用 hops 变量(JDK1.8没有直接使用hops,但逻辑没有改变)来控制并减少 tail 节点的更新频率,并不是每次节点入队后都将 tail 节点更新成尾节点,而是当 tail 节点和尾节点的距离大于等于常量 HOPS 的值(默认等于1)时才更新 tail 节点,tail 和尾节点的距离越长使用 CAS 更新 tail 节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对 volatile 变量的读操作来减少了对 volatile 变量的写操作,而对 volatile 变量的写操作开销要远远大于读操作,所以入队效率会有所提升。

// JDK1.7 代码直接使用 hops 来控制
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> n = new Node<E>(e);
    retry:
    for (;;) {
        Node<E> t = tail;
        Node<E> p = t;
        for (int hops = 0; ; hops++) {
            // 获得p节点的下一个节点。
            Node<E> next = succ(p);
            // next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点
            if (next != null) {
                if (hops > HOPS && t != tail)
                    continue retry;
                p = next;
            } else if (p.casNext(null, n)) {
                if (hops >= HOPS)
                    casTail(t, n); // 更新tail节点,允许失败
                return true;
            } else {
                p = succ(p);
            }
        }
    }
}

4. 出队列 poll

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让我们通过每个节点出队的快照来观察下head节点的变化。

出队的代码和入队差不多,也有 hop 的概念。出队了完成了两件事:一是将节点的 item 设置为 null;二是更新头节点并将头节点的 next 指向自己,也就是哨兵节点。

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // 1. 出队后 p.item 一定为 null
            if (item != null && p.casItem(item, null)) {
                if (p != h) // hop two nodes at a time
                    // 更新头节点并将头节点的 next 指向自己。成为哨兵节点,等 GC 回收
                    // 同样允许失败,说明其它的线程更新了头节点
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            // 2. 遍历到尾节点了,没有元素了
            } else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            // 3. 出现哨兵节点,说明有其它线程poll后更新了head,需要重新从head开始遍历
            } else if (p == q)
                continue restartFromHead;
            // 4. 继续遍历
            else
                p = q;
        }
    }
}

5. 其它常用方法

5.1 元素个数 size

// 可以看到 size 是一个很耗时的方法
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

5.1 元素是否为空 isEmpty

public boolean isEmpty() {
    return first() == null;
}

如果只判断集合中是否存在元素请使用 isEmpty

5.3 查找第一个有效元素 first

// 从 head 开始遍历找到第一个 item!=null 的元素
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            // 要么找到了 item!=null 的元素,要么遍历完整个链表
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            } else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

5.4 查找后继节点 succ

// 遇到哨兵节点,从 head 开始遍历
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

参考:

  1. 《Java并发编程的艺术》第六章
  2. 并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究(有节点变化图)
  3. 聊聊并发(六)ConcurrentLinkedQueue的实现原理分析


每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/10923374.html

时间: 2024-10-20 03:10:14

JUC源码分析-集合篇(三)ConcurrentLinkedQueue的相关文章

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理 以 LinkedBlockingQueue 分析 BlockingQueue 阻塞式队列的实现原理. 1. 数据结构 LinkedBlockingQueue 和 ConcurrentLinkedQueue 一样都是由 head 节点和 last 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列.默认情况下

JUC源码分析-集合篇(七)PriorityBlockingQueue

JUC源码分析-集合篇(七)PriorityBlockingQueue PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现. PriorityBlockingQueue 数据结构和 PriorityQueue 一致,而线程安全性使用的是 ReentrantLock. 1. 基本属性 // 最大可分配队列容量 Integer.MAX_VALUE - 8,减 8 是因为有的 VM 实现在数组头有些内容 private stati

Android事件传递机制详解及最新源码分析——ViewGroup篇

在上一篇<Android事件传递机制详解及最新源码分析--View篇>中,详细讲解了View事件的传递机制,没掌握或者掌握不扎实的小伙伴,强烈建议先阅读上一篇. 好了,废话还是少说,直奔主题,开始本篇的ViewGroup事件传递机制探索之旅. 依然从简单的Demo例子现象开始分析 新建安卓工程,首先自定义一个Button以及一个RelativeLayout,很简单,只是重写了主要与事件传递机制相关的方法,代码如下: 自定义WLButton类: 1 public class WLButton e

手机自动化测试:appium源码分析之bootstrap三

手机自动化测试:appium源码分析之bootstrap三 研究bootstrap源码,我们可以通过代码的结构,可以看出来appium的扩展思路和实现方式,从中可以添加我们自己要的功能,针对appium进行定制,poptest在2015年10月24日开设appium的课程,课程采用真实的商业项目进行培训,用现在互联网金融的业务.bootstrap代码中io.appium.android.bootstrap.handler包中的类都是对应的指令类,核心都是execute方法. 下面我们看下Hash

Zepto源码分析之二~三个API

由于时间关系:本次只对这三个API($.camelCase.$.contains.$.each)方法进行分析 第一个方法变量转驼峰:$.camelCase('hello-world-welcome'); 源码: var camelize; /** * 字符串替换 * 使用replace第二个参数带回调 */ camelize = function(str) { return str.replace(/-+(.)?/g, function(match, chr) { return chr ? ch

linux中断源码分析 - 中断发生(三)

本文为原创,转载请注明:http://www.cnblogs.com/tolimit/ 回顾 上篇文章linux中断源码分析 - 初始化(二)已经描述了中断描述符表和中断描述符数组的初始化,由于在初始化期间系统关闭了中断(通过设置CPU的EFLAGS寄存器的IF标志位为0),当整个中断和异常的初始化完成后,系统会开启中断(设置CPU的EFLAGS寄存器的IF标志位为1),此时整个系统的中断已经开始可以使用了.本篇文章我们具体研究一次典型中断发生时的运行流程. 中断产生 我们需要先明确一下,中断控

jQuery-1.9.1源码分析系列(三) Sizzle选择器引擎——词法解析

jQuery源码9600多行,而Sizzle引擎就独占近2000行,占了1/5.Sizzle引擎.jQuery事件机制.ajax是整个jQuery的核心,也是jQuery技术精华的体现.里面的有些策略确实很值得学习,先膜拜之,然后细细学习. 在学习Sizzle引擎之前我们先准备一点知识,和先了解Sizzle引擎的一点工作原理. <div id="chua"> <a> <span>chua的测试用例</span> </a> &l

Spring 事务源码分析——Hibernate篇

在Spring与Hibernate整合的时候,可以利用Spring的事务管理机制,为我们管理事务的开启.提交.回滚等操作.这样的方式极大的减少了我们的代码量,让我们只专注于业务代码的编写.在使用Hibernate的时候,每一个操作都要经历事务开启与提交这样的操作,他们在业务代码的周围,这样来看是不是就想到了AOP编程,把这部分代码抽取出来.没错,Spring正是这样做的,Spring的事务管理就是基于AOP的. 1 Spring的事务隔离与传播 Srping的事务定义了五个隔离等级(isolat

JUC源码分析16-集合-ConcurrentSkipListMap、ConcurrentSkipListSet

NBA这赛季结束,勇士可惜啊,谁能想到没拿到冠军,库昊也没成为真正的老大,lbl一战封神,所有口水留言都变成羡慕嫉妒恨,哎,我库啊,还是还是看书吧. ConcurrentSkipListMap说实话,之前还真没注意过,还是看JUC才看到,利用skiplist跳表结构来实现一种有序的map,之前看到的map都是无序.在学习前还是要好好了解下什么是skiplist跳表,的确很不错,利用空间换时间,复杂度为logN,跳表的原理参考http://kenby.iteye.com/blog/1187303,