【JDK源码分析】深入源码分析CountDownLatch

前言

CountDownLatch是一个闭锁实现,它可以使一个或者多个线程等待一组事件发生。它包含一个计数器,用来表示需要等待的事件数量,coutDown方法用于表示一个事件发生,计数器随之递减,而await方法等待计数器为0之前一直阻塞。它是基于AQS的共享锁来实现的,其中使用了较多的AQS的方法,所以在这之前最好阅读过AQS的源码,不嫌弃也可以查看本人之前AQS的源码分析,有些AQS方法没有在之前分析过的这里涉及到了会进行分析。

源码

我们先看它的属性和构造器,

    // Sync为其内部类
    private final Sync sync;

    // 唯一的一个构造器
    // 构造参数count就是需要等待事件的数量
    public CountDownLatch(int count) {
        // 为了保证count >= 0
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 构造sync
        this.sync = new Sync(count);
    }

现在来看内部类Sync,它继承了AQS,实现了共享锁方法,下面来看其源码,代码行数不多很好理解

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // setState 为AQS更改其state变量的方法
            // 将AQS state变量设置成count
            setState(count);
        }

        int getCount() {
            // AQS的获取state锁状态值
            return getState();
        }
        // 尝试获取共享锁
        protected int tryAcquireShared(int acquires) {
            // 返回1表示此时锁状态值为0表示锁已释放
            // -1表示此时锁状态值大于0,表示出于锁定状态
            return (getState() == 0) ? 1 : -1;
        }
        // 尝试释放共享锁(计数器递减releases次)
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 等待锁状态值为0或者更改锁状态值成功
            for (;;) {
                // 将state赋值给变量c
                int c = getState();
                if (c == 0)
                    // 此时锁已清除
                    return false;
                // 递减
                int nextc = c-1;
                // 比较state的状态值是否等于C,等于将state状态值改为nextc
                if (compareAndSetState(c, nextc))
                    // 更改成功后,如果nextc为0则返回true
                    return nextc == 0;
            }
        }
    }

await方法

await方法就是当state状态值不为0时将当前线程阻塞,然后等待唤醒

    public void await() throws InterruptedException {
        //调用的AQS获取共享锁可中断方法
        sync.acquireSharedInterruptibly(1);
    }

我们来看看AQS的acquireSharedInterruptibly方法

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 此方法调用的是CountDownLatch内部类Sync的方法
        // 如果锁状态不为0,则执行doAcquireSharedInterruptibly方法
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

doAcquireSharedInterruptibly方法也是由AQS实现的

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 添加一个共享锁节点到队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            // 直到线程被唤醒或者线程被中断时跳出循环
            for (;;) {
                // node节点的前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 调用CountDownLatch内部类Sync的方法
                    // 如果锁状态值为0,则返回值大于0
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 当锁状态值为0,开始将note节点设置为头节点并唤醒后继节点
                        // 也就是队列不断的出列,然后唤醒后继节点,后继节点被唤醒后由于前驱节点被设置成头节点,又会调用该方法进行后继节点的唤醒
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }

                /*
                 shouldParkAfterFailedAcquire用于清除已中断/或者取消的线程以及判断此次循环是否需要挂起线程
                 parkAndCheckInterrupt 挂机当前线程
                 shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 在AQS之前博文里分析过这里就不再分析了
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                // 表示当前线程中断,取消获取锁
                // 之前分析过,略过源码分析
                cancelAcquire(node);
        }
    }

setHeadAndPropagate方法,主要作用是唤醒后继节点线程

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        // 当前节点设置为头节点,节点关联的线程设置为空
        setHead(node)
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // 节点等待状态为signal时,唤醒后继节点线程
                doReleaseShared();
        }
    }

doReleaseShared很巧妙,当当前节点等待状态为signal时,唤醒后继节点线程

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 当前线程等待状态为signal时表示后继节点需要唤醒
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        // 表示h节点的状态替换失败,会再次循环判断h节点的状态
                        continue;            // loop to recheck cases
                    // 唤醒后继节点
                    unparkSuccessor(h);
                }
                // 状态为0时,将其改成PROPAGATE,更改失败会再次循环判断h节点的状态          // 这种情况发生在一个线程调用await方法,节点的等待状态还是初始值0未来得及被修改,刚好state被置为0然后调用了doReleaseShared方法
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

countDown方法

countDown方法递减state值,当值为0时,依次唤醒等待的线程

    public void countDown() {
        // 递减一次state值,知道state为0时唤醒等待中的线程
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        // 尝试将state减去arg
        if (tryReleaseShared(arg)) {
            // state为0时唤醒线程
            doReleaseShared();
            return true;
        }
        return false;
    }

到此分析完毕。

总结

  1. 通过源码知道CountDownLatch 不能像CyclicBarrier那样使用完毕后还可以复用;
  2. CountDownLatch 是通过共享锁来实现的,它的构造参数就是AQS state的值;
  3. 由于内部类继承了AQS,所以它内部也是FIFO队列,同时也一样是前驱节点唤醒后继节点。

原文地址:https://www.cnblogs.com/d-homme/p/9375105.html

时间: 2024-08-02 23:32:28

【JDK源码分析】深入源码分析CountDownLatch的相关文章

JDK中String类的源码分析(二)

1.startsWith(String prefix, int toffset)方法 包括startsWith(*),endsWith(*)方法,都是调用上述一个方法 1 public boolean startsWith(String prefix, int toffset) { 2 char ta[] = value; 3 int to = toffset; 4 char pa[] = prefix.value; 5 int po = 0; 6 int pc = prefix.value.l

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二)

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二) 1.通过mutate(put)操作,将单个put操作添加到缓冲操作中,这些缓冲操作其实就是Put的父类的一个List的集合.如下: private List<Row> writeAsyncBuffer = new LinkedList<>(); writeAsyncBuffer.add(m); 当writeAsyncBuffer满了之后或者是人为的调用backgroundFlushCommits操作促使缓冲池中的

OpenStack_Swift源码分析——Object-auditor源码分析(2)

1 Object-aduitor审计具体分析 上一篇文章中,讲解了Object-aduitor的启动,其中审计的具体执行是AuditorWorker实现的,在run_audit中实例化了AuditorWorker类,并调用audit_all_objects方法,下面看此方法的具体代码实现: def audit_all_objects(self, mode='once', device_dirs=None): #run_forever传过来的mode 为forever description =

OpenStack_Swift源码分析——Object-auditor源码分析(1)

1 Object-auditor 的启动 Object-auditor的启动和object-replicator的启动过程是一样的,首先是执行启动脚本 swift-init object-auditor start 启动脚本会运行swift源码bin目录下的swift-ojbect-auditor if __name__ == '__main__': parser = OptionParser("%prog CONFIG [options]") parser.add_option('-

nginx源码分析--从源码看nginx框架总结

nginx源码总结: 1)代码中没有特别绕特别别扭的编码实现,从变量的定义调用函数的实现封装,都非常恰当,比如从函数命名或者变量命名就可以看出来定义的大体意义,函数的基本功能,再好的架构实现在编码习惯差的人实现也会黯然失色,如果透彻理解代码的实现,领悟架构的设计初衷,觉得每块代码就想经过耐心雕琢一样,不仅仅实现了基本的功能给你,为其他人阅读也会提供很好的支持.细致恰当的命名规则就可以看出作者的功力. 2)更好更高的软件性能体现在架构设计上,好的架构会让软件更加稳定.容易维护.便于扩展.从核心模块

kafka源码分析之一server启动分析

1. 分析kafka源码的目的 深入掌握kafka的内部原理 深入掌握scala运用 2. server的启动 如下所示(本来准备用时序图的,但感觉时序图没有思维图更能反映,故采用了思维图): 2.1 启动入口Kafka.scala 从上面的思维导图,可以看到Kafka的启动入口是Kafka.scala的main()函数: def main(args: Array[String]): Unit = { try { val serverProps = getPropsFromArgs(args)

老李推荐:第6章5节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-事件

老李推荐:第6章5节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览-事件 从网络过来的命令字串需要解析翻译出来,有些命令会在翻译好后直接执行然后返回,但有一大部分命令在翻译后需要转换成对应的事件,然后放入到命令队列里面等待执行.Monkey在取出一个事件执行的时候主要是执行其injectEvent方法来注入事件,而注入事件根据是否需要往系统注入事件分为两种: 需要通过系统服务往系统注入事件:如MonkeyKeyEvent事件会通过系统的InputManager往系

老李推荐:第6章3节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-命令翻译类

老李推荐:第6章3节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览-命令翻译类 每个来自网络的字串命令都需要进行解析执行,只是有些是在解析的过程中直接执行了事,而有些是需要在解析后创建相应的事件类实例并添加到命令队列里面排队执行.负责这部分工作的就是命令翻译类.那么我们往下还是继续在MonkeySourceNetwork这个范畴中MonkeyCommand类是怎么一回事: 图6-3-1 MonkeyCommand族谱 图中间的MonkeyCommand是一个接口,

[ASP.NET]分析MVC5源码,并实现一个ASP.MVC

本节内容不是MVC入门教程,主要讲MVC原理,实现一个和ASP.NET MVC类似基本原理的项目. MVC原理是依赖于ASP.NET管道事件基础之上的.对于这块,可阅读上节内容 [ASP.NET]谈谈IIS与ASP.NET管道 本节目录: MVC简介 MVC5源码 实现一个MVC MVC简介 随着技术的发展,现在已经将MVC模式等同于三层模式. 如果要严格区分的话,UI层指View和Controller,BLL,DAL层和模型层都属于Model中. 在建立MVC项目的时候,选择空的项目,会建立一

转 深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇)

深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇) 最近太忙了,一直没时间继续更新博客,今天忙里偷闲继续我的Mybatis学习之旅.在前九篇中,介绍了mybatis的配置以及使用, 那么本篇将走进mybatis的源码,分析mybatis 的执行流程, 好啦,鄙人不喜欢口水话,还是直接上干活吧: 1. SqlSessionFactory 与 SqlSession. 通过前面的章节对于mybatis 的介绍及使用,大家都能体会到SqlSession的重要性了吧, 没错,从表面上来看,