并发编程(七)——AbstractQueuedSynchronizer 之 CountDownLatch、CyclicBarrier、Semaphore 源码分析

这篇,我们的关注点是 AQS 最后的部分,共享模式的使用。本文先用 CountDownLatch 将共享模式说清楚,然后顺着把其他 AQS 相关的类 CyclicBarrier、Semaphore 的源码一起过一下。

CountDownLatch

CountDownLatch 这个类是比较典型的 AQS 的共享模式的使用,这是一个高频使用的类。使用方法在前面一篇文章中有介绍 并发编程(二)—— CountDownLatch、CyclicBarrier和Semaphore

使用例子

我们看下 Doug Lea 在 java doc 中给出的例子,这个例子非常实用,我们经常会写这个代码。

假设我们有 N ( N > 0 ) 个任务,那么我们会用 N 来初始化一个 CountDownLatch,然后将这个 latch 的引用传递到各个线程中,在每个线程完成了任务后,调用 latch.countDown() 代表完成了一个任务。

调用 latch.await() 的方法的线程会阻塞,直到所有的任务完成。

class Driver2 { // ...
    void main() throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(N);
        Executor e = Executors.newFixedThreadPool(8);

        // 创建 N 个任务,提交给线程池来执行
        for (int i = 0; i < N; ++i) // create and start threads
            e.execute(new WorkerRunnable(doneSignal, i));

        // 等待所有的任务完成,这个方法才会返回
        doneSignal.await();           // wait for all to finish
    }
}

class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;

    WorkerRunnable(CountDownLatch doneSignal, int i) {
        this.doneSignal = doneSignal;
        this.i = i;
    }

    public void run() {
        try {
            doWork(i);
            // 这个线程的任务完成了,调用 countDown 方法
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

所以说 CountDownLatch 非常实用,我们常常会将一个比较大的任务进行拆分,然后开启多个线程来执行,等所有线程都执行完了以后,再往下执行其他操作。这里例子中,只有 main 线程调用了 await 方法。

我们再来看另一个例子,这个例子很典型,用了两个 CountDownLatch:

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        // 这边插入一些代码,确保上面的每个线程先启动起来,才执行下面的代码。
        doSomethingElse();            // don‘t let run yet
        // 因为这里 N == 1,所以,只要调用一次,那么所有的 await 方法都可以通过
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        // 等待所有任务结束
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            // 为了让所有线程同时开始任务,我们让所有线程先阻塞在这里
            // 等大家都准备好了,再打开这个门栓
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

这个例子中,doneSignal 同第一个例子的使用,我们说说这里的 startSignal。N 个新开启的线程都调用了startSignal.await() 进行阻塞等待,它们阻塞在栅栏上,只有当条件满足的时候(startSignal.countDown()),它们才能同时通过这个栅栏。

如果始终只有一个线程调用 await 方法等待任务完成,那么 CountDownLatch 就会简单很多,所以之后的源码分析读者一定要在脑海中构建出这么一个场景:有 m 个线程是做任务的,有 n 个线程在某个栅栏上等待这 m 个线程做完任务,直到所有 m 个任务完成后,n 个线程同时通过栅栏。

源码分析

构造方法,需要传入一个不小于 0 的整数:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
// 老套路了,内部封装一个 Sync 类继承自 AQS
private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) {
        // 这样就 state == count 了
        setState(count);
    }
    ...
}

先分析套路:AQS 里面的 state 是一个整数值,这边用一个 int count 参数其实初始化就是设置了这个值,所有调用了 await 方法的等待线程会挂起,然后有其他一些线程调用会做 state = state - 1 操作,当 state 减到 0 的同时,那个线程会负责唤醒调用了 await 方法的所有线程。

对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,读者这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。

我们用以下程序来分析源码,t1 和 t2 负责调用 countDown() 方法,t3 和 t4 调用 await 方法阻塞:

 1 public class CountDownLatchDemo {
 2
 3     public static void main(String[] args) {
 4
 5         CountDownLatch latch = new CountDownLatch(2);
 6
 7         Thread t1 = new Thread(new Runnable() {
 8             @Override
 9             public void run() {
10                 try {
11                     Thread.sleep(5000);
12                 } catch (InterruptedException ignore) {
13                 }
14                 // 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown()
15                 latch.countDown();
16             }
17         }, "t1");
18
19         Thread t2 = new Thread(new Runnable() {
20             @Override
21             public void run() {
22                 try {
23                     Thread.sleep(10000);
24                 } catch (InterruptedException ignore) {
25                 }
26                 // 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown()
27                 latch.countDown();
28             }
29         }, "t2");
30
31         t1.start();
32         t2.start();
33
34         Thread t3 = new Thread(new Runnable() {
35             @Override
36             public void run() {
37                 try {
38                     // 阻塞,等待 state 减为 0
39                     latch.await();
40                     System.out.println("线程 t3 从 await 中返回了");
41                 } catch (InterruptedException e) {
42                     System.out.println("线程 t3 await 被中断");
43                     Thread.currentThread().interrupt();
44                 }
45             }
46         }, "t3");
47         Thread t4 = new Thread(new Runnable() {
48             @Override
49             public void run() {
50                 try {
51                     // 阻塞,等待 state 减为 0
52                     latch.await();
53                     System.out.println("线程 t4 从 await 中返回了");
54                 } catch (InterruptedException e) {
55                     System.out.println("线程 t4 await 被中断");
56                     Thread.currentThread().interrupt();
57                 }
58             }
59         }, "t4");
60
61         t3.start();
62         t4.start();
63     }
64 }

上述程序,大概在过了 10 秒左右的时候,会输出:

线程 t3 从 await 中返回了
线程 t4 从 await 中返回了
// 这两条输出,顺序不是绝对的
// 后面的分析,我们假设 t3 先进入阻塞队列

接下来,我们按照流程一步一步走:先 await 等待,然后被唤醒,await 方法返回。

首先,我们来看 await() 方法,它代表线程阻塞,等待 state 的值减为 0。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 这也是老套路了,我在第二篇的中断那一节说过了
    if (Thread.interrupted())
        throw new InterruptedException();
    // t3 和 t4 调用 await 的时候,state 都大于 0。
    // 也就是说,这个 if 返回 true,然后往里看
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// 只有当 state == 0 的时候,这个方法才会返回 1
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

从方法名我们就可以看出,这个方法是获取共享锁,并且此方法是可中断的(中断的时候抛出 InterruptedException 退出这个方法)。

 1 private void doAcquireSharedInterruptibly(int arg)
 2     throws InterruptedException {
 3     // 1. 入队
 4     final Node node = addWaiter(Node.SHARED);
 5     boolean failed = true;
 6     try {
 7         for (;;) {
 8             final Node p = node.predecessor();
 9             if (p == head) {
10                 // 同上,只要 state 不等于 0,那么这个方法返回 -1
11                 int r = tryAcquireShared(arg);
12                 // r=-1时,这里if不会进入
13                 if (r >= 0) {
14                     setHeadAndPropagate(node, r);
15                     p.next = null; // help GC
16                     failed = false;
17                     return;
18                 }
19             }
20             // 2. 这和第一篇AQS里面代码一样,修改前驱节点的waitStatus 为-1,同时挂起当前线程
21             if (shouldParkAfterFailedAcquire(p, node) &&
22                 parkAndCheckInterrupt())
23                 throw new InterruptedException();
24         }
25     } finally {
26         if (failed)
27             cancelAcquire(node);
28     }
29 }

我们来仔细分析这个方法,线程 t3 经过第 1 步 第4行 addWaiter 入队以后,我们应该可以得到这个:

由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,t3 将 head 的 waitStatus 值设置为 -1,如下:

然后进入到 parkAndCheckInterrupt 的时候,t3 挂起。

我们再分析 t4 入队,t4 会将前驱节点 t3 所在节点的 waitStatus 设置为 -1,t4 入队后,应该是这样的:

然后,t4 也挂起。接下来,t3 和 t4 就等待唤醒了。

接下来,我们来看唤醒的流程,我们假设用 10 初始化 CountDownLatch。

当然,我们的例子中,其实没有 10 个线程,只有 2 个线程 t1 和 t2,只是为了让图好看些罢了。

我们再一步步看具体的流程。首先,我们看 countDown() 方法:

 1 public void countDown() {
 2     sync.releaseShared(1);
 3 }
 4 public final boolean releaseShared(int arg) {
 5     // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
 6     // 否则只是简单的 state = state - 1 那么 countDown 方法就结束了
 7     if (tryReleaseShared(arg)) {
 8         // 唤醒 await 的线程
 9         doReleaseShared();
10         return true;
11     }
12     return false;
13 }
14 // 这个方法很简单,用自旋的方法实现 state 减 1
15 protected boolean tryReleaseShared(int releases) {
16     for (;;) {
17         int c = getState();
18         if (c == 0)
19             return false;
20         int nextc = c-1;
21         //通过CAS将state的值减1,失败就不会进入return,继续for循环,直至CAS成功
22         if (compareAndSetState(c, nextc))
23             //state减到0就返回true,否则返回false
24             return nextc == 0;
25     }
26 }

countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:

 1 // 调用这个方法的时候,state == 0
 2 private void doReleaseShared() {
 3     for (;;) {
 4         Node h = head;
 5         if (h != null && h != tail) {
 6             int ws = h.waitStatus;
 7             // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
 8             if (ws == Node.SIGNAL) {
 9                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
10                     continue;            // loop to recheck cases
11                 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
12                 // 在这里,也就是唤醒 t3 , t3的await()方法可以接着运行了
13                 unparkSuccessor(h);
14             }
15             else if (ws == 0 &&
16                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
17                 continue;                // loop on failed CAS
18         }
19         //此时 h == head 说明被唤醒的 t3线程 还没有执行到await()方法中的setHeadAndPropagate(node, r)这一步,则此时循环结束;
20         //如果执行完setHeadAndPropagate(node, r),则head就为t3了,这里的h和head就不相等,会继续循环
21         if (h == head)                   // loop if head changed
22             break;
23     }
24 }

一旦 t3 被唤醒后,我们继续回到 await 的这段代码,在第24行代码 parkAndCheckInterrupt 返回继续接着运行,我们先不考虑中断的情况:

 1 private void doAcquireSharedInterruptibly(int arg)
 2     throws InterruptedException {
 3     final Node node = addWaiter(Node.SHARED);
 4     boolean failed = true;
 5     try {
 6         for (;;) {
 7             //p表示当前节点的前驱节点
 8             final Node p = node.predecessor();
 9             //此时被唤醒的是之前head的后继节点,所以此线程的前驱节点是head
10             if (p == head) {
11                 //此时state已经为0,r为1
12                 int r = tryAcquireShared(arg);
13                 if (r >= 0) {
14                     // 2. 这里将唤醒t3的后续节点t4,以此类推,t4被唤醒后,会在t4的await中唤醒t4的后续节点
15                     setHeadAndPropagate(node, r);
16                     // 将已经唤醒的t3节点从队列中去除
17                     p.next = null; // help GC
18                     failed = false;
19                     return;
20                 }
21             }
22             if (shouldParkAfterFailedAcquire(p, node) &&
23                 // 1. 唤醒后这个方法返回
24                 parkAndCheckInterrupt())
25                 throw new InterruptedException();
26         }
27     } finally {
28         if (failed)
29             cancelAcquire(node);
30     }
31 }

接下来,t3 会循环一次进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程:

 1 private void setHeadAndPropagate(Node node, int propagate) {
 2     Node h = head; // Record old head for check below
 3     setHead(node);
 4
 5     // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4
 6     // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了
 7     if (propagate > 0 || h == null || h.waitStatus < 0 ||
 8         (h = head) == null || h.waitStatus < 0) {
 9         Node s = node.next;
10         if (s == null || s.isShared())
11             // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了
12             doReleaseShared();
13     }
14 }

又回到这个方法了,那么接下来,我们好好分析 doReleaseShared 这个方法,我们根据流程,头节点 head 此时是 t3 节点了:

 1 // 调用这个方法的时候,state == 0
 2 private void doReleaseShared() {
 3     for (;;) {
 4         Node h = head;
 5         if (h != null && h != tail) {
 6             int ws = h.waitStatus;
 7             // t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了
 8             if (ws == Node.SIGNAL) {
 9                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
10                     continue;            // loop to recheck cases
11                 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
12                 // 在这里,也就是唤醒 t4
13                 unparkSuccessor(h);
14             }
15             else if (ws == 0 &&
16                      // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
17                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
18                 continue;                // loop on failed CAS
19         }
20         // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
21         // 否则,就是 head 没变,那么退出循环,
22         // 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会在await()方法中调用此方法接着唤醒后续节点
23         if (h == head)                   // loop if head changed
24             break;
25     }
26 }

总结

总的来说,CountDownLatch 就是线程入队阻塞,依次唤醒的过程

使用过程会执行以下操作:

  1.当创建一个CountDownLatch 的实例后,AQS中的state会设置一个正整数

  2.一个线程调用await(),当前线程加入到阻塞队列中,当前线程挂起

  3.一个线程调用countDown()唤醒方法,state减1,直到state被减为0时,唤醒阻塞队列中第一个等待节点中的线程

  4.第一个线程被唤醒后,当前线程继续执行await()方法,将当前线程设置为head,并在此方法中唤醒head的下一个节点,依次类推

CyclicBarrier

字面意思是“可重复使用的栅栏”,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。看如下示意图,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。

首先,CyclicBarrier 的源码实现和 CountDownLatch 大相径庭,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。

因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。

废话结束,先上基本属性和构造方法,往下拉一点点,和图一起看:

 1 public class CyclicBarrier {
 2     // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代"
 3     private static class Generation {
 4         boolean broken = false;
 5     }
 6
 7     /** The lock for guarding barrier entry */
 8     private final ReentrantLock lock = new ReentrantLock();
 9     // CyclicBarrier 是基于 Condition 的
10     // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
11     private final Condition trip = lock.newCondition();
12
13     // 参与的线程数
14     private final int parties;
15
16     // 如果设置了这个,代表越过栅栏之前,要执行相应的操作
17     private final Runnable barrierCommand;
18
19     // 当前所处的“代”
20     private Generation generation = new Generation();
21
22     // 还没有到栅栏的线程数,这个值初始为 parties,然后递减
23     // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
24     private int count;
25
26     public CyclicBarrier(int parties, Runnable barrierAction) {
27         if (parties <= 0) throw new IllegalArgumentException();
28         this.parties = parties;
29         this.count = parties;
30         this.barrierCommand = barrierAction;
31     }
32
33     public CyclicBarrier(int parties) {
34         this(parties, null);
35     }

我用一图来描绘下 CyclicBarrier 里面的一些概念:

现在开始分析最重要的等待通过栅栏方法 await 方法:

 1 // 不带超时机制
 2 public int await() throws InterruptedException, BrokenBarrierException {
 3     try {
 4         return dowait(false, 0L);
 5     } catch (TimeoutException toe) {
 6         throw new Error(toe); // cannot happen
 7     }
 8 }
 9 // 带超时机制,如果超时抛出 TimeoutException 异常
10 public int await(long timeout, TimeUnit unit)
11     throws InterruptedException,
12            BrokenBarrierException,
13            TimeoutException {
14     return dowait(true, unit.toNanos(timeout));
15 }

继续往里看:

 1 private int dowait(boolean timed, long nanos)
 2         throws InterruptedException, BrokenBarrierException,
 3                TimeoutException {
 4     final ReentrantLock lock = this.lock;
 5     // 先要获取到锁,然后在 finally 中要记得释放锁
 6     // 如果记得 Condition 部分的话,我们知道 condition 的 await 会释放锁,signal 的时候需要重新获取锁
 7     lock.lock();
 8     try {
 9         final Generation g = generation;
10         // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常
11         if (g.broken)
12             throw new BrokenBarrierException();
13         // 检查中断状态,如果中断了,抛出 InterruptedException 异常
14         if (Thread.interrupted()) {
15             breakBarrier();
16             throw new InterruptedException();
17         }
18         // index 是这个 await 方法的返回值
19         // 注意到这里,这个是从 count 递减后得到的值
20         int index = --count;
21
22         //最后一个线程到达后, 唤醒所有等待的线程,开启新的一代(设置新的generation)
23         // 如果等于 0,说明所有的线程都到栅栏上了,准备通过
24         if (index == 0) {  // tripped
25             boolean ranAction = false;
26             try {
27                 // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
28                 final Runnable command = barrierCommand;
29                 if (command != null)
30                     command.run();
31                 // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
32                 ranAction = true;
33                 // 唤醒等待的线程,然后开启新的一代
34                 nextGeneration();
35                 return 0;
36             } finally {
37                 if (!ranAction)
38                     // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏
39                     // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties
40                     breakBarrier();
41             }
42         }
43
44         // loop until tripped, broken, interrupted, or timed out
45         // 如果是最后一个线程调用 await,那么上面就返回了
46         // 下面的操作是给那些不是最后一个到达栅栏的线程执行的
47         for (;;) {
48             try {
49                 // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await
50                 if (!timed)
51                     //此线程会添加到Condition条件队列中,并在此阻塞
52                     trip.await();
53                 else if (nanos > 0L)
54                     nanos = trip.awaitNanos(nanos);
55             } catch (InterruptedException ie) {
56                 // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断
57                 if (g == generation && ! g.broken) {
58                     // 打破栅栏
59                     breakBarrier();
60                     // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法
61                     throw ie;
62                 } else {
63                     Thread.currentThread().interrupt();
64                 }
65             }
66
67               // 唤醒后,检查栅栏是否是“破的”
68             if (g.broken)
69                 throw new BrokenBarrierException();
70
71             // 上面最后一个线程执行nextGeneration()后,generation被重写设置
72             // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代
73             // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的,因为最后一个到达的线程已经重写设置了generation
74             if (g != generation)
75                 return index;
76
77             // 如果醒来发现超时了,打破栅栏,抛出异常
78             if (timed && nanos <= 0L) {
79                 breakBarrier();
80                 throw new TimeoutException();
81             }
82         }
83     } finally {
84         lock.unlock();
85     }
86 }

我们看看怎么开启新的一代:

1 // 开启新的一代,当最后一个线程到达栅栏上的时候,调用这个方法来唤醒其他线程,同时初始化“下一代”
2 private void nextGeneration() {
3     // 首先,需要唤醒所有的在栅栏上等待的线程
4     trip.signalAll();
5     // 更新 count 的值
6     count = parties;
7     // 重新生成“新一代”
8     generation = new Generation();
9 }

看看怎么打破一个栅栏:

1 private void breakBarrier() {
2     // 设置状态 broken 为 true
3     generation.broken = true;
4     // 重置 count 为初始值 parties
5     count = parties;
6     // 唤醒所有已经在等待的线程
7     trip.signalAll();
8 }

整个过程已经很清楚了。

下面我们来看看怎么得到有多少个线程到了栅栏上,处于等待状态:

1 public int getNumberWaiting() {
2     final ReentrantLock lock = this.lock;
3     lock.lock();
4     try {
5         return parties - count;
6     } finally {
7         lock.unlock();
8     }
9 }

判断一个栅栏是否被打破了,这个很简单,直接看 broken 的值即可:

1 public boolean isBroken() {
2     final ReentrantLock lock = this.lock;
3     lock.lock();
4     try {
5         return generation.broken;
6     } finally {
7         lock.unlock();
8     }
9 }

最后,我们来看看怎么重置一个栅栏:

 1 public void reset() {
 2     final ReentrantLock lock = this.lock;
 3     lock.lock();
 4     try {
 5         breakBarrier();   // break the current generation
 6         nextGeneration(); // start a new generation
 7     } finally {
 8         lock.unlock();
 9     }
10 }

Semaphore

有了 CountDownLatch 的基础后,分析 Semaphore 会简单很多。Semaphore 是什么呢?它类似一个资源池(读者可以类比线程池),每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。

套路解读:创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。

构造方法:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

这里和 ReentrantLock 类似,用了公平策略和非公平策略。

看 acquire 方法:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

这几个方法也是老套路了,大家基本都懂了吧,这边多了两个可以传参的 acquire 方法,不过大家也都懂的吧,如果我们需要一次获取超过一个的资源,会用得着这个的。

我们接下来看不抛出 InterruptedException 异常的 acquireUninterruptibly() 方法吧:

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

前面说了,Semaphore 分公平策略和非公平策略,我们对比一下两个 tryAcquireShared 方法:

 1 // 公平策略:
 2 protected int tryAcquireShared(int acquires) {
 3     for (;;) {
 4         // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
 5         // 这个就不分析了,第一篇AQS中已经讲过
 6         if (hasQueuedPredecessors())
 7             //进入到这里说明阻塞队列中已经有线程在等着获取资源
 8             return -1;
 9         int available = getState();
10         int remaining = available - acquires;
11         //当remaining最小为0时,会CAS设置state为0,成功返回remaining
12         //当remaining小于0时,这里会直接返回remaining,这里不会执行compareAndSetState
13         if (remaining < 0 ||
14             compareAndSetState(available, remaining))
15             return remaining;
16     }
17 }
18 // 非公平策略:
19 protected int tryAcquireShared(int acquires) {
20     return nonfairTryAcquireShared(acquires);
21 }
22 final int nonfairTryAcquireShared(int acquires) {
23     for (;;) {
24         int available = getState();
25         int remaining = available - acquires;
26         if (remaining < 0 ||
27             compareAndSetState(available, remaining))
28             return remaining;
29     }
30 }

我们再回到 acquireShared 方法

1 public final void acquireShared(int arg) {
2     if (tryAcquireShared(arg) < 0)
3         doAcquireShared(arg);
4 }

当 tryAcquireShared(arg)大于或者等于0时,获取资源成功,接着执行acquire()后面的业务代码;

当 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,即执行上面第3行代码

 1 private void doAcquireShared(int arg) {
 2     final Node node = addWaiter(Node.SHARED);
 3     boolean failed = true;
 4     try {
 5         boolean interrupted = false;
 6         for (;;) {
 7             final Node p = node.predecessor();
 8             if (p == head) {
 9                 int r = tryAcquireShared(arg);
10                 if (r >= 0) {
11                     setHeadAndPropagate(node, r);
12                     p.next = null; // help GC
13                     if (interrupted)
14                         selfInterrupt();
15                     failed = false;
16                     return;
17                 }
18             }
19             if (shouldParkAfterFailedAcquire(p, node) &&
20                 parkAndCheckInterrupt())
21                 interrupted = true;
22         }
23     } finally {
24         if (failed)
25             cancelAcquire(node);
26     }
27 }

这个方法我就不介绍了,前面有很多地方介绍过这个方法,线程挂起后等待有资源被 release 出来。接下来,我们就要看 release 的方法了:

 1 // 任务介绍,释放一个资源
 2 public void release() {
 3     sync.releaseShared(1);
 4 }
 5 public final boolean releaseShared(int arg) {
 6     if (tryReleaseShared(arg)) {
 7         doReleaseShared();
 8         return true;
 9     }
10     return false;
11 }
12
13 protected final boolean tryReleaseShared(int releases) {
14     for (;;) {
15         int current = getState();
16         int next = current + releases;
17         // 溢出,当然,我们一般也不会用这么大的数
18         if (next < current) // overflow
19             throw new Error("Maximum permit count exceeded");
20     //释放资源后,将state的值又加上释放资源数
21         if (compareAndSetState(current, next))
22             return true;
23     }
24 }

tryReleaseShared 方法总是会返回 true,此时state的资源数已经加上了,然后是 doReleaseShared,这个也是我们熟悉的方法了,我就贴下代码,不分析了,这个方法用于唤醒所有的等待线程中的第一个等待的线程:

 1 private void doReleaseShared() {
 2     for (;;) {
 3         Node h = head;
 4         if (h != null && h != tail) {
 5             int ws = h.waitStatus;
 6             if (ws == Node.SIGNAL) {
 7                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 8                     continue;            // loop to recheck cases
 9                 unparkSuccessor(h);
10             }
11             else if (ws == 0 &&
12                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
13                 continue;                // loop on failed CAS
14         }
15         if (h == head)                   // loop if head changed
16             break;
17     }
18 }

第一个等待的线程被唤醒后,doReleaseShared终止,接着doAcquireShared()方法被唤醒接着运行,如果资源还够用,则唏嘘唤醒下一个等待节点,可以看到doAcquireShared()方法中第11行处 设置当前节点为head节点,并唤醒下一个等待节点

Semphore 的源码确实很简单,方法都和CountDownLatch 中差不多,基本上都是分析过的老代码的组合使用了。

原文地址:https://www.cnblogs.com/java-chen-hao/p/10191106.html

时间: 2024-10-27 01:05:47

并发编程(七)——AbstractQueuedSynchronizer 之 CountDownLatch、CyclicBarrier、Semaphore 源码分析的相关文章

并发编程(四):ThreadLocal从源码分析总结到内存泄漏

一.目录 1.ThreadLocal是什么?有什么用? 2.ThreadLocal源码简要总结? 3.ThreadLocal为什么会导致内存泄漏? 二.ThreadLocal是什么?有什么用? 引入话题:在并发条件下,如何正确获得共享数据?举例:假设有多个用户需要获取用户信息,一个线程对应一个用户.在mybatis中,session用于操作数据库,那么设置.获取操作分别是session.set().session.get(),如何保证每个线程都能正确操作达到想要的结果? /** * 回顾sync

第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析

王家林亲授<DT大数据梦工厂>大数据实战视频“Scala深入浅出实战经典”视频.音频和PPT下载!第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析百度云:http://pan.baidu.com/s/1pJ5jzHx腾讯微云:http://url.cn/aSawrm360云盘:http://yunpan.cn/cctL3QYACaVNa  访问密码 c0fb 信息来源于 DT大数据梦工厂微信公众账号:DT_Spark

Java - &quot;JUC&quot; Semaphore源码分析

Java多线程系列--"JUC锁"11之 Semaphore信号量的原理和示例 Semaphore简介 Semaphore是一个计数信号量,它的本质是一个"共享锁". 信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可. Java并发提供了两种加锁模式:共享锁和独占锁.前面LZ介绍的Reent

【Java并发编程】21、线程池ThreadPoolExecutor源码解析

一.前言 JUC这部分还有线程池这一块没有分析,需要抓紧时间分析,下面开始ThreadPoolExecutor,其是线程池的基础,分析完了这个类会简化之后的分析,线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法.下面开始分析. 二.ThreadPoolExecutor数据结构 在ThreadPoolExecutor的内部,主要由BlockingQueue和AbstractQu

Java并发编程与技术内幕:CopyOnWriteArrayList、CopyOnWriteArraySet源码解析

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 摘要:本文主要讲了Java中CopyOnWriteArrayList .CopyOnWriteArraySet的源码分析 一.CopyOnWriteArrayList源码分析 CopyOnWriteArrayList在java的并发场景中用得其实并不是非常多,因为它并不能完全保证读取数据的正确性.其主要有以下的一些特点:1.适合场景读多写少2.不能保证读取数据一定是正确 的,因为get时是不

java并发编程的艺术(三)---lock源码

jdk1.5以后,并发包中新增了lock接口, 它相对于synchronized,多了以下三个主要特性:尝试非阻塞地获取锁(尝试获取锁成功则持有).能被中断地获取锁(锁的进程能响应中断).超时获取锁(指定时间截止之前获取锁). 我们看看它接口中定义的api: 获取锁 可中断地获取锁 尝试非阻塞地获取锁,能够获取则返回true,否则false 超时获取锁,三种返回情况:1.当前线程在超时时间内获得了锁.2.当前线程在超时时间内被中断.3.超时时间内没获得锁 释放锁 获取等待通知组件,该组件和当前的

Semaphore源码分析

源码解析 Semaphore有两种模式,公平模式和非公平模式.公平模式就是调用acquire的顺序就是获取许可证的顺序,遵循FIFO:而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程. 构造方法 Semaphore有两个构造方法,如下:        public Semaphore(int permits) {         sync = new NonfairSync(permits);     }    public Sem

Scala并发编程实战初体验及其在Spark源码中的应用解析之Scala学习笔记-56

package com.leegh.actor import scala.actors.Actor /** * @author Guohui Li */object First_Actor extends Actor { def act() { for (i <- 1 to 10) { println("Step : " + i) println(Thread.currentThread().getName) Thread.sleep(2000) } }} object Seco

Spark进阶视频之Scala并发编程实战初体验及其在Spark源码中的应用解析

王家林亲授<DT大数据梦工厂>大数据实战视频"Scala深入浅出实战经典"视频.音频和PPT下载! 欢迎广大Spark爱好者学习交流.也欢迎广大学习爱好者加入DT大数据梦工厂交流群:462923555DT大数据微信公众账号:DT_Spark 视频观看链接http://www.tudou.com/plcover/Yy5F5gsurSE/ 视频下载地址百度云:http://pan.baidu.com/s/1eQGqzEa腾讯微云: http://url.cn/SshT6b