JUC源码分析10-locks-CountDownLatch

上一次学习了ReetrantLock,是对AQS独占模式的,这次学习CountDownLatch,是共享模式api的实现。人生不死,学无止境。先看个demo吧:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {

    private static CountDownLatch count1 = new CountDownLatch(1);

    private static CountDownLatch count2 = new CountDownLatch(10);

    public static void main(String[] args){

        // boss
        for (int i = 0; i < 1; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        count2.await();
                        System.out.println("boss说开会");
                        Thread.sleep(3000);
                        count1.countDown();
                        System.out.println("boss说散会");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        //一堆干活小弟
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        count2.countDown();
                        System.out.println(Thread.currentThread() + "进入会议室");
                        count1.await();
                        System.out.println(Thread.currentThread()+ "离开会议室");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

boss等待所有干活小弟进入会议室开会,小弟等待boss说散会才敢走人。CountDownLatch的功能就是这个,一个或一组线程等待另一个或一组完成才能运行,内部static对AQS2个共享api实现。共享API的2个方法:

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

AQS共享模式的处理流程大致是:

Acquire:

if(tryAcquireShared<0)

加入队列

release:

if(tryReleaseShared)

将队列所有节点unpark(独占模式是release一个)

看下CountDownLatch的内部类实现:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
	//state为count数量
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

	//acquire检查state值
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
	//cas设置state值
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}	

CountDownLatch的实现:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
    sync.releaseShared(1);
}

public long getCount() {
    return sync.getCount();
}

CountDownLatch的代码还是比较简单的,构造函数传入count数量,内部类sync设置state值,响应中断的await用来acquire,检查state的值,不会0就加入AQS的同步等待队列,当有线程countDown时递减state值,一直到有线程递减到state值为0时,唤醒AQS等待队列所有线程。

时间: 2024-10-13 21:04:50

JUC源码分析10-locks-CountDownLatch的相关文章

Solr4.8.0源码分析(10)之Lucene的索引文件(3)

Solr4.8.0源码分析(10)之Lucene的索引文件(3) 1. .si文件 .si文件存储了段的元数据,主要涉及SegmentInfoFormat.java和Segmentinfo.java这两个文件.由于本文介绍的Solr4.8.0,所以对应的是SegmentInfoFormat的子类Lucene46SegmentInfoFormat. 首先来看下.si文件的格式 头部(header) 版本(SegVersion) doc个数(SegSize) 是否符合文档格式(IsCompoundF

JUC源码分析-集合篇(三)ConcurrentLinkedQueue

JUC源码分析-集合篇(三)ConcurrentLinkedQueue 在并发编程中,有时候需要使用线程安全的队列.如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法.使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现.非阻塞的实现方 式则可以使用循环 CAS 的方式来实现.本节让我们一起来研究一下 Doug Lea 是如何使用非阻塞的方式来实现线程安全队列 ConcurrentLinkedQueue 的,相信从大师

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理 以 LinkedBlockingQueue 分析 BlockingQueue 阻塞式队列的实现原理. 1. 数据结构 LinkedBlockingQueue 和 ConcurrentLinkedQueue 一样都是由 head 节点和 last 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列.默认情况下

JUC源码分析-集合篇(七)PriorityBlockingQueue

JUC源码分析-集合篇(七)PriorityBlockingQueue PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现. PriorityBlockingQueue 数据结构和 PriorityQueue 一致,而线程安全性使用的是 ReentrantLock. 1. 基本属性 // 最大可分配队列容量 Integer.MAX_VALUE - 8,减 8 是因为有的 VM 实现在数组头有些内容 private stati

JUC源码分析16-集合-ConcurrentSkipListMap、ConcurrentSkipListSet

NBA这赛季结束,勇士可惜啊,谁能想到没拿到冠军,库昊也没成为真正的老大,lbl一战封神,所有口水留言都变成羡慕嫉妒恨,哎,我库啊,还是还是看书吧. ConcurrentSkipListMap说实话,之前还真没注意过,还是看JUC才看到,利用skiplist跳表结构来实现一种有序的map,之前看到的map都是无序.在学习前还是要好好了解下什么是skiplist跳表,的确很不错,利用空间换时间,复杂度为logN,跳表的原理参考http://kenby.iteye.com/blog/1187303,

red5源码分析---10

red5源码分析-服务器处理publish命令 和前几章的分析一样,服务器接收到客户端发来的publish命令后,最终会执行RTMPHandler的onCommand函数,再参考<red5源码分析-8>的分析,最终会调用StreamService的publish方法,代码如下 public void publish(String name, String mode) { Map<String, String> params = null; if (name != null &

JUC源码分析6-locks-AQS-独占模式

AbstractQueuedSynchronizer(下面简称AQS),javadoc说明: Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on  first-in-first-out (FIFO) wait queues. 1.提供一个FIFO等待队列,使用方法伪代码表示就是: Acquire: if(!获取到锁

spark core源码分析10 Task的运行

这一节介绍具体task的运行以及最终结果的处理 看线程运行的run方法,见代码注释 override def run(): Unit = { val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager) val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClass

JUC源码分析15-集合-ConcurrentHashMap

好几天没看juc了,之前看了HashMap,还有个差不多的HashTable,二者的结构大致相同,小小的比较下2者的不同: 1.HashMap是非线程安全的,HashTable通过synchronized加锁实现线程安全.如果我们的代码里存在{get();...;put()}这种操作的话就保证不了: 2.HashMap可以存储key或value为null的值,HashTable不行: 3.初始大小HashTable是11,HashMap是16,扩容的话,HashTable是2*old+1,Has