ConcurrentLinkedQueue解析

主要是结合java并发编程这本书加上自己的运行环境进行总结的,网上这篇资料的环境好些是jdk1.6,我自己的环境是1.7.

1.    引言

在并发编程中我们有时候需要使用线程安全的队列。如果我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现,本文让我们一起来研究下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上我们能学到不少并发编程的技巧。

2.    ConcurrentLinkedQueue的介绍

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改, Michael & Scott算法的详细信息可以参见参考资料一

3.    ConcurrentLinkedQueue的结构

我们通过ConcurrentLinkedQueue的类图来分析一下它的结构。

(图1)

ConcurrentLinkedQueue由head节点和tair节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tair节点等于head节点。


    private transient volatile Node<E> tail;

    /**

     * Creates a {@code ConcurrentLinkedQueue} that is initially empty.

     */

    public ConcurrentLinkedQueue() {

        head = tail = new Node<E>(null);

    } 

其中Node节点的源码如下:


private static class Node<E> {  

       // Node 里面包含了 item 节点值  以及 下一个节点 next  

       // item 和 next 都是valatile  可见性保证了  

       volatile E item;  

       volatile Node<E> next;  

  

       private static final sun.misc.Unsafe UNSAFE;  

       // 并且初始化的时候 就会获得item 和 next 的偏移量  

       // 这为后面的cas 做了准备,如何使用继续看下面  

       private static final long itemOffset;  

       private static final long nextOffset;  

       static {  

           try {  

               UNSAFE = sun.misc.Unsafe.getUnsafe();  

               Class k = Node.class;  

               itemOffset = UNSAFE.objectFieldOffset  

                   (k.getDeclaredField("item"));  

               nextOffset = UNSAFE.objectFieldOffset  

                   (k.getDeclaredField("next"));  

           } catch (Exception e) {  

               throw new Error(e);  

           }  

       }  

   }  

4.    入队列

入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及head节点和tair节点的变化,每添加一个节点我就做了一个队列的快照图。

(图二)

  • 第一步添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
  • 第二步添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
  • 第三步添加元素3,设置tail节点的next节点为元素3节点。
  • 第四步添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。

通过debug入队过程并观察head节点和tail节点的变化,发现入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助。

上面的分析让我们从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再通过源码来详细分析下它是如何使用CAS算法来入队的。


public boolean offer(E e) {

        checkNotNull(e);

        //创建一个新的入队节点

        final Node<E> newNode = new Node<E>(e);

     //t是一个指向tail的引用,p用来表示队列的尾节点,开始默认p等于tail

        for (Node<E> t = tail, p = t;;) {

            Node<E> q = p.next;//q是尾节点p的下一个节点

            if (q == null) { //说明p是尾节点

                // p is last node

                if (p.casNext(null, newNode)) {  //将新节点CAS加到队列末尾

                    // Successful CAS is the linearization point

                    // for e to become an element of this queue,

                    // and for newNode to become "live".

                 //如果p和tail引用p不同,说明引用t指向的节点不是最新尾节点,让t指向最新的节点

                    if (p != t) // hop two nodes at a time

                      //更新tail节点引用,允许失败

                        casTail(t, newNode);  // Failure is OK.

                    return true;  //更新结束

                }

                // Lost CAS race to another thread; re-read next

            }

            else if (p == q)  //说明p节点和其尾节点都是null,这表明这个队列刚刚初始化,因此返回head节点

                // We have fallen off list.  If tail is unchanged, it

                // will also be off-list, in which case we need to

                // jump to head, from which all live nodes are always

                // reachable.  Else the new tail is a better bet.

         //这个条件表达式看着好复杂,看看(t != (t = tail)) 先赋值再判断不等,一直都是false,疑问为嘛不直接p=head

                p = (t != (t = tail)) ? t : head;

            else

                // Check for tail updates after two hops.

             //这里同上,为嘛不直接p=q;

                p = (p != t && t != (t = tail)) ? t : q;

        }

    }  

从源代码角度来看整个入队过程主要做二件事情。第一是定位出尾节点,第二是使用CAS算法能将入队节点设置成尾节点的next节点,如不成功则重试。

第一步定位尾节点。tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点,尾节点可能就是tail节点,也可能是tail节点的next节点。代码中循环体中就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加第一次节点,所以需要返回head节点

第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

在jdk1.6,入队列操作是用hops来进行控制更新tail次数的,偶的运行环境是1.7,所以代码如上,感觉没有对tail进行cas更新距离的限制,上面分析过对于先进先出的队列入队所要做的事情就是将入队节点设置成尾节点,doug lea写的代码和逻辑还是稍微有点复杂。那么我用以下方式来实现行不行?

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        Node<E> n = new Node<E>(e);
        for (;;) {
            Node<E> t = tail;
            if (t.casNext(null, n) && casTail(t, n)) {
                return true;
            }
        }
    }

让tail节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑非常清楚和易懂。但是这么做有个缺点就是每次都需要使用循环CAS更新tail节点。如果能减少CAS更新tail节点的次数,就能提高入队的效率,所以doug lea使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将 tail节点更新成尾节点,而是当 tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。

另外进队列操作永远都是返回True,所以不要通过返回值来判断入队是否成功

5.    出队列

    出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让我们通过每个节点出队的快照来观察下head节点的变化。

从上图可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。让我们再通过源码来深入分析下出队过程。


 public E poll() {

        restartFromHead:

        for (;;) {

         //开始h指向head,p表示头结点,需要出队的节点

            for (Node<E> h = head, p = h, q;;) {

                E item = p.item;

//p节点里面有元素项,则直接出该元素项,并将p元素项CAS更新为NUll 

                if (item != null && p.casItem(item, null)) {

                    // Successful CAS is the linearization point

                    // for item to be removed from this queue.

                    if (p != h) // hop two nodes at a time

                        //p不为h的时候说明h还在前一个项为NUll的位置,而p在后一个有项item的位置,则更新h指向

                        //若p的后驱不为null,则将h指向p的后驱,因为当前p的项也是要出队的

                        //若p的后驱为NUll,则将h指向p,此时他们的item都为Null

                        updateHead(h, ((q = p.next) != null) ? q : p);

                    return item;

                }

                else if ((q = p.next) == null) {//此时p.item=null的

                  //p的后驱为null,用P更新h值

                    updateHead(h, p);

                    return null;//所以返回NUll

                }

                else if (p == q)

                    continue restartFromHead; /继续循环for

                else

                    p = q;//q在p后面,p中节点已经出队,则让p指向q

            }

        }

    }

首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

时间: 2024-07-30 18:28:43

ConcurrentLinkedQueue解析的相关文章

ConcurrentLinkedQueue原码解析

描述 ConcurrentLinkedQueue是一个基于单链表的无界线程安全队列,该队列是FIFO的.ConcurrentLinkedQueue/ConcurrentLinkedDeue和LinkedBlockingQueue/LinkedBlockingDeue 相比,不同点在于它们不提供阻塞功能,并且是Lock-Free的,而后者则是利用ReentrantLock实现的,所以他们具有更高的吞吐量. 源码解析(基于jdk1.7.0_76) 数据结构 类图: head,tail结点定义和描述:

Thrift compiler代码生成类解析

代码生成类解析: Thrift--facebook RPC框架,介绍就不说了,百度,google一大把,使用也不介绍,直接上结构和分析吧. Hello.thrift文件内容如下: namespace java com.tomsun.thrift.generated.demo service Hello { string helloString(1:string para) } 内容很简单,申明个RPC service(Hello),服务方法helloString,方法参数格式(seq: para

EventBus源码解析

用例 本文主要按照如下例子展开: //1. 新建bus对象,默认仅能在主线程上对消息进行调度 Bus bus = new Bus(); // maybe singleton //2. 新建类A(subscriber),answerAvailable()方法为事件回调,在主线程上运行 class A { public A() { bus.register(this); } // 可见性为public,仅有一个Event类型的参数 @Subscribe public void answerAvail

Android 事件总线OTTO使用说明和源码解析

一.Otto简单介绍 OTTO是Square推出的库,地址:https://github.com/square/otto 先来看看otto的官方介绍 An enhanced Guava-based event bus with emphasis on Android support.Otto is an event bus designed to decouple different parts of your application while still allowing them to c

15.并发容器之ConcurrentLinkedQueue

1.ConcurrentLinkedQueue简介 在单线程编程中我们会经常用到一些集合类,比如ArrayList,HashMap等,但是这些类都不是线程安全的类.在面试中也经常会有一些考点,比如ArrayList不是线程安全的,Vector是线程安全.而保障Vector线程安全的方式,是非常粗暴的在方法上用synchronized独占锁,将多线程执行变成串行化.要想将ArrayList变成线程安全的也可以使用Collections.synchronizedList(List<T> list)

ConcurrentLinkedQueue 1.8 源码浅析

[TOC] ConcurrentLinkedQueue 1.8 源码浅析 一,简介 ConcurrentlinkedQueue 还是一个基于链表的,×××的,线程安全的单端队列,它采用先进先出(FIFO)的规则对节点进行排序,当我们加入一个元素时,它会插入队列的尾部,当我们获取元素时,会从队列的首部获取元素.它没有使用锁来保证线程安全,使用的是"wait-free"算法来保证整个队列的线程安全. 二,基本成员简介 Node 节点对象 // 存储的数据 volatile E item;

LinkedTransferQueue 1.8 源码解析

[TOC] LinkedTransferQueue 1.8 源码解析 一,简介 LinkedTransferQueue 是一个由链表结构组成的wujie阻塞传输队列,它是一个很多队列的结合体(ConcurrentLinkedQueue,LinkedBlockingQueue,SynchronousQueue),在除了有基本阻塞队列的功能(但是这个阻塞队列没有使用锁)之外:队列实现了TransferQueue接口重写了tryTransfer和transfer方法,这组方法和SynchronousQ

死磕 java线程系列之线程池深入解析——构造方法

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本. 简介 ThreadPoolExecutor的构造方法是创建线程池的入口,虽然比较简单,但是信息量很大,由此也能引发一系列的问题,同样地,这也是面试中经常被问到的问题,下面彤哥只是列举了一部分关于ThreadPoolExecutor构造方法的问题,如果你都能回答上来,则可以不用看下面的分析了. 问题 (1)ThreadPoolExecutor有几个构造方法? (2)ThreadPoolExecutor最长的构

Mybaits 源码解析 (九)----- 全网最详细,没有之一:一级缓存和二级缓存源码分析

像Mybatis.Hibernate这样的ORM框架,封装了JDBC的大部分操作,极大的简化了我们对数据库的操作. 在实际项目中,我们发现在一个事务中查询同样的语句两次的时候,第二次没有进行数据库查询,直接返回了结果,实际这种情况我们就可以称为缓存. Mybatis的缓存级别 一级缓存 MyBatis的一级查询缓存(也叫作本地缓存)是基于org.apache.ibatis.cache.impl.PerpetualCache 类的 HashMap本地缓存,其作用域是SqlSession,myBat