读AbstractQueuedSynchronizer类源码

感受

这个类的代码除去注释差不多有千多行,要想把所有代码都读完,然后按照作者的思路给理解完,是不容易的.这里我仔仔细细读了差不多一半的代码,说难倒不是很难.

虽然没有完全看完,但是基本上理解了作者代码的意图..说得简单些,就是操作一个双向链表.而链表中的每个节点有多种状态.AQS就是要保证整个双向链表和节点的状态的正确性.

连续看了好几天的JUC相关的源码,现在脑袋真有点晕乎乎的.加上这个类的代码确实有点多,这个就不在整理这个类的分析结果,就直接把代码copy了,并附上一张图.(这里只贴出分析过的那部分代码) 如果有朋友需要一起探讨的,留言就好了.

public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    protected AbstractQueuedSynchronizer() {
    }

    //链接节点类
    static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        //表示当前节点的线程已经被取消
        static final int CANCELLED = 1;

        //表示当前节点的后继节点应该被唤醒
        static final int SIGNAL = -1;

        //表示当前节点的线程正在等待某一个条件
        static final int CONDITION = -2;

        static final int PROPAGATE = -3;

        //上面四个状态之一或者0
        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        //返回前继节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     *
     * @param node the node to insert
     * @return node‘s predecessor 返回插入节点的前继节点
     */
    //返回插入节点的前继节点
    //是线程安全的方法,试想多个线程同时执行这个方法,分析代码可知是不会出现数据不一致的情况的
    //假如多个线程同时执行这个方法,且此时tail为null,那么可能出现他们同时执行a行代码的情况,
    //但是a行代码是一个CAS操作,所以只会有一个线程执行成功,也就是说只会有一个线程才会执行b行代码.
    //然后所有的线程又会进入循环.然会它们可能又同时执行了c行代码
    // (也就是所有的线程都把各自要插入的节点的前继节点设置为了队列的尾节点,初看上去不应该这样,但是莫慌)
    //然后所有的线程可能同时执行d代码,但是d行代码事CAS操作,所以也只会有个线程执行成功,也就是说只会有一个线程执行e行并返回.
    //而剩余的线程又会进入循环.此时尾节点已经更新,所以不会造成数据的不一致
    private Node enq(final Node node) {
        for (; ; ) {                                          //
            Node t = tail;                                  //
            if (t == null) { // Must initialize             //
                if (compareAndSetHead(new Node()))          //a
                    tail = head;                            //b
            } else {
                node.prev = t;                              //c
                if (compareAndSetTail(t, node)) {           //d
                    t.next = node;                          //e
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    //参数的值那么是 Node.EXCLUSIVE 要么是 Node.SHARED
    private Node addWaiter(Node mode) {
        //这行代码不存在竞争资源,所以不存在线程是否安全的问题
        Node node = new Node(Thread.currentThread(), mode);

        // Try the fast path of enq; backup to full enq on failure
        // 下面的代码会先尝试以最快的方式入队列,同时如果失败就会以常规方式入队列作为后备计划
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //如果多个线程同时执行下面的代码,那么只会有个线程会返回,所以其他线程则执行a代码
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //以常规方式入队列
        enq(node);                              //a
        return node;
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Wakes up node‘s successor, if one exists.
     *
     * @param node the node
     */
    //如果输入节点存在后继节点,则唤醒之.如果不存在,则唤醒队列中第一个非CANCELLED状态的节点(如果有的话).
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        //如果当且节点对应的线程正处于 Node.SIGNAL OR Node.CONDITION OR Node.PROPAGATE 三个状态之一
        //则通过CAS操作把状态更改为0 如果更改失败也没有关系
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //得到后继节点
        Node s = node.next;

        //如果后继节点不存在或者后继节点对应的线程已经被取消掉,则从尾节点向头节点遍历(排除输入节点),
        //只要遍历的节点的状态不是CANCELLED,则记录之.最后的效果就相当于从前往后找到整个队列中第一个非CANCELLED状态的节点(排除输入节点)
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }

        if (s != null)
            //如果s!=null,则s可能是输入节点的后继节点,也可能是队列中第一个非CANCELLED状态的节点(排除输入节点)
            //取消阻塞s节点
            //需要注意的是:如果n个线程同时执行unparkSuccessor(Node)方法,且最后的s不为null,那么就会在s对应的线程上执行n次unpart操作.
            //这样的话,后面的n-1次unpark操作可能都是在非阻塞情况下执行.需要注意对非阻塞线程进行unpark操作的影响.
            LockSupport.unpark(s.thread);
    }

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    //建议先不要看这段总结,直接看方法内部的说明.内部看完后再来看这段总结
    //
    //整个方法总结下来就是:要求整个方法执行过程中没有其他线程更改头节点的值,否则线程一直在循环体中不出来.
    //假设整个方法的执行过程中,头节点的值没有发生变化.
    //
    //那么如果有n个线程同时执行这个方法,那么最后的效果就是
    //有一个线程把头节点的值更改为了0,同时执行了unpartSuccessor(h)操作.
    //而其余的线程中又会有一个线程会在头几点的状态更改为0之后又把头节点的状态更改为PROPAGATE
    //然后剩余的n-2个线程则不会更改任何东西就直接结束了方法的执行
    //
    //上面的分析是在多个线程,头节点开始状态是SIGNAL的情况下,当然还有其他情况下的执行结果.
    //这里就不在一一说明.比如
    //  多线程,开始状态为0的情况                      :有一个线程把状态更改为了PROPAGATE,然后退出方法.其他线程不做任何改变然后退出方法
    //  多线程,开始状态既不是SIGNAL也不是0的情况        :所有线程不做什么改变然后退出方法
    //
    //  单线程,开始状态为SIGNAL的情况                 :头节点状态被更改为0,且执行了unparkSuccessor操作
    //  单线程,开始状态为0的情况;                     :头节点状态被更改为PROPAGATE
    //  单线程,开始状态既不是SIGNAL也不是0的情况        :线程不做什么改变然后退出方法
    //
    //再次总结:(下面是单线程情况下.多线程可以看做是下面的流程执行了多次)
    //在头节点不改变的前提下
    //  如果头节点的状态是SIGNAL,则把状态更改为0,同时执行unparkSuccessor操作
    //  如果头节点状态是0,则把状态更改为PROPAGATE
    //  如果是其他状态,什么也不改变
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (; ; ) {
            //获取此时的头节点
            Node h = head;

            //如果h节点(不能说头节点,因为这个过程中,可能有其他线程更改了头节点的值)不为null,且h节点不是尾节点(即队列长度大于等于2)
            if (h != null && h != tail) {

                //获取h节点的状态
                int ws = h.waitStatus;

                //如果h节点的状态是SIGNAL
                if (ws == Node.SIGNAL) {

                    //把h节点的状态更改为0  如果有多个线程同时执行到这里,且获取到的头节点都是同一个节点,那么只会有一个线程执行成功.
                    //成功更改状态的那个线程就会执行 取消阻塞h节点的后继节点 的操作,然后执行a行代码.失败的线程则又会进入循环
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }

                //如果h节点的状态是0,则把状态更改为PROPAGATE
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }

            //如果线程执行到这里,如果没有其他线程更改头节点,则跳出循环结束方法.
            //而如果有其他线程更改了头节点的值,则又进入循环
            if (h == head)                   // loop if head changed         //a
                break;
        }

    }

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node      the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);

        //这个条件没有看明白,如果输入node为null,那后面的代码岂不报错?
        if (propagate > 0
                || h == null
                || h.waitStatus < 0
                || (h = head) == null
                || h.waitStatus < 0
                ) {

            //得到输入节点的后继节点
            Node s = node.next;

            //如果不存在后继节点或者后继节点是分享模式,则执行doReleaseShared();
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    // Utilities for various versions of acquire

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    //看这个方法的时候配合后面的图也许好理解些.
    //这个图只描述了一种情况,其他情况的图就没有画了.
    private void cancelAcquire(Node node) {
        // Ignore if node doesn‘t exist
        //要求输入node不为null
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;

        // 图A
        //循环条件是 状态为CANCELLED.
        //跳出循环后的结果:
        // pred 指向的是 从输入节点的前继节点开始向头节点遍历,遇到的第一个非CANCELLED状态的节点
        // 输入节点的前继节点指向 pred 节点
        //
        // 如果多线程同时执行这个循环,会导致node.prev的值不一致,
        // 所以AQS框架应该会在外部进行限制,使得不会有多个线程同时执行这个方法.呆会会看到这个限制
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        //图B
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        //图C
        // If we are the tail, remove ourselves.
        //
        //如果输入节点是尾节点,就把尾节点设置为pred节点,同时把pred节点的后继节点设置为null
        // 上面的分析得出不会有多个线程同时执行这个方法,但是有可能多个线程同时在执行compareAndSetTail操作.
        // 所以如果执行该方法的线程在执行compareAndSetTail时有可能返回false.而如果返回false,则进入到else
        if (node == tail && compareAndSetTail(node, pred)) {   //a
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred‘s next-link
            // so it will get one. Otherwise wake it up to propagate.
            //图C
            //使得线程执行这部分代码,有两种情况:
            //  输入节点不是尾节点
            //  输入节点是尾节点,但更新尾节点的值时失败.(这种情况的话,node节点又变为了不是尾节点)
            //  所以无论是上述哪一种情况,执行到这里时,node节点肯定不是尾节点了(而此时的尾节点到底是在node节点前还是后呢?)
            int ws;

            //我勒个天,这个条件好难理

            if (pred != head
                    && ( (ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
                    && pred.thread != null
                    ) {
                //执行这段代码的条件:
                //  pred不是头节点 且 pred节点的状态是SIGNAL(如果不是SIGNAL,则要求成功把状态更改为SIGNAL) 且 pred节点的线程不是null
                Node next = node.next;

                if (next != null && next.waitStatus <= 0)
                    //执行条件是 此时的node节点有后继节点 且 后继节点的状态是非CANCELLED状态
                    compareAndSetNext(pred, predNext, next);
            } else {

                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
}

附图

时间: 2024-10-29 19:09:41

读AbstractQueuedSynchronizer类源码的相关文章

读Unsafe类源码

Unsafe类简介 JUC中很多的实现都是调用了Unsafe类来实现的,所以这里阅读下该类的内容. Unsafe类包装了很多低级别的非安全性操作.虽然该类及其所有的方法都是public的,但是它只能被受信任的代码使用(也就是jdk中的代码) 读源码过程中,这里只会对部分方法进行说明,其余的方法要么类似要么不那么重要. 读源码 //构造方法私有化 private Unsafe() {} //采用饿汉式单例 private static final Unsafe theUnsafe = new Un

Java集合---Array类源码解析

Java集合---Array类源码解析              ---转自:牛奶.不加糖 一.Arrays.sort()数组排序 Java Arrays中提供了对所有类型的排序.其中主要分为Primitive(8种基本类型)和Object两大类. 基本类型:采用调优的快速排序: 对象类型:采用改进的归并排序. 1.对于基本类型源码分析如下(以int[]为例): Java对Primitive(int,float等原型数据)数组采用快速排序,对Object对象数组采用归并排序.对这一区别,sun在

AbstractQueuedSynchronizer(AQS)源码解析(一)

在JDK1.5版本,新增了并发包,其中包含了显示锁.并发容器.在这些锁和容器里,都有同步器(AQS)的身影.为了更好地理解JDK的并发包,我会用三个主题来详细描述AbstractQueuedSynchronizer的实现. 在AQS中,涉及到同步队列以及Condition对象,这也是我为什么要用三个主题来讲述的原因.本节将主要讲述同步队列,后面两节会分别讲述Condition对象以及AQS的主要功能实现. AQS同步队列的主要功能是将无法获得资源的线程放入同步队列中,进行等待,它是通过链表来

android 小说类源码制作教程源码下载

自己闲着没事制作了个小说软件用来自己看全本/连载小说, 翻页,字体大小,目录,自动更新 具体效果如下:奉献给大家下载查看... 下载APK效果查看地址: http://yun.baidu.com/s/1gdknYyJ 源码下载地址: http://download.csdn.net/detail/ainibaifenbai/7575817 android 小说类源码制作教程源码下载,布布扣,bubuko.com

Scroller类源码解析及其应用(一)

滑动是我们在自定义控件时候经常遇见的难听,让新手们倍感困惑,这篇文章主要介绍Scroller类的源码,告诉打击这个到底有什么用,怎么使用它来控制滑动.另外,我还会结合一个简单的例子,来看一下这个类的应用. 要说明Scroller类,我们往往要从另外两个方法说起,一个是ScrollTo(),一个是ScrollBy() 这两个方法我们可以在View的源码看到,我们知道其实每个空间都有滚动条,只是有的我们将它隐藏,所以我们看不见 下面是ScrollTo方法 /** * Set the scrolled

Thread类源码剖析

目录 1.引子 2.JVM线程状态 3.Thread常用方法 4.拓展点 一.引子 说来也有些汗颜,搞了几年java,忽然发现竟然没拜读过java.lang.Thread类源码,这次特地拿出来晒一晒.本文将剖析Thread类源码(本文后面源码全部默认JDK8),并讲解一些重要的拓展点.希望对大家能有一些帮助. 本文讲解主干全部出自源码和注释,保证了权威性.(注意:网上,某些书中很多观点都是错的,过时的,片面的,所以大家一定要看源码,重要事情说N遍,看源码!看源码!看源码......) 二.JVM

List 接口以及实现类和相关类源码分析

List 接口以及实现类和相关类源码分析 List接口分析 接口描述 用户可以对列表进行随机的读取(get),插入(add),删除(remove),修改(set),也可批量增加(addAll),删除(removeAll,retainAll),获取(subList). 还有一些判定操作:包含(contains[All]),相等(equals),索引(indexOf,lastIndexOf),大小(size). 还有获取元素类型数组的操作:toArray() 注意事项 两种迭代器Iterator和L

Cocos2d-X3.0 刨根问底(六)----- 调度器Scheduler类源码分析

上一章,我们分析Node类的源码,在Node类里面耦合了一个 Scheduler 类的对象,这章我们就来剖析Cocos2d-x的调度器 Scheduler 类的源码,从源码中去了解它的实现与应用方法. 直入正题,我们打开CCScheduler.h文件看下里面都藏了些什么. 打开了CCScheduler.h 文件,还好,这个文件没有ccnode.h那么大有上午行,不然真的吐血了, 仅仅不到500行代码.这个文件里面一共有五个类的定义,老规矩,从加载的头文件开始阅读. #include <funct

ASP.NET实现用户在线检测的类源码

//online.cs(用户在线检测) /*程序实现思路: 该用户有以下几个属性: name:用户名 sessionID:用户ID,通过它唯一表示一个用户 iswhere :附加信息,用户当前所在位置 lasttime:用户登陆时间 curtime:本次刷新时间 在客户端,使用一个IFRAME,装载一个刷新页面,每隔XX秒更新一下他的名字对应的curtime,就表示他仍然在 在服务器端,建立一个守护线程,每隔固定时间就运行一遍,然后判断当前所有用户列表中的时间间隔是否超出了规定的时间,如果超出,