Java concurrent AQS 源码详解

一、引言

  AQS(同步阻塞队列)是concurrent包下锁机制实现的基础,相信大家在读完本篇博客后会对AQS框架有一个较为清晰的认识

  这篇博客主要针对AbstractQueuedSynchronizer的源码进行分析,大致分为三个部分:

    1.   静态内部类Node的解析
    2.   重要常量以及字段的解析
    3.   重要方法的源码详解。

  所有的分析仅基于个人的理解,若有不正之处,请谅解和批评指正,不胜感激!!!

二、Node解析

  AQS在内部维护了一个同步阻塞队列,下面简称sync queue,该队列的元素即静态内部类Node的实例

  首先来看Node中涉及的常量定义,源码如下

        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor‘s thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

  以下两个均为Node#nextWaiter字段的可取值

    SHARED:若Node#nextWaiter为SHARED,那么表明该Node节点处于共享模式

    EXCLUSIVE:若Node#nextWaiter为EXCLUSIVE,那么表明该Node节点处于独占模式

  以下五个均为Node#waitStatus字段的可取值

    CANCELLED:用于标记一个已被取消的节点,一旦Node#waitStatus的值被设为CANCELLED,那么waitStatus的值便不再被改变

    SIGNAL:标记一个节点(记为node)处于这样一种状态:当node释放资源(unlock/release)时,node节点必须唤醒其后继节点

    CONDITION:用于标记一个节点位于条件变量的阻塞队列中(我称这个阻塞队列为Condition list),本篇暂不介绍Condition相关源码,因此读者可以暂时忽略

    PROPAGATE:仅用于标记sync queue头节点,且为一种暂时状态,表明共享状态正在传递中(如果有剩余资源且sync queue中仍有等待的节点,那么这些节点会依次获取资源,直至资源消耗殆尽或者队列为空),仅在共享模式中出现

  其次,再看Node中重要字段,源码如下

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn‘t need to
         * signal. So, most code doesn‘t need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev‘s from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

    waitStatus:节点的状态,可取值有五种,分别是SIGNAL、CANCEL、CONDITION、PROPAGATE、0。其中独占模式仅涉及到SIGNAL、CANCEL、0三种状态,共享模式仅涉及到SIGNAL、CANCEL、PROPAGATE、0四种状态。CONDITION状态不会出现在sync queue中,而是位于条件变量的Condition list中,本篇博客暂不讨论ConditoinObject

  pre:前继节点,该字段通过CAS操作进行赋值,保证可靠(现在不理解没关系,后面的方法解析会多次提到)

  next:后继节点,该字段的赋值操作是非线程安全的,即next是不可靠的(Node#next为null并不代表节点不存在后继)。但是,一旦next不为null,那么next也是可靠的(现在不理解没关系,后面的方法解析会多次提到)

  thread:该节点关联的线程

  nextWaiter:独占模式中就是null,共享模式中就是SHARED。在ConditionObject的Condition list中指向下一个节点

  注意:Condition list用nextWaiter来连接单向链表(pre与next是无用的),sync queue利用pre和next来连接双向链表(nextWaiter仅用于标记独占或者共享模式而已),不要搞混了!!!

三、AQS字段解析

  AQS字段仅有三个,源码如下

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

    head:sync queue队列的头节点

  tail:sync queue队列的尾节点

  state:资源状态

四、重要方法解析

4.1 acquire

  该方法是独占模式下的入口方法,可以相应interrupt,但是不会抛出InterruptedException异常

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        //首先执行tryAcquire(arg)尝试获取资源,如果成功则直接返回
        //如果tryAcquire(arg)获取资源失败,则讲当前线程封装成Node节点加入到sync queue队列中,并通过acquireQueued进行获取资源直至成功(如果尚未有资源可获取,那么acquireQueued会阻塞当前线程)
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }    

  

  其中tryAcquire方法如下,该方法的具体含义交给AQS的子类去完成,注意,该方法的实现不可有任何耗时操作,更不可阻塞线程,仅实现是否可获取资源(换言之,是否可获取锁)的逻辑即可,源码如下

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

  

  addWaiter的作用是:将当前线程封装成一个Node节点,并且添加到sync queue中

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        //生成指定模式的Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //以下几行进行入队操作,如果失败,交给enq进行入队处理。其实,我认为可以直接调用enq,不知道作者设置如下几行的意图
        if (pred != null) {
            node.prev = pred;
            //通过CAS操作串行化并发入队操作,仅有一个线程会成功,由于node节点的prev字段是在CAS操作之前进行的,一旦CAS操作成功,node节点的prev字段就是指向了其前继节点,因此说prev字段是安全的
            if (compareAndSetTail(pred, node)) {
                //这里直接通过赋值操作赋值next字段,注意,可能有别的线程会在next字段赋值之前访问到next字段,因此next字段是非可靠的(一个节点的next字段为null并不代表该节点没有后继)
                pred.next = node;
                //一旦next字段赋值成功,那么next字段又变为可靠的了
                return node;
            }
        }
        //通过enq入队
        enq(node);
        return node;
    }

  enq入队,源码如下

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node‘s predecessor
     */
    private Node enq(final Node node) {
        //死循环进行入队操作,CAS操作常规模式
        for (;;) {
            Node t = tail;
            //此时队列为空,需要初始化
            if (t == null) { // Must initialize
                //此时可能多个线程都在执行该方法,因此只有一个线程才能初始化sync queue,此处添加的节点我称之为Dummy Node,该节点没有关联线程
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //以下四行与addWaiter类似,不再赘述
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }                

  这里抛出一个问题:在初始化sync queue中,将一个new Node()设置为了sync queue的头结点,该节点没有关联任何线程,我称之为"Dummy Node",这个头结点"Dummy Node"待会可能会被设置为SIGNAL状态,那么它是如何唤醒后继节点的呢?我会在在讲到release时进行解释

  到这里,线程已被封装成节点,并且成功添加到sync queue中去了,接下来,来看最重要的acquireQueued方法

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        //用于记录是否获取成功,我现在还不清楚何时会失败= =
        boolean failed = true;
        try {
            //记录是否被中断过,如果被中断过,则需要在acquire方法中恢复中断现场
            boolean interrupted = false;
            //同样的套路,CAS配合死循环
            for (;;) {
                //获取node节点的前继节点p
                final Node p = node.predecessor();
                //当p为sync queue头结点时,才有资格尝试获取资源,换言之,当且仅当一个节点是sync queue中第二个节点时,它才有资格获取资源
                if (p == head && tryAcquire(arg)) {
                    //一旦获取成功,以下语句都是线程安全的,所有字段直接赋值即可,不需要CAS或者加锁
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //否则,找到前继节点,并将其设置为SIGNAL状态后阻塞自己
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                //如果失败了
                cancelAcquire(node);
        }
    }

  该方法的主要逻辑就是:不断地通过死循环执行获取资源(当且仅当节点是sync queue中第二个节点时才有资格获取资源)或者阻塞自己的操作,只有成果获取资源后才能够返回

  接下来,来看shouldParkAfterFailedAcquire方法以及parkAndCheckInterrupt方法

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node‘s predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //一旦发现前继节点是SIGNAL状态,就返回true,在acquireQueued方法中会阻塞当前线程
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            //这里给出两个问题:
                 //1.如果在当前线程阻塞之前,前继节点就唤醒了当前线程,那么当前线程不就永远阻塞下去了吗?
                 //2.万一有别的线程更改了前继节点的状态,导致前继节点不唤醒当前线程,那么当前线程不就永远阻塞下去了吗?
            return true;

        //如果前继节点处于CANCELL状态(仅有CANCELL状态大于0)
        if (ws > 0) {
            //那么跳过那些被CANCELL的节点,先前找到第一个有效节点
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        //前继节点状态要么是0,要么是PROPAGATE,将其通过CAS操作设为SIGNAL,不用管是否成功,退回到上层函数acquireQueued进行再次判断
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don‘t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

  该方法的主要逻辑就是:将前继节点设置为SIGNAL

  关于上面提到的两个问题

    1. 如果在当前线程阻塞之前,前继节点就唤醒了当前线程,那么当前线程不就永远阻塞下去了吗?--->AQS采用的是Unsafe#park以及Unsafe#unpark,这对方法能够很好的处理这类问题,可以先unpark获取一枚许可,然后执行park不会阻塞当前线程,而是消耗这个提前获取的许可,注意,多次unpark仅能获取一枚许可

    2.万一有别的线程更改了前继节点的状态,导致前继节点不唤醒当前线程,那么当前线程不就永远阻塞下去了吗?--->一旦一个节点被设为SIGNAL状态,AQS框架保证,任何改变其SIGNAL状态的操作都会唤醒其后继节点,因此,只要节点看到其前继节点为SIGNAL状态,便可放心阻塞自己

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //返回是否被中断过
        return Thread.interrupted();
    }

  至此,独占模式的acquire调用链分析完毕,总结一下:首先尝试获取锁(tryAcquire),若成功则直接返回。若失败,将当前线程封装成Node节点加入到sync queue队列中,当该节点位于第二个节点时,会重新尝试获取锁,成功则返回,失败则阻塞自己,直至前继节点唤醒自己

  AQS通过死循环以及CAS操作来串行化并发操作,并且通过这种适当的自旋加阻塞,来减少频繁的加锁解锁操作

4.2 release

  release方法是独占模式下释放资源(即解锁)的入口,源码如下

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        //调用tryRelease尝试释放资源
        if (tryRelease(arg)) {
            Node h = head;
            //只要头节点不为空且状态不为0,就唤醒后继节点,对于独占模式也就只有SIGNAL状态一种,头结点在任何情况下都不可能为CANCELL状态
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

  在此,解释一下enq方法中提到的问题,即那个"Dummy Node"如何唤醒后继:由于"Dummy Node"不关联任何线程,因此真正的唤醒操作实际上是由外部的线程来完成的,这里的外部线程是指从未进入sync queue的线程,因此,"Dummy Node"节点设置为SIGNAL状态,也能够正常唤醒后继

  同理,tryRelease也是交给AQS子类实现的方法,只需要定义释放资源的逻辑即可,该方法的实现不应该有耗时的操作,更不该阻塞

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

  通过unparkSuccessor方法唤醒指定节点的后继节点

    /**
     * Wakes up node‘s successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        //若节点状态小于0,将其通过CAS操作改为0,表明本次SIGNAL的任务已经完成,至于CAS是否成功,或者是否再次被其他线程修改,都与本次无关unparkSuccessor无关,只是该节点被赋予了新的任务而已。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //这里通过非可靠的next字段直接获取后继,如果非空,那么说明该字段可靠,如果为空,那么利用可靠的prev字段从tail向前找到当前node节点的后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒后继节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }        

4.3 acquireShared

待续

4.4 releaseShared

待续

  

  

时间: 2024-10-14 20:13:18

Java concurrent AQS 源码详解的相关文章

java集合11--HashTable源码详解

概要 前一章,我们学习了HashMap.这一章,我们对Hashtable进行学习. 我们先对Hashtable有个整体认识,然后再学习它的源码,最后再通过实例来学会使用Hashtable. 第1部分 Hashtable介绍 第2部分 Hashtable数据结构 第3部分 Hashtable源码解析(基于JDK1.6.0_45) 第4部分 Hashtable遍历方式 第5部分 Hashtable示例 转载请注明出处:http://www.cnblogs.com/skywang12345/p/331

java集合12--TreeMap源码详解

概要 这一章,我们对TreeMap进行学习. 我们先对TreeMap有个整体认识,然后再学习它的源码,最后再通过实例来学会使用TreeMap.内容包括: 第1部分 TreeMap介绍 第2部分 TreeMap数据结构 第3部分 TreeMap源码解析(基于JDK1.6.0_45) 第4部分 TreeMap遍历方式 第5部分 TreeMap示例 转载请注明出处:http://www.cnblogs.com/skywang12345/admin/EditPosts.aspx?postid=33109

java集合13--WeakHashMap源码详解

概要 这一章,我们对WeakHashMap进行学习. 我们先对WeakHashMap有个整体认识,然后再学习它的源码,最后再通过实例来学会使用WeakHashMap. 第1部分 WeakHashMap介绍 第2部分 WeakHashMap数据结构 第3部分 WeakHashMap源码解析(基于JDK1.6.0_45) 第4部分 WeakHashMap遍历方式 第5部分 WeakHashMap示例 转载请注明出处:http://www.cnblogs.com/skywang12345/admin/

深入Java基础(四)--哈希表(1)HashMap应用及源码详解

继续深入Java基础系列.今天是研究下哈希表,毕竟我们很多应用层的查找存储框架都是哈希作为它的根数据结构进行封装的嘛. 本系列: (1)深入Java基础(一)--基本数据类型及其包装类 (2)深入Java基础(二)--字符串家族 (3)深入Java基础(三)–集合(1)集合父类以及父接口源码及理解 (4)深入Java基础(三)–集合(2)ArrayList和其继承树源码解析以及其注意事项 文章结构:(1)哈希概述及HashMap应用:(2)HashMap源码分析:(3)再次总结关键点 一.哈希概

Guava Cache源码详解

目录 一.引子 二.使用方法 2.1 CacheBuilder有3种失效重载模式 2.2 测试验证 三.源码剖析 3.1 简介 3.2 源码剖析 四.总结 优点: 缺点: 正文 回到顶部 一.引子 缓存有很多种解决方案,常见的是: 1.存储在内存中 : 内存缓存顾名思义直接存储在JVM内存中,JVM宕机那么内存丢失,读写速度快,但受内存大小的限制,且有丢失数据风险. 2.存储在磁盘中: 即从内存落地并序列化写入磁盘的缓存,持久化在磁盘,读写需要IO效率低,但是安全. 3.内存+磁盘组合方式:这种

Spring IOC源码详解之容器依赖注入

Spring IOC源码详解之容器依赖注入 上一篇博客中介绍了IOC容器的初始化,通过源码分析大致了解了IOC容器初始化的一些知识,先简单回顾下上篇的内容 载入bean定义文件的过程,这个过程是通过BeanDefinitionReader来完成的,其中通过 loadBeanDefinition()来对定义文件进行解析和根据Spring定义的bean规则进行处理 - 事实上和Spring定义的bean规则相关的处理是在BeanDefinitionParserDelegate中完成的,完成这个处理需

IntentService源码详解

IntentService可以做什么: 如果你有一个任务,分成n个子任务,需要它们按照顺序完成.如果需要放到一个服务中完成,那么IntentService就会使最好的选择. IntentService是什么: IntentService是一个Service(看起来像废话,但是我第一眼看到这个名字,首先注意的是Intent啊.),所以如果自定义一个IntentService的话,一定要在AndroidManifest.xml里面声明. 从上面的"可以做什么"我们大概可以猜测一下Inten

butterknife源码详解

butterknife源码详解 作为Android开发者,大家肯定都知道大名鼎鼎的butterknife.它大大的提高了开发效率,虽然在很早之前就开始使用它了,但是只知道是通过注解的方式实现的,却一直没有仔细的学习下大牛的代码.最近在学习运行时注解,决定今天来系统的分析下butterknife的实现原理. 如果你之前不了解Annotation,那强烈建议你先看注解使用. 废多看图: 从图中可以很直观的看出它的module结构,以及使用示例代码. 它的目录和我们在注解使用这篇文章中介绍的一样,大体

《GIS软件ShapMap源码详解及应用》概述

我喜欢GIS二次开发,即使有的人看不起:我不懂开源GIS,只会点商业的GIS,有的人更加瞧不起.我认为,我不能改变现实这个环境,但可以创造一些价值.找到一本<GIS软件ShapMap源码详解及应用>来学习,我倒要看看开源GIS是什么样子. 当前GIS软件有商业GIS系统及开源GIS系统之分.GIS商用软件功能强 大,有完善的技术支持,提供封装好的.功能强大的类库,基于商用GIS库进 行的二次开发效率高.难度低.资源丰富.但对于小型GIS开发人员,商用 GIS价格过高,对于GIS学习者来说,由于