文章代码分析和部分图片来自参考文章
认识 CountDownLatch
分析这个类,首先了解一下它所可以实现的效果,然后再顺着这个源码的思路思考是不是和它实现的效果一样。下面的代码和图片可以说明 CountDownLatch (下文简称CDL)的工作过程。
1 public class CountDownLatchDemo { 2 3 public static void main(String[] args) { 4 5 CountDownLatch latch = new CountDownLatch(2); 6 7 Thread t1 = new Thread(new Runnable() { 8 @Override 9 public void run() { 10 try { 11 Thread.sleep(5000); 12 } catch (InterruptedException ignore) { 13 } 14 // 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown() 15 latch.countDown(); 16 } 17 }, "t1"); 18 19 Thread t2 = new Thread(new Runnable() { 20 @Override 21 public void run() { 22 try { 23 Thread.sleep(10000); 24 } catch (InterruptedException ignore) { 25 } 26 // 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown() 27 latch.countDown(); 28 } 29 }, "t2"); 30 31 t1.start(); 32 t2.start(); 33 34 Thread t3 = new Thread(new Runnable() { 35 @Override 36 public void run() { 37 try { 38 // 阻塞,等待 state 减为 0 39 latch.await(); 40 System.out.println("线程 t3 从 await 中返回了"); 41 } catch (InterruptedException e) { 42 System.out.println("线程 t3 await 被中断"); 43 Thread.currentThread().interrupt(); 44 } 45 } 46 }, "t3"); 47 Thread t4 = new Thread(new Runnable() { 48 @Override 49 public void run() { 50 try { 51 // 阻塞,等待 state 减为 0 52 latch.await(); 53 System.out.println("线程 t4 从 await 中返回了"); 54 } catch (InterruptedException e) { 55 System.out.println("线程 t4 await 被中断"); 56 Thread.currentThread().interrupt(); 57 } 58 } 59 }, "t4"); 60 61 t3.start(); 62 t4.start(); 63 } 64 }
我们知道这里面实际需要分析的方法就是 await 和 countDown 方法。
源码分析 CountDownLatch
从方法中我们就可以知道CDL中的实现AQS的共享模式获取锁。我们以上面的Demo 来做源码分析
1 public void await() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 } 4 public final void acquireSharedInterruptibly(int arg) 5 throws InterruptedException { 6 // 这也是老套路了,我在第二篇的中断那一节说过了 7 if (Thread.interrupted()) 8 throw new InterruptedException(); 9 // t3 和 t4 调用 await 的时候,state 都大于 0。 10 // 也就是说,这个 if 返回 true,然后往里看 11 if (tryAcquireShared(arg) < 0) 12 doAcquireSharedInterruptibly(arg); 13 } 14 // 只有当 state == 0 的时候,这个方法才会返回 1 15 protected int tryAcquireShared(int acquires) { 16 return (getState() == 0) ? 1 : -1; 17 }
1 private void doAcquireSharedInterruptibly(int arg) 2 throws InterruptedException { 3 // 1. 入队 4 final Node node = addWaiter(Node.SHARED); 5 boolean failed = true; 6 try { 7 for (;;) { 8 final Node p = node.predecessor(); 9 if (p == head) { 10 // 同上,只要 state 不等于 0,那么这个方法返回 -1 11 int r = tryAcquireShared(arg); 12 if (r >= 0) { 13 setHeadAndPropagate(node, r); 14 p.next = null; // help GC 15 failed = false; 16 return; 17 } 18 } 19 // 2 20 if (shouldParkAfterFailedAcquire(p, node) && 21 parkAndCheckInterrupt()) 22 throw new InterruptedException(); 23 } 24 } finally { 25 if (failed) 26 cancelAcquire(node); 27 } 28 }
图一是两个线程假如到阻塞队列中的情景,图二是CountDown 方法执行的过程。
图一
图二(例子中数量不为10,此处仅为说明执行过程)
1 public void countDown() { 2 sync.releaseShared(1); 3 } 4 public final boolean releaseShared(int arg) { 5 // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true 6 // 否则只是简单的 state = state - 1 那么 countDown 方法就结束了 7 if (tryReleaseShared(arg)) { 8 // 唤醒 await 的线程 9 doReleaseShared(); 10 return true; 11 } 12 return false; 13 } 14 // 这个方法很简单,用自旋的方法实现 state 减 1 15 protected boolean tryReleaseShared(int releases) { 16 for (;;) { 17 int c = getState(); 18 if (c == 0) 19 return false; 20 int nextc = c-1; 21 if (compareAndSetState(c, nextc)) 22 return nextc == 0; 23 } 24 } 25 countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程: 26 27 // 调用这个方法的时候,state == 0 28 // 这个方法先不要看所有的代码,按照思路往下到我写注释的地方,其他的之后还会仔细分析 29 private void doReleaseShared() { 30 for (;;) { 31 Node h = head; 32 if (h != null && h != tail) { 33 int ws = h.waitStatus; 34 // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了 35 if (ws == Node.SIGNAL) { 36 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 37 continue; // loop to recheck cases 38 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点 39 // 在这里,也就是唤醒 t3 40 unparkSuccessor(h); 41 } 42 else if (ws == 0 && 43 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo 44 continue; // loop on failed CAS 45 } 46 if (h == head) // loop if head changed 47 break; 48 } 49 }
接下来就是唤醒了哦!
1 private void doAcquireSharedInterruptibly(int arg) 2 throws InterruptedException { 3 final Node node = addWaiter(Node.SHARED); 4 boolean failed = true; 5 try { 6 for (;;) { 7 final Node p = node.predecessor(); 8 if (p == head) { 9 int r = tryAcquireShared(arg); 10 if (r >= 0) { 11 setHeadAndPropagate(node, r); // 2. 这里是下一步 12 p.next = null; // help GC 13 failed = false; 14 return; 15 } 16 } 17 if (shouldParkAfterFailedAcquire(p, node) && 18 // 1. 唤醒后这个方法返回 19 parkAndCheckInterrupt()) 20 throw new InterruptedException(); 21 } 22 } finally { 23 if (failed) 24 cancelAcquire(node); 25 } 26 }
1 private void setHeadAndPropagate(Node node, int propagate) { 2 Node h = head; // Record old head for check below 3 setHead(node); 4 5 // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4 6 // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了 7 if (propagate > 0 || h == null || h.waitStatus < 0 || 8 (h = head) == null || h.waitStatus < 0) { 9 Node s = node.next; 10 if (s == null || s.isShared()) 11 // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了 12 doReleaseShared(); 13 } 14 }
1 // 调用这个方法的时候,state == 0 2 private void doReleaseShared() { 3 for (;;) { 4 Node h = head; 5 // 1. h == null: 说明阻塞队列为空 6 // 2. h == tail: 说明头结点可能是刚刚初始化的头节点, 7 // 或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了 8 // 所以这两种情况不需要进行唤醒后继节点 9 if (h != null && h != tail) { 10 int ws = h.waitStatus; 11 // t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了 12 if (ws == Node.SIGNAL) { 13 // 这里 CAS 失败的场景请看下面的解读 14 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 15 continue; // loop to recheck cases 16 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点 17 // 在这里,也就是唤醒 t4 18 unparkSuccessor(h); 19 } 20 else if (ws == 0 && 21 // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1 22 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 23 continue; // loop on failed CAS 24 } 25 // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环 26 // 否则,就是 head 没变,那么退出循环, 27 // 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会调用这个方法的 28 if (h == head) // loop if head changed 29 break; 30 } 31 }
这里方法要注意了哦,t3 作为唤醒者,走到了 doReleaseShared 执行唤醒的操作,t4作为被唤醒者,也会走到 doReleaseShared ,这个方法,OK ,但是 t3的那个线程继续走到 28行的时候,有可能由于此时head 是被T4修改了(看唤醒后的流程),所以 h==head 返回 false ,那么循环就会继续,继续的话,就有可能两线程在 14行相遇,导致了 CAS 失败。失败了没关系,需要知道的是,总会有一个获得了锁,然后接着唤醒后面的线程,即是说会有一定几率发生多个线程唤醒后面锁的情况,但是为什么要这样做呢?个人理解 :
- 方法复用
- 提升吞吐量
CyclicBarrier
CyclicBarrier 基于 Condition 来实现,CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。CyclicBarrier 是基于 AQS 的 ConditionObject 和 ReentranLock 实现的。
下面代码分析来自参考文章,建议可以去阅读一下。
1 public class CyclicBarrier { 2 // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代" 3 private static class Generation { 4 boolean broken = false; 5 } 6 7 /** The lock for guarding barrier entry */ 8 private final ReentrantLock lock = new ReentrantLock(); 9 // CyclicBarrier 是基于 Condition 的 10 // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上 11 private final Condition trip = lock.newCondition(); 12 13 // 参与的线程数 14 private final int parties; 15 16 // 如果设置了这个,代表越过栅栏之前,要执行相应的操作 17 private final Runnable barrierCommand; 18 19 // 当前所处的“代” 20 private Generation generation = new Generation(); 21 22 // 还没有到栅栏的线程数,这个值初始为 parties,然后递减 23 // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量 24 private int count; 25 26 public CyclicBarrier(int parties, Runnable barrierAction) { 27 if (parties <= 0) throw new IllegalArgumentException(); 28 this.parties = parties; 29 this.count = parties; 30 this.barrierCommand = barrierAction; 31 } 32 33 public CyclicBarrier(int parties) { 34 this(parties, null); 35 }
首先,先看怎么开启新的一代:
1 // 开启新的一代,当最后一个线程到达栅栏上的时候,调用这个方法来唤醒其他线程,同时初始化“下一代” 2 private void nextGeneration() { 3 // 首先,需要唤醒所有的在栅栏上等待的线程 4 trip.signalAll(); 5 // 更新 count 的值 6 count = parties; 7 // 重新生成“新一代” 8 generation = new Generation(); 9 } 10
看看怎么打破一个栅栏:
1 private void breakBarrier() { 2 // 设置状态 broken 为 true 3 generation.broken = true; 4 // 重置 count 为初始值 parties 5 count = parties; 6 // 唤醒所有已经在等待的线程 7 trip.signalAll(); 8 } 9
这两个方法之后用得到,现在开始分析最重要的等待通过栅栏方法 await 方法:
1 // 不带超时机制 2 public int await() throws InterruptedException, BrokenBarrierException { 3 try { 4 return dowait(false, 0L); 5 } catch (TimeoutException toe) { 6 throw new Error(toe); // cannot happen 7 } 8 } 9 // 带超时机制,如果超时抛出 TimeoutException 异常 10 public int await(long timeout, TimeUnit unit) 11 throws InterruptedException, 12 BrokenBarrierException, 13 TimeoutException { 14 return dowait(true, unit.toNanos(timeout)); 15 } 16
继续往里看:
1 private int dowait(boolean timed, long nanos) 2 throws InterruptedException, BrokenBarrierException, 3 TimeoutException { 4 final ReentrantLock lock = this.lock; 5 // 先要获取到锁,然后在 finally 中要记得释放锁 6 // 如果记得 Condition 部分的话,我们知道 condition 的 await 会释放锁,signal 的时候需要重新获取锁 7 lock.lock(); 8 try { 9 final Generation g = generation; 10 // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常 11 if (g.broken) 12 throw new BrokenBarrierException(); 13 // 检查中断状态,如果中断了,抛出 InterruptedException 异常 14 if (Thread.interrupted()) { 15 breakBarrier(); 16 throw new InterruptedException(); 17 } 18 // index 是这个 await 方法的返回值 19 // 注意到这里,这个是从 count 递减后得到的值 20 int index = --count; 21 22 // 如果等于 0,说明所有的线程都到栅栏上了,准备通过 23 if (index == 0) { // tripped 24 boolean ranAction = false; 25 try { 26 // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行 27 final Runnable command = barrierCommand; 28 if (command != null) 29 command.run(); 30 // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况 31 ranAction = true; 32 // 唤醒等待的线程,然后开启新的一代 33 nextGeneration(); 34 return 0; 35 } finally { 36 if (!ranAction) 37 // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏 38 // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties 39 breakBarrier(); 40 } 41 } 42 43 // loop until tripped, broken, interrupted, or timed out 44 // 如果是最后一个线程调用 await,那么上面就返回了 45 // 下面的操作是给那些不是最后一个到达栅栏的线程执行的 46 for (;;) { 47 try { 48 // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await 49 if (!timed) 50 trip.await(); 51 else if (nanos > 0L) 52 nanos = trip.awaitNanos(nanos); 53 } catch (InterruptedException ie) { 54 // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断 55 if (g == generation && ! g.broken) { 56 // 打破栅栏 57 breakBarrier(); 58 // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法 59 throw ie; 60 } else { 61 // 到这里,说明 g != generation, 说明新的一代已经产生,即最后一个线程 await 执行完成, 62 // 那么此时没有必要再抛出 InterruptedException 异常,记录下来这个中断信息即可 63 // 或者是栅栏已经被打破了,那么也不应该抛出 InterruptedException 异常, 64 // 而是之后抛出 BrokenBarrierException 异常 65 Thread.currentThread().interrupt(); 66 } 67 } 68 69 // 唤醒后,检查栅栏是否是“破的” 70 if (g.broken) 71 throw new BrokenBarrierException(); 72 73 // 这个 for 循环除了异常,就是要从这里退出了 74 // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代 75 // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的 76 // 那什么时候不满足呢?barrierCommand 执行过程中抛出了异常,那么会执行打破栅栏操作, 77 // 设置 broken 为true,然后唤醒这些线程。这些线程会从上面的 if (g.broken) 这个分支抛 BrokenBarrierException 异常返回 78 // 当然,还有最后一种可能,那就是 await 超时,此种情况不会从上面的 if 分支异常返回,也不会从这里返回,会执行后面的代码 79 if (g != generation) 80 return index; 81 82 // 如果醒来发现超时了,打破栅栏,抛出异常 83 if (timed && nanos <= 0L) { 84 breakBarrier(); 85 throw new TimeoutException(); 86 } 87 } 88 } finally { 89 lock.unlock(); 90 } 91 } 92
唤醒线程,最后调用的是 Condition 的像 signal 的逻辑,向sync queue 里插进元素。
下面开始收尾工作。
首先,我们看看怎么得到有多少个线程到了栅栏上,处于等待状态:
1 public int getNumberWaiting() { 2 final ReentrantLock lock = this.lock; 3 lock.lock(); 4 try { 5 return parties - count; 6 } finally { 7 lock.unlock(); 8 } 9 } 10
判断一个栅栏是否被打破了,这个很简单,直接看 broken 的值即可:
1 public boolean isBroken() { 2 final ReentrantLock lock = this.lock; 3 lock.lock(); 4 try { 5 return generation.broken; 6 } finally { 7 lock.unlock(); 8 } 9 } 10
前面我们在说 await 的时候也几乎说清楚了,什么时候栅栏会被打破,总结如下:
- 中断,我们说了,如果某个等待的线程发生了中断,那么会打破栅栏,同时抛出 InterruptedException 异常;
- 超时,打破栅栏,同时抛出 TimeoutException 异常;
- 指定执行的操作抛出了异常,这个我们前面也说过。
最后,我们来看看怎么重置一个栅栏:
1 public void reset() { 2 final ReentrantLock lock = this.lock; 3 lock.lock(); 4 try { 5 breakBarrier(); // break the current generation 6 nextGeneration(); // start a new generation 7 } finally { 8 lock.unlock(); 9 } 10 } 11
我们设想一下,如果初始化时,指定了线程 parties = 4,前面有 3 个线程调用了 await 等待,在第 4 个线程调用 await 之前,我们调用 reset 方法,那么会发生什么?
首先,打破栅栏,那意味着所有等待的线程(3个等待的线程)会唤醒,await 方法会通过抛出 BrokenBarrierException 异常返回。然后开启新的一代,重置了 count 和 generation,相当于一切归零了。
Semaphore
有了 CountDownLatch 的基础后,分析 Semaphore 会简单很多。Semaphore 是什么呢?它类似一个资源池(读者可以类比线程池),每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。
大概大家也可以猜到,Semaphore 其实也是 AQS 中共享锁的使用,因为每个线程共享一个池嘛。
套路解读:创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。
构造方法:
1 public Semaphore(int permits) { 2 sync = new NonfairSync(permits); 3 } 4 5 public Semaphore(int permits, boolean fair) { 6 sync = fair ? new FairSync(permits) : new NonfairSync(permits); 7 } 8
这里和 ReentrantLock 类似,用了公平策略和非公平策略。
看 acquire 方法:
1 public void acquire() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 } 4 public void acquireUninterruptibly() { 5 sync.acquireShared(1); 6 } 7 public void acquire(int permits) throws InterruptedException { 8 if (permits < 0) throw new IllegalArgumentException(); 9 sync.acquireSharedInterruptibly(permits); 10 } 11 public void acquireUninterruptibly(int permits) { 12 if (permits < 0) throw new IllegalArgumentException(); 13 sync.acquireShared(permits); 14 } 15
这几个方法也是老套路了,大家基本都懂了吧,这边多了两个可以传参的 acquire 方法,不过大家也都懂的吧,如果我们需要一次获取超过一个的资源,会用得着这个的。
我们接下来看不抛出 InterruptedException 异常的 acquireUninterruptibly() 方法吧:
1 public void acquireUninterruptibly() { 2 sync.acquireShared(1); 3 } 4 public final void acquireShared(int arg) { 5 if (tryAcquireShared(arg) < 0) 6 doAcquireShared(arg); 7 } 8
前面说了,Semaphore 分公平策略和非公平策略,我们对比一下两个 tryAcquireShared 方法:
1 // 公平策略: 2 protected int tryAcquireShared(int acquires) { 3 for (;;) { 4 // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作 5 if (hasQueuedPredecessors()) 6 return -1; 7 int available = getState(); 8 int remaining = available - acquires; 9 if (remaining < 0 || 10 compareAndSetState(available, remaining)) 11 return remaining; 12 } 13 } 14 // 非公平策略: 15 protected int tryAcquireShared(int acquires) { 16 return nonfairTryAcquireShared(acquires); 17 } 18 final int nonfairTryAcquireShared(int acquires) { 19 for (;;) { 20 int available = getState(); 21 int remaining = available - acquires; 22 if (remaining < 0 || 23 compareAndSetState(available, remaining)) 24 return remaining; 25 } 26 } 27
也是老套路了,所以从源码分析角度的话,我们其实不太需要关心是不是公平策略还是非公平策略,它们的区别往往就那么一两行。
我们再回到 acquireShared 方法,
1 public final void acquireShared(int arg) { 2 if (tryAcquireShared(arg) < 0) 3 doAcquireShared(arg); 4 } 5
由于 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,虽然贴了很多代码,不在乎多这点了:
1 private void doAcquireShared(int arg) { 2 final Node node = addWaiter(Node.SHARED); 3 boolean failed = true; 4 try { 5 boolean interrupted = false; 6 for (;;) { 7 final Node p = node.predecessor(); 8 if (p == head) { 9 int r = tryAcquireShared(arg); 10 if (r >= 0) { 11 setHeadAndPropagate(node, r); 12 p.next = null; // help GC 13 if (interrupted) 14 selfInterrupt(); 15 failed = false; 16 return; 17 } 18 } 19 if (shouldParkAfterFailedAcquire(p, node) && 20 parkAndCheckInterrupt()) 21 interrupted = true; 22 } 23 } finally { 24 if (failed) 25 cancelAcquire(node); 26 } 27 } 28
这个方法我就不介绍了,线程挂起后等待有资源被 release 出来。接下来,我们就要看 release 的方法了:
1 public void acquire() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 } 4 public void acquireUninterruptibly() { 5 sync.acquireShared(1); 6 } 7 public void acquire(int permits) throws InterruptedException { 8 if (permits < 0) throw new IllegalArgumentException(); 9 sync.acquireSharedInterruptibly(permits); 10 } 11 public void acquireUninterruptibly(int permits) { 12 if (permits < 0) throw new IllegalArgumentException(); 13 sync.acquireShared(permits); 14 } 15
tryReleaseShared 方法总是会返回 true,然后是 doReleaseShared,这个也是我们熟悉的方法了,我就贴下代码,不分析了,这个方法用于唤醒所有的等待线程:
1 public void acquire() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 } 4 public void acquireUninterruptibly() { 5 sync.acquireShared(1); 6 } 7 public void acquire(int permits) throws InterruptedException { 8 if (permits < 0) throw new IllegalArgumentException(); 9 sync.acquireSharedInterruptibly(permits); 10 } 11 public void acquireUninterruptibly(int permits) { 12 if (permits < 0) throw new IllegalArgumentException(); 13 sync.acquireShared(permits); 14 } 15
Semphore 的源码里面也是有分公平锁和非公平锁的两种方式的使用,看一下获取锁的方法。默认实现的非公平锁。
1 public void acquire() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 } 4 public void acquireUninterruptibly() { 5 sync.acquireShared(1); 6 } 7 public void acquire(int permits) throws InterruptedException { 8 if (permits < 0) throw new IllegalArgumentException(); 9 sync.acquireSharedInterruptibly(permits); 10 } 11 public void acquireUninterruptibly(int permits) { 12 if (permits < 0) throw new IllegalArgumentException(); 13 sync.acquireShared(permits); 14 }
思想就是循环CAS 一直到设置成功或是 remaining <0 退出。
1 // 公平策略: 2 protected int tryAcquireShared(int acquires) { 3 for (;;) { 4 // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作 5 if (hasQueuedPredecessors()) 6 return -1; 7 int available = getState(); 8 int remaining = available - acquires; 9 if (remaining < 0 || 10 compareAndSetState(available, remaining)) 11 return remaining; 12 } 13 } 14 // 非公平策略: 15 protected int tryAcquireShared(int acquires) { 16 return nonfairTryAcquireShared(acquires); 17 } 18 final int nonfairTryAcquireShared(int acquires) { 19 for (;;) { 20 int available = getState(); 21 int remaining = available - acquires; 22 if (remaining < 0 || 23 compareAndSetState(available, remaining)) 24 return remaining; 25 } 26 }
获取锁的操作完成后要是获取不到锁的话,那么就会进入下面这个方法,很熟悉了,要是头结点就尝试获取锁,获取不到就进入阻塞等待。
1 private void doAcquireShared(int arg) { 2 final Node node = addWaiter(Node.SHARED); 3 boolean failed = true; 4 try { 5 boolean interrupted = false; 6 for (;;) { 7 final Node p = node.predecessor(); 8 if (p == head) { 9 int r = tryAcquireShared(arg); 10 if (r >= 0) { 11 setHeadAndPropagate(node, r); 12 p.next = null; // help GC 13 if (interrupted) 14 selfInterrupt(); 15 failed = false; 16 return; 17 } 18 } 19 if (shouldParkAfterFailedAcquire(p, node) && 20 parkAndCheckInterrupt()) 21 interrupted = true; 22 } 23 } finally { 24 if (failed) 25 cancelAcquire(node); 26 } 27 }
参考资料
原文地址:https://www.cnblogs.com/Benjious/p/10161640.html