上一次学习了ReetrantLock,是对AQS独占模式的,这次学习CountDownLatch,是共享模式api的实现。人生不死,学无止境。先看个demo吧:
import java.util.concurrent.CountDownLatch; public class CountDownLatchTest { private static CountDownLatch count1 = new CountDownLatch(1); private static CountDownLatch count2 = new CountDownLatch(10); public static void main(String[] args){ // boss for (int i = 0; i < 1; i++) { new Thread(new Runnable() { @Override public void run() { try { count2.await(); System.out.println("boss说开会"); Thread.sleep(3000); count1.countDown(); System.out.println("boss说散会"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } //一堆干活小弟 for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { try { count2.countDown(); System.out.println(Thread.currentThread() + "进入会议室"); count1.await(); System.out.println(Thread.currentThread()+ "离开会议室"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } }
boss等待所有干活小弟进入会议室开会,小弟等待boss说散会才敢走人。CountDownLatch的功能就是这个,一个或一组线程等待另一个或一组完成才能运行,内部static对AQS2个共享api实现。共享API的2个方法:
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
AQS共享模式的处理流程大致是:
Acquire:
if(tryAcquireShared<0)
加入队列
release:
if(tryReleaseShared)
将队列所有节点unpark(独占模式是release一个)
看下CountDownLatch的内部类实现:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //state为count数量 Sync(int count) { setState(count); } int getCount() { return getState(); } //acquire检查state值 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //cas设置state值 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
CountDownLatch的实现:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); }
CountDownLatch的代码还是比较简单的,构造函数传入count数量,内部类sync设置state值,响应中断的await用来acquire,检查state的值,不会0就加入AQS的同步等待队列,当有线程countDown时递减state值,一直到有线程递减到state值为0时,唤醒AQS等待队列所有线程。
时间: 2024-10-13 21:04:50