AQS源码解析

文大篇幅引用自HongJie大佬的一行一行源码分析清楚AbstractQueuedSynchronizer,这只是一篇简单的个人整理思路和总结(倒垃圾),如果觉得有些难懂的话,不要犹豫也不要疑惑,很明显是我这篇文章的问题,不是你的问题,这时你最好直接转去看HongJie大佬的原文,那个会好懂很多。还是看不懂的话建议隔一段时间再看,然后像我一样写(复制)一篇总结捋一下思路,加油!

AQS 结构

属性

private transient volatile Node head;

private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread;


内部类

Node

static final class Node {
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;

    // ================================================ 下面的几个int常量是给waitStatus用的 ===============================================
    // 代码此线程取消了争抢这个锁
    static final int CANCELLED =  1;
    // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
    static final int SIGNAL    = -1;
    // 本文不分析condition,所以略过吧,下一篇文章会介绍这个
    static final int CONDITION = -2;
    // 同样的不分析,略过吧
    static final int PROPAGATE = -3;
    // ================================================================================================================================

    // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
    // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
    // ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的.
    volatile int waitStatus;
    // ================================================================================================================================

    // 用于阻塞队列
    volatile Node prev;
    volatile Node next;

    // 这个就是线程本尊
    volatile Thread thread;

    // 用于条件队列
    Node nextWaiter;
}

获取独占锁

lock () (摘自Reentrantlock)

public void lock() {
    sync.lock();
}

acquire (int arg)

// 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。
// 否则,acquireQueued方法会将线程压到队列中
public final void acquire(int arg) { // 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试
    // 因为有可能直接就成功了呢,也就不需要进队列排队了,
    // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)
    if (!tryAcquire(arg) &&
        // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}        

tryAcquire (int acquires) (实现来自ReentrantLock)

// 尝试直接获取锁,返回值是boolean,代表是否获取到锁
// 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // state == 0 此时此刻没有线程持有锁
    if (c == 0) {
        // 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
        // 看看有没有别人在队列中等了半天了
        if (!hasQueuedPredecessors() &&
            // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
            // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
            // 因为刚刚还没人的,我判断过了
        compareAndSetState(0, acquires)) {

            // 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 会进入这个else if分支,说明是重入了,需要操作:state=state+1
    // 这里不存在并发问题
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁
    // 回到上面一个外层调用方法继续看:
    // if (!tryAcquire(arg)
    //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    //     selfInterrupt();
    return false;
}                

tryAcquire(arg) 如果返回false,那么代码将执行   acquireQueued(addWaiter(Node.EXCLUSIVE), arg)   这个方法,首先需要执行: addWaiter(Node.EXCLUSIVE)

addWaiter(Node.EXCLUSIVE)

// 此方法的作用是把线程包装成node,同时进入到队列中
// 参数mode此时是Node.EXCLUSIVE,代表独占模式
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);// 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
    Node pred = tail;

    // tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧)
    if (pred != null) {
        // 将当前的队尾节点,设置为自己的前驱
        node.prev = pred;
        // 用CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
        if (compareAndSetTail(pred, node)) {
            // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
            // 上面已经有 node.prev = pred,加上下面这句,也就实现了和之前的尾节点双向连接了
            pred.next = node;
            // 线程入队了,可以返回了
            return node;
        }
    }
    // 仔细看看上面的代码,如果会到这里,
    // 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
    enq(node);
    return node;
}

enq (final Node node)

// 采用自旋的方式入队
// 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
// 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 之前说过,队列为空也会进来这里
        if (t == null) { // 初始化head节点
            // 细心的读者会知道原来 head 和 tail 初始化的时候都是 null 的
            // 还是一步CAS,你懂的,现在可能是很多线程同时进来呢
            if (compareAndSetHead(new Node()))
                // 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了

                // 这个时候有了head,但是tail还是null,设置一下,
                // 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
                // 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return
                // 所以,设置完了以后,继续for循环,下次就到下面的else分支了
                tail = head;
        } else {
            // 下面几行,和上一个方法 addWaiter 是一样的,
            // 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

tryAcquire(arg) 如果返回false,那么代码将执行   acquireQueued(addWaiter(Node.EXCLUSIVE), arg)   这个方法

acquireQueued(final Node node, int arg)

// 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列
// 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
// 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
// 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
            // 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
            // 所以当前节点可以去试抢一下锁
            // 这里我们说一下,为什么可以去试试:
            // 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
            // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
            // 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
            // tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
            // 要么就是tryAcquire(arg)没有抢赢别人,继续往下看
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 什么时候 failed 会为 true???
        // tryAcquire() 方法抛异常的情况
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire(Node pred, Node node)

// 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
    if (ws == Node.SIGNAL)
        return true;

    // 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。
    // 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
    // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
    // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
    // 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 仔细想想,如果进入到这个分支意味着什么
        // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
        // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
        // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
        // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 这个方法返回 false,那么会再走一次 for 循序,
    //     然后再次进来此方法,此时会从第一个分支返回 true
    return false;
}

parkAndCheckInterrupt

// 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
// 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

下面我画了张图帮大家理一下思路,转载注明出处

附录


释放独占锁

unlock () (摘自Reentrantlock)

public void unlock() {
    sync.release(1);
}

release (int arg)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease (int arg)(实现来自Reentrantlock)

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否完全释放锁
    boolean free = false;
    // 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

unparkSuccessor (Node node)

// 唤醒后继节点
// 从上面调用处知道,参数node是head头结点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果head节点当前waitStatus<0, 将其修改为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
    // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 刚刚线程被挂起在这里了
    // interrupted()的内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态
    return Thread.interrupted();
}

条件锁

我们借Reentrantlock来看一下条件锁

Lock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
final ConditionObject newCondition() {
    // 实例化一个 ConditionObject
    return new ConditionObject();
}

ConditionObject是AQS中的一个内部类,类似于之前提到的Node

ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
        // 条件队列的第一个节点
        private transient Node firstWaiter;
        // 条件队列的最后一个节点
        private transient Node lastWaiter;

回顾一下Node的属性

// 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
volatile int waitStatus;
// 用于阻塞队列
volatile Node prev;
volatile Node next;
volatile Thread thread;
// 用于条件队列
Node nextWaiter;

先捋一下简单流程

基本上,把上面那张图看懂,你也就知道 condition 的处理流程了。所以,我先简单解释下这图,然后再具体地解释代码实现。

  1. 条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的;
  2. 我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性 firstWaiter 和 lastWaiter;
  3. 每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;
  4. 调用condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。

上面的 2->3->4 描述了一个最简单的流程,没有考虑中断、signalAll、还有带有超时参数的 await 方法等,不过把这里弄懂是这节的主要目的。


条件锁的await()

await ()(实现来自AQS中的ConditionObject内部类中)

// 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
// 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
public final void await() throws InterruptedException {
    // 老规矩,既然该方法要响应中断,那么在最开始就判断中断状态,    // interrupted()的内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态
    if (Thread.interrupted())
        throw new InterruptedException();

    // 添加到 condition 的条件队列中
    Node node = addConditionWaiter();

    // 释放锁,返回值是释放锁之前的 state 值
    // await() 之前,当前线程是必须持有锁的,这里肯定要释放掉
    int savedState = fullyRelease(node);

    int interruptMode = 0;
    // 这里退出循环有两种情况,之后再仔细分析
    // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
    // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被唤醒后,将进入阻塞队列,等待获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter()

将节点加入到条件队列

// 将当前线程对应的节点入队,插入队尾
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果条件队列的最后一个节点取消了,将其清除出去
    // 为什么这里把 waitStatus 不等于 Node.CONDITION,就判定为该节点发生了取消排队?
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // node 在初始化的时候,指定 waitStatus 为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    // t 此时是 lastWaiter,队尾
    // 如果队列为空
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

回到 wait 方法,节点入队了以后,会调用  int savedState = fullyRelease(node);  方法释放锁,注意,这里是完全释放独占锁(fully release),因为 ReentrantLock 是可以重入的。

考虑一下这里的 savedState。如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。

fullyRelease (Node node)

// 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值
// 对于最简单的操作:先 lock.lock(),然后 condition1.await()。
// 那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1
// 相应的,如果 lock 重入了 n 次,savedState == n
// 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

如果一个线程在不持有 lock 的基础上,就去调用 condition1.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED,这个已经入队的节点之后会被后继的节点”请出去“。

下面我们再回到await() 中,从   int savedState = fullyRelease(node);  后继续:

// 如果不在阻塞队列中,注意了,是阻塞队列
while (!isOnSyncQueue(node)) {
    // 线程挂起
    LockSupport.park(this);

    // 这里可以先不用看了,等看到它什么时候被 unpark 再说
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

isOnSyncQueue(Node node)

// 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
// 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,
// 这个方法就是判断 node 是否已经移动到阻塞队列了
final boolean isOnSyncQueue(Node node) {

    // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
    // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
    // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果 node 已经有后继节点 next 的时候(注意是next,阻塞队列独有的,不是条件队列的),那肯定是在阻塞队列了
    if (node.next != null)
        return true;

    // 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列

    // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。
    // 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail,
    // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。

    return findNodeFromTail(node);
}

// 从阻塞队列的队尾往前遍历,如果找到,返回 true
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

条件锁的signal

signal ()(实现来自AQS中的ConditionObject内部类中)

// 唤醒等待了最久的线程
// 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列
public final void signal() {
    // 调用 signal 方法的线程必须持有当前的独占锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

doSignal(Node first)

// 从条件队列队头往后遍历,找出第一个需要转移的 node
// 因为前面我们说过,有些线程会取消排队,但是可能还在队列中
private void doSignal(Node first) {
    do {
          // 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了
        // 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
      // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推
}

transferForSignal(Node node)

// 将节点从条件队列转移到阻塞队列
// true 代表成功转移
// false 代表在 signal 之前,节点已经取消了
final boolean transferForSignal(Node node) {

    // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,
    // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点
    // 否则,将 waitStatus 置为 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // enq(node): 自旋进入阻塞队列的队尾
    // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释
    // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节
        LockSupport.unpark(node.thread);
    return true;
}

signal 之后,回到刚刚await () 挂起的地方继续

while (!isOnSyncQueue(node)) {
    // 线程挂起
    LockSupport.park(this);

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

先解释下 interruptMode。interruptMode 可以取值为 REINTERRUPT(1),THROW_IE(-1),0

  1. REINTERRUPT: 代表 await 返回的时候,需要重新设置中断状态
  2. THROW_IE: 代表 await 返回的时候,需要抛出 InterruptedException 异常
  3. 0 :说明在 await 期间,没有发生中断

有以下几种情况会让 LockSupport.park(this); 这句返回继续往下执行:

  1. 常规路径。signal -> 转移节点到阻塞队列 -> 获取了锁(unpark)
  2. 线程中断。在 park 的时候,另外一个线程对这个线程进行了中断
  3. signal 的时候我们说过,转移以后的前驱节点取消了,或者对前驱节点的CAS操作失败了
  4. 假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题

线程唤醒后第一步是调用 checkInterruptWhileWaiting(node) 这个方法,此方法用于判断是否在线程挂起期间发生了中断,如果发生了中断,是 signal 调用之前中断的,还是 signal 之后发生的中断。

下面是await () 的中断处理部分(不了解中断的请去复习中断,不然可能会有点懵逼)

checkInterruptWhileWaiting (node)

// 1. 如果在 signal 之前已经中断,返回 THROW_IE
// 2. 如果是 signal 之后中断,返回 REINTERRUPT
// 3. 没有发生中断,返回 0
private int checkInterruptWhileWaiting(Node node) {
    // Thread.interrupted():如果当前线程已经处于中断状态,那么该方法返回 true,
    // 同时将中断状态重置为 false,所以,才有后续的 重新中断(REINTERRUPT) 的使用。
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

transferAfterCancelledWait(Node node)

判断是 signal 调用之前中断的,还是 signal 之后发生的中断。

// 只有线程处于中断状态,才会调用此方法
// 如果需要的话,将这个已经取消等待的节点转移到阻塞队列
// 返回 true:如果此线程在 signal 之前被取消,
final boolean transferAfterCancelledWait(Node node) {
    // 用 CAS 将节点状态设置为 0
    // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 将节点放入阻塞队列
        // 这里我们看到,即使中断了,依然会转移到阻塞队列
        enq(node);
        return true;
    }

    // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
    // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
    // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

从刚刚await () 的 while (!isOnSyncQueue(node))  循环后的地方继续

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
    unlinkCancelledWaiters();
// 处理中断状态
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

acquireQueued(node, savedState) 的返回值就是代表线程是否被中断。如果返回 true,说明被中断了,而且 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断。

reportInterruptAfterWait(int interruptMode)

处理中断状态

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

带超时机制的await ()

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    // 等待这么多纳秒
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // 当前时间 + 等待时长 = 过期时间
    final long deadline = System.nanoTime() + nanosTimeout;
    // 用于返回 await 是否超时
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 时间到啦
        if (nanosTimeout <= 0L) {
            // 这里因为要 break 取消等待了。取消等待的话一定要调用 transferAfterCancelledWait(node) 这个方法
            // 如果这个方法返回 true,在这个方法内,将节点转移到阻塞队列成功
            // 返回 false 的话,说明 signal 已经发生,signal 方法将节点转移了。也就是说没有超时嘛
            timedout = transferAfterCancelledWait(node);
            break;
        }
        // spinForTimeoutThreshold 的值是 1000 纳秒,也就是 1 毫秒
        // 也就是说,如果不到 1 毫秒了,那就不要选择 parkNanos 了,自旋的性能反而更好
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 得到剩余时间
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

超时的思路还是很简单的,不带超时参数的 await 是 park,然后等待别人唤醒。而现在就是调用 parkNanos 方法来休眠指定的时间,醒来后判断是否 signal 调用了,调用了就是没有超时,否则就是超时了。超时的话,自己来进行转移到阻塞队列,然后抢锁。


获取共享锁

捋一下简单流程

AQS 里面的 state 是一个整数值,这边用一个 int count 参数其实初始化就是设置了这个值,所有调用了 await 方法的等待线程会挂起,然后有其他一些线程会做 state = state - 1 操作,当 state 减到 0 的同时,那个将 state 减为 0 的线程会负责唤醒 所有调用了 await 方法的线程。

对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。

countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,读者这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。

await ()(摘自CountDownLatch)

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly(int arg)

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 这也是老套路了,中断那一节说过了
    if (Thread.interrupted())
        throw new InterruptedException();

    // state 为初始化的值。
    // 也就是说,这个 if 返回 true,然后往里看
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// 只有当 state == 0 的时候,这个方法才会返回 1
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

doAcquireSharedInterruptibly (int arg)

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 1. 入队
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 同上,只要 state 不等于 0,那么这个方法返回 -1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 2
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

经过第 1 步 addWaiter 入队以后

由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,将 head 的 waitStatus 值设置为 -1

countDown ()

public void countDown() {
    sync.releaseShared(1);
}

releaseShared (int arg)

public final boolean releaseShared(int arg) {
    // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
    // 否则只是简单的 state = state - 1 那么 countDown() 方法就结束了
    // 将 state 减到 0 的那个操作才是最复杂的,继续往下吧
    if (tryReleaseShared(arg)) {
        // 唤醒 await 的线程
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared()

// 调用这个方法的时候,state == 0
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
            if (ws == Node.SIGNAL) {
                // 将 head 的 waitStatue 设置为 0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

之后被唤醒的线程会回到await() 的阻塞的地方继续

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 2. 这里是下一步
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 1. 唤醒后这个方法返回
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

接下来,线程会进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4
    // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了
            doReleaseShared();
    }
}

就这样,醒来的线程唤醒下一个线程,下一个线程唤醒下下个线程

如果你能看到这里,那么恭喜你,我要送你一句话:



AQS源码解析

原文地址:https://www.cnblogs.com/fatmanhappycode/p/12269340.html

时间: 2024-10-07 19:01:06

AQS源码解析的相关文章

AbstractQueuedSynchronizer(AQS)源码解析(一)

在JDK1.5版本,新增了并发包,其中包含了显示锁.并发容器.在这些锁和容器里,都有同步器(AQS)的身影.为了更好地理解JDK的并发包,我会用三个主题来详细描述AbstractQueuedSynchronizer的实现. 在AQS中,涉及到同步队列以及Condition对象,这也是我为什么要用三个主题来讲述的原因.本节将主要讲述同步队列,后面两节会分别讲述Condition对象以及AQS的主要功能实现. AQS同步队列的主要功能是将无法获得资源的线程放入同步队列中,进行等待,它是通过链表来

AQS源码解析(一)-AtomicBoolean源码解析

基本类: AtomicInteger AtomicLong AtomicBoolean 数组类型: AtomicIntegerArray AtomicLongArray AtomicReferenceArray 介绍 由于在多线程条件下,如果对共享变量修改容易造成数据不一致的情况,所以对于共享变量需要保证线程安全有有如下几种方式: 使用lock或者synchronized进行同步共享变量 使用CAS方法来保证修改变量为原子性操作 该类为后者,基于CAS方式修改具有原子性. 实现原理 将boole

Java 并发之AbstractQueuedSynchronizer(AQS)源码解析

关键字:CLH,Node,线程,waitStatus,CAS,中断 目录 图解AQS的操作细节 0.前言 1.基本概念 1.1.CAS自旋 1.2.Node 1.3.CLH & AQS 1.4.ReentrantLock 2.图解AQS 2.1.线程A单独运行 2.2.线程B开始运行 2.3.线程C开始运行 2.4.线程A停止运行,线程B继续运行 2.5.1.线程B停止运行,线程C继续运行 2.5.2.线程C放弃竞争 3.问题总结 3.1.为什么在unparkSuccessor操作中从尾节点开始

AQS源码解析(1)-CLH

目录 AQS解析 一.简介 二.同步的状态和基本属性 三.入队 addWaiter 3.1 基本步骤介绍 3.2 addWaiter() 3.3 enq(Node node) 四.出队 参考 AQS解析 一.简介 Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO)

源码解析之AQS源码解析

要理解Lock首先要理解AQS,而要理解并发类最好的方法是先理解其并发控制量不同值得含义以及该类运作流程,然后配合一步步看源码.该类有一个重要的控制量是WaitStates. /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; //该节点被取消了 /** waitStatus value to indicate successor's thread needs unpar

AbstractQueuedSynchronizer源码解析

1.简介 AbstractQueuedSynchronizer队列同步器,用来实现锁或者其他同步组件的基础框架 AbstractQueuedSynchronizer使用int类型的volatile变量维护同步状态 一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法来管理同步状态,主要管理的方式是通过tryAcquire和tryRelease类似的方法来操作状态,同时,AQS提供以下线程安全的方法来对状态进行操作 protected final int getState() { retu

FutureTask 源码解析

FutureTask 源码解析 版权声明:本文为本作者原创文章,转载请注明出处.感谢 码梦为生| 刘锟洋 的投稿 站在使用者的角度,future是一个经常在多线程环境下使用的Runnable,使用它的好处有两个:1. 线程执行结果带有返回值2. 提供了一个线程超时的功能,超过超时时间抛出异常后返回. 那,怎么实现future这种超时控制呢?来看看代码: FutureTask的实现只是依赖了一个内部类Sync实现的,Sync是AQS (AbstractQueuedSynchronizer)的子类,

JDK 源码解析 —— Executors ExecutorService ThreadPoolExecutor 线程池

零. 简介 Executors 是 Executor.ExecutorService.ThreadFactory.Callable 类的工厂和工具方法. 一. 源码解析 创建一个固定大小的线程池:通过重用共享无界队列里的线程来减少线程创建的开销.当所有的线程都在执行任务,新增的任务将会在队列中等待,直到一个线程空闲.由于在执行前失败导致的线程中断,如果需要继续执行接下去的任务,新的线程会取代它执行.线程池中的线程会一直存在,除非明确地 shutdown 掉. public static Exec

CountDownLatch源码解析

一.CountDownLatch介绍 CountDownLatch是在jdk1.5被引入的,它主要是通过一个计数器来实现的,当在初始化该类的构造函数时,会事先传入一个状态值,之后在执行await方法后, 在这个状态值为0之前,当前线程(指的是调用await的线程)会一直等待.它内部使用了AQS来实现的,且是共享锁,具体怎么实现,待会看看它的实现原理. 它的应用场景: 一般在于在执行当前线程之前,要完成n个线程的任务,才能执行当前线程.这种场景适合用countdownLatch. 二.源码解析 先