java 并发(五)---AbstractQueuedSynchronizer(3)

       文章代码分析和部分图片来自参考文章

认识 CountDownLatch

分析这个类,首先了解一下它所可以实现的效果,然后再顺着这个源码的思路思考是不是和它实现的效果一样。下面的代码和图片可以说明 CountDownLatch (下文简称CDL)的工作过程。

  1 public class CountDownLatchDemo {
  2
  3     public static void main(String[] args) {
  4
  5         CountDownLatch latch = new CountDownLatch(2);
  6
  7         Thread t1 = new Thread(new Runnable() {
  8             @Override
  9             public void run() {
 10                 try {
 11                     Thread.sleep(5000);
 12                 } catch (InterruptedException ignore) {
 13                 }
 14                 // 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown()
 15                 latch.countDown();
 16             }
 17         }, "t1");
 18
 19         Thread t2 = new Thread(new Runnable() {
 20             @Override
 21             public void run() {
 22                 try {
 23                     Thread.sleep(10000);
 24                 } catch (InterruptedException ignore) {
 25                 }
 26                 // 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown()
 27                 latch.countDown();
 28             }
 29         }, "t2");
 30
 31         t1.start();
 32         t2.start();
 33
 34         Thread t3 = new Thread(new Runnable() {
 35             @Override
 36             public void run() {
 37                 try {
 38                     // 阻塞,等待 state 减为 0
 39                     latch.await();
 40                     System.out.println("线程 t3 从 await 中返回了");
 41                 } catch (InterruptedException e) {
 42                     System.out.println("线程 t3 await 被中断");
 43                     Thread.currentThread().interrupt();
 44                 }
 45             }
 46         }, "t3");
 47         Thread t4 = new Thread(new Runnable() {
 48             @Override
 49             public void run() {
 50                 try {
 51                     // 阻塞,等待 state 减为 0
 52                     latch.await();
 53                     System.out.println("线程 t4 从 await 中返回了");
 54                 } catch (InterruptedException e) {
 55                     System.out.println("线程 t4 await 被中断");
 56                     Thread.currentThread().interrupt();
 57                 }
 58             }
 59         }, "t4");
 60
 61         t3.start();
 62         t4.start();
 63     }
 64 }

我们知道这里面实际需要分析的方法就是 await 和 countDown 方法。

源码分析 CountDownLatch

从方法中我们就可以知道CDL中的实现AQS的共享模式获取锁。我们以上面的Demo 来做源码分析

         下文源码分析来自   一行一行源码分析清楚 AbstractQueuedSynchronizer (三)

  1 public void await() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public final void acquireSharedInterruptibly(int arg)
  5         throws InterruptedException {
  6     // 这也是老套路了,我在第二篇的中断那一节说过了
  7     if (Thread.interrupted())
  8         throw new InterruptedException();
  9     // t3 和 t4 调用 await 的时候,state 都大于 0。
 10     // 也就是说,这个 if 返回 true,然后往里看
 11     if (tryAcquireShared(arg) < 0)
 12         doAcquireSharedInterruptibly(arg);
 13 }
 14 // 只有当 state == 0 的时候,这个方法才会返回 1
 15 protected int tryAcquireShared(int acquires) {
 16     return (getState() == 0) ? 1 : -1;
 17 }
  1 private void doAcquireSharedInterruptibly(int arg)
  2     throws InterruptedException {
  3     // 1. 入队
  4     final Node node = addWaiter(Node.SHARED);
  5     boolean failed = true;
  6     try {
  7         for (;;) {
  8             final Node p = node.predecessor();
  9             if (p == head) {
 10                 // 同上,只要 state 不等于 0,那么这个方法返回 -1
 11                 int r = tryAcquireShared(arg);
 12                 if (r >= 0) {
 13                     setHeadAndPropagate(node, r);
 14                     p.next = null; // help GC
 15                     failed = false;
 16                     return;
 17                 }
 18             }
 19             // 2
 20             if (shouldParkAfterFailedAcquire(p, node) &&
 21                 parkAndCheckInterrupt())
 22                 throw new InterruptedException();
 23         }
 24     } finally {
 25         if (failed)
 26             cancelAcquire(node);
 27     }
 28 }

图一是两个线程假如到阻塞队列中的情景,图二是CountDown 方法执行的过程。

图一

图二(例子中数量不为10,此处仅为说明执行过程)

  1 public void countDown() {
  2     sync.releaseShared(1);
  3 }
  4 public final boolean releaseShared(int arg) {
  5     // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
  6     // 否则只是简单的 state = state - 1 那么 countDown 方法就结束了
  7     if (tryReleaseShared(arg)) {
  8         // 唤醒 await 的线程
  9         doReleaseShared();
 10         return true;
 11     }
 12     return false;
 13 }
 14 // 这个方法很简单,用自旋的方法实现 state 减 1
 15 protected boolean tryReleaseShared(int releases) {
 16     for (;;) {
 17         int c = getState();
 18         if (c == 0)
 19             return false;
 20         int nextc = c-1;
 21         if (compareAndSetState(c, nextc))
 22             return nextc == 0;
 23     }
 24 }
 25 countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:
 26
 27 // 调用这个方法的时候,state == 0
 28 // 这个方法先不要看所有的代码,按照思路往下到我写注释的地方,其他的之后还会仔细分析
 29 private void doReleaseShared() {
 30     for (;;) {
 31         Node h = head;
 32         if (h != null && h != tail) {
 33             int ws = h.waitStatus;
 34             // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
 35             if (ws == Node.SIGNAL) {
 36                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 37                     continue;            // loop to recheck cases
 38                 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
 39                 // 在这里,也就是唤醒 t3
 40                 unparkSuccessor(h);
 41             }
 42             else if (ws == 0 &&
 43                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
 44                 continue;                // loop on failed CAS
 45         }
 46         if (h == head)                   // loop if head changed
 47             break;
 48     }
 49 }

接下来就是唤醒了哦!

  1 private void doAcquireSharedInterruptibly(int arg)
  2     throws InterruptedException {
  3     final Node node = addWaiter(Node.SHARED);
  4     boolean failed = true;
  5     try {
  6         for (;;) {
  7             final Node p = node.predecessor();
  8             if (p == head) {
  9                 int r = tryAcquireShared(arg);
 10                 if (r >= 0) {
 11                     setHeadAndPropagate(node, r); // 2. 这里是下一步
 12                     p.next = null; // help GC
 13                     failed = false;
 14                     return;
 15                 }
 16             }
 17             if (shouldParkAfterFailedAcquire(p, node) &&
 18                 // 1. 唤醒后这个方法返回
 19                 parkAndCheckInterrupt())
 20                 throw new InterruptedException();
 21         }
 22     } finally {
 23         if (failed)
 24             cancelAcquire(node);
 25     }
 26 }
  1 private void setHeadAndPropagate(Node node, int propagate) {
  2     Node h = head; // Record old head for check below
  3     setHead(node);
  4
  5     // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4
  6     // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了
  7     if (propagate > 0 || h == null || h.waitStatus < 0 ||
  8         (h = head) == null || h.waitStatus < 0) {
  9         Node s = node.next;
 10         if (s == null || s.isShared())
 11             // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了
 12             doReleaseShared();
 13     }
 14 }
  1 // 调用这个方法的时候,state == 0
  2 private void doReleaseShared() {
  3     for (;;) {
  4         Node h = head;
  5         // 1. h == null: 说明阻塞队列为空
  6         // 2. h == tail: 说明头结点可能是刚刚初始化的头节点,
  7         //   或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了
  8         // 所以这两种情况不需要进行唤醒后继节点
  9         if (h != null && h != tail) {
 10             int ws = h.waitStatus;
 11             // t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了
 12             if (ws == Node.SIGNAL) {
 13                 // 这里 CAS 失败的场景请看下面的解读
 14                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 15                     continue;            // loop to recheck cases
 16                 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
 17                 // 在这里,也就是唤醒 t4
 18                 unparkSuccessor(h);
 19             }
 20             else if (ws == 0 &&
 21                      // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
 22                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
 23                 continue;                // loop on failed CAS
 24         }
 25         // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
 26         // 否则,就是 head 没变,那么退出循环,
 27         // 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会调用这个方法的
 28         if (h == head)                   // loop if head changed
 29             break;
 30     }
 31 }

这里方法要注意了哦,t3 作为唤醒者,走到了 doReleaseShared 执行唤醒的操作,t4作为被唤醒者,也会走到 doReleaseShared ,这个方法,OK ,但是 t3的那个线程继续走到 28行的时候,有可能由于此时head 是被T4修改了(看唤醒后的流程),所以 h==head 返回 false ,那么循环就会继续,继续的话,就有可能两线程在 14行相遇,导致了 CAS 失败。失败了没关系,需要知道的是,总会有一个获得了锁,然后接着唤醒后面的线程,即是说会有一定几率发生多个线程唤醒后面锁的情况,但是为什么要这样做呢?个人理解 :

  • 方法复用
  • 提升吞吐量

CyclicBarrier

CyclicBarrier 基于 Condition 来实现,CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。CyclicBarrier 是基于 AQS 的 ConditionObject 和 ReentranLock 实现的。

下面代码分析来自参考文章,建议可以去阅读一下。

  1 public class CyclicBarrier {
  2     // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代"
  3     private static class Generation {
  4         boolean broken = false;
  5     }
  6
  7     /** The lock for guarding barrier entry */
  8     private final ReentrantLock lock = new ReentrantLock();
  9     // CyclicBarrier 是基于 Condition 的
 10     // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
 11     private final Condition trip = lock.newCondition();
 12
 13     // 参与的线程数
 14     private final int parties;
 15
 16     // 如果设置了这个,代表越过栅栏之前,要执行相应的操作
 17     private final Runnable barrierCommand;
 18
 19     // 当前所处的“代”
 20     private Generation generation = new Generation();
 21
 22     // 还没有到栅栏的线程数,这个值初始为 parties,然后递减
 23     // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
 24     private int count;
 25
 26     public CyclicBarrier(int parties, Runnable barrierAction) {
 27         if (parties <= 0) throw new IllegalArgumentException();
 28         this.parties = parties;
 29         this.count = parties;
 30         this.barrierCommand = barrierAction;
 31     }
 32
 33     public CyclicBarrier(int parties) {
 34         this(parties, null);
 35     }

首先,先看怎么开启新的一代:

  1 // 开启新的一代,当最后一个线程到达栅栏上的时候,调用这个方法来唤醒其他线程,同时初始化“下一代”
  2 private void nextGeneration() {
  3     // 首先,需要唤醒所有的在栅栏上等待的线程
  4     trip.signalAll();
  5     // 更新 count 的值
  6     count = parties;
  7     // 重新生成“新一代”
  8     generation = new Generation();
  9 }
 10 

看看怎么打破一个栅栏:

  1 private void breakBarrier() {
  2     // 设置状态 broken 为 true
  3     generation.broken = true;
  4     // 重置 count 为初始值 parties
  5     count = parties;
  6     // 唤醒所有已经在等待的线程
  7     trip.signalAll();
  8 }
  9 

这两个方法之后用得到,现在开始分析最重要的等待通过栅栏方法 await 方法:

  1 // 不带超时机制
  2 public int await() throws InterruptedException, BrokenBarrierException {
  3     try {
  4         return dowait(false, 0L);
  5     } catch (TimeoutException toe) {
  6         throw new Error(toe); // cannot happen
  7     }
  8 }
  9 // 带超时机制,如果超时抛出 TimeoutException 异常
 10 public int await(long timeout, TimeUnit unit)
 11     throws InterruptedException,
 12            BrokenBarrierException,
 13            TimeoutException {
 14     return dowait(true, unit.toNanos(timeout));
 15 }
 16 

继续往里看:

  1 private int dowait(boolean timed, long nanos)
  2         throws InterruptedException, BrokenBarrierException,
  3                TimeoutException {
  4     final ReentrantLock lock = this.lock;
  5     // 先要获取到锁,然后在 finally 中要记得释放锁
  6     // 如果记得 Condition 部分的话,我们知道 condition 的 await 会释放锁,signal 的时候需要重新获取锁
  7     lock.lock();
  8     try {
  9         final Generation g = generation;
 10         // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常
 11         if (g.broken)
 12             throw new BrokenBarrierException();
 13         // 检查中断状态,如果中断了,抛出 InterruptedException 异常
 14         if (Thread.interrupted()) {
 15             breakBarrier();
 16             throw new InterruptedException();
 17         }
 18         // index 是这个 await 方法的返回值
 19         // 注意到这里,这个是从 count 递减后得到的值
 20         int index = --count;
 21
 22         // 如果等于 0,说明所有的线程都到栅栏上了,准备通过
 23         if (index == 0) {  // tripped
 24             boolean ranAction = false;
 25             try {
 26                 // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
 27                 final Runnable command = barrierCommand;
 28                 if (command != null)
 29                     command.run();
 30                 // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
 31                 ranAction = true;
 32                 // 唤醒等待的线程,然后开启新的一代
 33                 nextGeneration();
 34                 return 0;
 35             } finally {
 36                 if (!ranAction)
 37                     // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏
 38                     // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties
 39                     breakBarrier();
 40             }
 41         }
 42
 43         // loop until tripped, broken, interrupted, or timed out
 44         // 如果是最后一个线程调用 await,那么上面就返回了
 45         // 下面的操作是给那些不是最后一个到达栅栏的线程执行的
 46         for (;;) {
 47             try {
 48                 // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await
 49                 if (!timed)
 50                     trip.await();
 51                 else if (nanos > 0L)
 52                     nanos = trip.awaitNanos(nanos);
 53             } catch (InterruptedException ie) {
 54                 // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断
 55                 if (g == generation && ! g.broken) {
 56                     // 打破栅栏
 57                     breakBarrier();
 58                     // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法
 59                     throw ie;
 60                 } else {
 61                     // 到这里,说明 g != generation, 说明新的一代已经产生,即最后一个线程 await 执行完成,
 62                     // 那么此时没有必要再抛出 InterruptedException 异常,记录下来这个中断信息即可
 63                     // 或者是栅栏已经被打破了,那么也不应该抛出 InterruptedException 异常,
 64                     // 而是之后抛出 BrokenBarrierException 异常
 65                     Thread.currentThread().interrupt();
 66                 }
 67             }
 68
 69               // 唤醒后,检查栅栏是否是“破的”
 70             if (g.broken)
 71                 throw new BrokenBarrierException();
 72
 73             // 这个 for 循环除了异常,就是要从这里退出了
 74             // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代
 75             // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的
 76             // 那什么时候不满足呢?barrierCommand 执行过程中抛出了异常,那么会执行打破栅栏操作,
 77             // 设置 broken 为true,然后唤醒这些线程。这些线程会从上面的 if (g.broken) 这个分支抛 BrokenBarrierException 异常返回
 78             // 当然,还有最后一种可能,那就是 await 超时,此种情况不会从上面的 if 分支异常返回,也不会从这里返回,会执行后面的代码
 79             if (g != generation)
 80                 return index;
 81
 82             // 如果醒来发现超时了,打破栅栏,抛出异常
 83             if (timed && nanos <= 0L) {
 84                 breakBarrier();
 85                 throw new TimeoutException();
 86             }
 87         }
 88     } finally {
 89         lock.unlock();
 90     }
 91 }
 92 

唤醒线程,最后调用的是 Condition 的像 signal 的逻辑,向sync queue 里插进元素。

下面开始收尾工作。

首先,我们看看怎么得到有多少个线程到了栅栏上,处于等待状态:

  1 public int getNumberWaiting() {
  2     final ReentrantLock lock = this.lock;
  3     lock.lock();
  4     try {
  5         return parties - count;
  6     } finally {
  7         lock.unlock();
  8     }
  9 }
 10 

判断一个栅栏是否被打破了,这个很简单,直接看 broken 的值即可:

  1 public boolean isBroken() {
  2     final ReentrantLock lock = this.lock;
  3     lock.lock();
  4     try {
  5         return generation.broken;
  6     } finally {
  7         lock.unlock();
  8     }
  9 }
 10 

前面我们在说 await 的时候也几乎说清楚了,什么时候栅栏会被打破,总结如下:

  1. 中断,我们说了,如果某个等待的线程发生了中断,那么会打破栅栏,同时抛出 InterruptedException 异常;
  2. 超时,打破栅栏,同时抛出 TimeoutException 异常;
  3. 指定执行的操作抛出了异常,这个我们前面也说过。

最后,我们来看看怎么重置一个栅栏:

  1 public void reset() {
  2     final ReentrantLock lock = this.lock;
  3     lock.lock();
  4     try {
  5         breakBarrier();   // break the current generation
  6         nextGeneration(); // start a new generation
  7     } finally {
  8         lock.unlock();
  9     }
 10 }
 11 

我们设想一下,如果初始化时,指定了线程 parties = 4,前面有 3 个线程调用了 await 等待,在第 4 个线程调用 await 之前,我们调用 reset 方法,那么会发生什么?

首先,打破栅栏,那意味着所有等待的线程(3个等待的线程)会唤醒,await 方法会通过抛出 BrokenBarrierException 异常返回。然后开启新的一代,重置了 count 和 generation,相当于一切归零了。

Semaphore

有了 CountDownLatch 的基础后,分析 Semaphore 会简单很多。Semaphore 是什么呢?它类似一个资源池(读者可以类比线程池),每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。

大概大家也可以猜到,Semaphore 其实也是 AQS 中共享锁的使用,因为每个线程共享一个池嘛。

套路解读:创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。

构造方法:

  1 public Semaphore(int permits) {
  2     sync = new NonfairSync(permits);
  3 }
  4
  5 public Semaphore(int permits, boolean fair) {
  6     sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  7 }
  8 

这里和 ReentrantLock 类似,用了公平策略和非公平策略。

看 acquire 方法:

  1 public void acquire() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public void acquireUninterruptibly() {
  5     sync.acquireShared(1);
  6 }
  7 public void acquire(int permits) throws InterruptedException {
  8     if (permits < 0) throw new IllegalArgumentException();
  9     sync.acquireSharedInterruptibly(permits);
 10 }
 11 public void acquireUninterruptibly(int permits) {
 12     if (permits < 0) throw new IllegalArgumentException();
 13     sync.acquireShared(permits);
 14 }
 15 

这几个方法也是老套路了,大家基本都懂了吧,这边多了两个可以传参的 acquire 方法,不过大家也都懂的吧,如果我们需要一次获取超过一个的资源,会用得着这个的。

我们接下来看不抛出 InterruptedException 异常的 acquireUninterruptibly() 方法吧:

  1 public void acquireUninterruptibly() {
  2     sync.acquireShared(1);
  3 }
  4 public final void acquireShared(int arg) {
  5     if (tryAcquireShared(arg) < 0)
  6         doAcquireShared(arg);
  7 }
  8 

前面说了,Semaphore 分公平策略和非公平策略,我们对比一下两个 tryAcquireShared 方法:

  1 // 公平策略:
  2 protected int tryAcquireShared(int acquires) {
  3     for (;;) {
  4         // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
  5         if (hasQueuedPredecessors())
  6             return -1;
  7         int available = getState();
  8         int remaining = available - acquires;
  9         if (remaining < 0 ||
 10             compareAndSetState(available, remaining))
 11             return remaining;
 12     }
 13 }
 14 // 非公平策略:
 15 protected int tryAcquireShared(int acquires) {
 16     return nonfairTryAcquireShared(acquires);
 17 }
 18 final int nonfairTryAcquireShared(int acquires) {
 19     for (;;) {
 20         int available = getState();
 21         int remaining = available - acquires;
 22         if (remaining < 0 ||
 23             compareAndSetState(available, remaining))
 24             return remaining;
 25     }
 26 }
 27 

也是老套路了,所以从源码分析角度的话,我们其实不太需要关心是不是公平策略还是非公平策略,它们的区别往往就那么一两行。

我们再回到 acquireShared 方法,

  1 public final void acquireShared(int arg) {
  2     if (tryAcquireShared(arg) < 0)
  3         doAcquireShared(arg);
  4 }
  5 

由于 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,虽然贴了很多代码,不在乎多这点了:

  1 private void doAcquireShared(int arg) {
  2     final Node node = addWaiter(Node.SHARED);
  3     boolean failed = true;
  4     try {
  5         boolean interrupted = false;
  6         for (;;) {
  7             final Node p = node.predecessor();
  8             if (p == head) {
  9                 int r = tryAcquireShared(arg);
 10                 if (r >= 0) {
 11                     setHeadAndPropagate(node, r);
 12                     p.next = null; // help GC
 13                     if (interrupted)
 14                         selfInterrupt();
 15                     failed = false;
 16                     return;
 17                 }
 18             }
 19             if (shouldParkAfterFailedAcquire(p, node) &&
 20                 parkAndCheckInterrupt())
 21                 interrupted = true;
 22         }
 23     } finally {
 24         if (failed)
 25             cancelAcquire(node);
 26     }
 27 }
 28 

这个方法我就不介绍了,线程挂起后等待有资源被 release 出来。接下来,我们就要看 release 的方法了:

  1 public void acquire() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public void acquireUninterruptibly() {
  5     sync.acquireShared(1);
  6 }
  7 public void acquire(int permits) throws InterruptedException {
  8     if (permits < 0) throw new IllegalArgumentException();
  9     sync.acquireSharedInterruptibly(permits);
 10 }
 11 public void acquireUninterruptibly(int permits) {
 12     if (permits < 0) throw new IllegalArgumentException();
 13     sync.acquireShared(permits);
 14 }
 15 

tryReleaseShared 方法总是会返回 true,然后是 doReleaseShared,这个也是我们熟悉的方法了,我就贴下代码,不分析了,这个方法用于唤醒所有的等待线程:

  1 public void acquire() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public void acquireUninterruptibly() {
  5     sync.acquireShared(1);
  6 }
  7 public void acquire(int permits) throws InterruptedException {
  8     if (permits < 0) throw new IllegalArgumentException();
  9     sync.acquireSharedInterruptibly(permits);
 10 }
 11 public void acquireUninterruptibly(int permits) {
 12     if (permits < 0) throw new IllegalArgumentException();
 13     sync.acquireShared(permits);
 14 }
 15 

Semphore 的源码里面也是有分公平锁和非公平锁的两种方式的使用,看一下获取锁的方法。默认实现的非公平锁。

  1 public void acquire() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public void acquireUninterruptibly() {
  5     sync.acquireShared(1);
  6 }
  7 public void acquire(int permits) throws InterruptedException {
  8     if (permits < 0) throw new IllegalArgumentException();
  9     sync.acquireSharedInterruptibly(permits);
 10 }
 11 public void acquireUninterruptibly(int permits) {
 12     if (permits < 0) throw new IllegalArgumentException();
 13     sync.acquireShared(permits);
 14 }

思想就是循环CAS 一直到设置成功或是 remaining <0 退出。

  1 // 公平策略:
  2 protected int tryAcquireShared(int acquires) {
  3     for (;;) {
  4         // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
  5         if (hasQueuedPredecessors())
  6             return -1;
  7         int available = getState();
  8         int remaining = available - acquires;
  9         if (remaining < 0 ||
 10             compareAndSetState(available, remaining))
 11             return remaining;
 12     }
 13 }
 14 // 非公平策略:
 15 protected int tryAcquireShared(int acquires) {
 16     return nonfairTryAcquireShared(acquires);
 17 }
 18 final int nonfairTryAcquireShared(int acquires) {
 19     for (;;) {
 20         int available = getState();
 21         int remaining = available - acquires;
 22         if (remaining < 0 ||
 23             compareAndSetState(available, remaining))
 24             return remaining;
 25     }
 26 }

获取锁的操作完成后要是获取不到锁的话,那么就会进入下面这个方法,很熟悉了,要是头结点就尝试获取锁,获取不到就进入阻塞等待。

  1 private void doAcquireShared(int arg) {
  2     final Node node = addWaiter(Node.SHARED);
  3     boolean failed = true;
  4     try {
  5         boolean interrupted = false;
  6         for (;;) {
  7             final Node p = node.predecessor();
  8             if (p == head) {
  9                 int r = tryAcquireShared(arg);
 10                 if (r >= 0) {
 11                     setHeadAndPropagate(node, r);
 12                     p.next = null; // help GC
 13                     if (interrupted)
 14                         selfInterrupt();
 15                     failed = false;
 16                     return;
 17                 }
 18             }
 19             if (shouldParkAfterFailedAcquire(p, node) &&
 20                 parkAndCheckInterrupt())
 21                 interrupted = true;
 22         }
 23     } finally {
 24         if (failed)
 25             cancelAcquire(node);
 26     }
 27 }

参考资料

原文地址:https://www.cnblogs.com/Benjious/p/10161640.html

时间: 2024-11-02 08:05:03

java 并发(五)---AbstractQueuedSynchronizer(3)的相关文章

java 并发(五)---AbstractQueuedSynchronizer

文章部分图片和代码来自参考文章. LockSupport 和 CLH 和 ConditionObject 阅读源码首先看一下注解 ,知道了大概的意思后,再进行分析.注释一开始就进行了概括.AQS的实现是基于FIFO等待队列的. Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out

java 并发(五)---AbstractQueuedSynchronizer(4)

读写锁 ReentrantReadWriteLock 首先我们来了解一下 ReentrantReadWriteLock 的作用是什么?和 ReentranLock 有什么区别?Reentrancy 英文的意思是可重入性.ReentrantReadWriteLock下文简称(rrwl)         下面总结来自   Java并发编程-ReentrantReadWriteLock ,你也可以从JDK 中阅读到这段. ReentrantReadWriteLock是Lock的另一种实现方式,我们已经

Java并发(五):并发,迭代器和容器

在随后的博文中我会继续分析并发包源码,在这里,得分别谈谈容器类和迭代器及其源码,虽然很突兀,但我认为这对于学习Java并发很重要; ConcurrentModificationException: JavaAPI中的解释:当不允许这样的修改时,可以通过检测到对象的并发修改的方法来抛出此异常.一个线程通常不允许修改集合,而另一个线程正在遍历它. 一般来说,在这种情况下,迭代的结果是未定义的. 某些迭代器实现(包括由JRE提供的所有通用集合实现的实现)可能会选择在检测到此行为时抛出此异常. 这样做的

Java并发编程-AbstractQueuedSynchronizer源码分析

简介 提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架.该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础.使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态.然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作: java.util.concurrent.locks.Abstra

Java并发基础框架AbstractQueuedSynchronizer初探(ReentrantLock的实现分析)

AbstractQueuedSynchronizer是实现Java并发类库的一个基础框架,Java中的各种锁(RenentrantLock, ReentrantReadWriteLock)以及同步工具类(Semaphore, CountDownLatch)等很多都是基于AbstractQueuedSynchronizer实现的.AbstractQueuedSynchronizer 一般简称AQS,Abstract表示他是一个抽象类,Queued表示他是基于先进先出 FIFO 等待队列实现的,Sy

《Java并发编程实战》第十五章 原子变量与非阻塞同步机制 读书笔记

一.锁的劣势 锁定后如果未释放,再次请求锁时会造成阻塞,多线程调度通常遇到阻塞会进行上下文切换,造成更多的开销. 在挂起与恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断. 锁可能导致优先级反转,即使较高优先级的线程可以抢先执行,但仍然需要等待锁被释放,从而导致它的优先级会降至低优先级线程的级别. 二.硬件对并发的支持 处理器填写了一些特殊指令,例如:比较并交换.关联加载/条件存储. 1 比较并交换 CAS的含义是:"我认为V的值应该为A,如果是,那么将V的值更新为B,否则不需要修

《Java并发编程实战》第五章 同步容器类 读书笔记

一.同步容器类 1. 同步容器类的问题 线程容器类都是线程安全的.可是当在其上进行符合操作则须要而外加锁保护其安全性. 常见符合操作包括: . 迭代 . 跳转(依据指定顺序找到当前元素的下一个元素) . 条件运算 迭代问题能够查看之前的文章 <Java ConcurrentModificationException 异常分析与解决方式> 二.并发容器 集合类型 非线程安全 线程安全 List ArrayList CopyOnWriteArrayList Set SortedSet Concur

Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析

学习Java并发编程不得不去了解一下java.util.concurrent这个包,这个包下面有许多我们经常用到的并发工具类,例如:ReentrantLock, CountDownLatch, CyclicBarrier, Semaphore等.而这些类的底层实现都依赖于AbstractQueuedSynchronizer这个类,由此可见这个类的重要性.所以在Java并发系列文章中我首先对AbstractQueuedSynchronizer这个类进行分析,由于这个类比较重要,而且代码比较长,为了

Java并发系列[2]----AbstractQueuedSynchronizer源码分析之独占模式

在上一篇<Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析>中我们介绍了AbstractQueuedSynchronizer基本的一些概念,主要讲了AQS的排队区是怎样实现的,什么是独占模式和共享模式以及如何理解结点的等待状态.理解并掌握这些内容是后续阅读AQS源码的关键,所以建议读者先看完我的上一篇文章再回过头来看这篇就比较容易理解.在本篇中会介绍在独占模式下结点是怎样进入同步队列排队的,以及离开同步队列之前会进行哪些操作.AQS为在独占模