CountDownLatch源码解析

  一、CountDownLatch介绍

      CountDownLatch是在jdk1.5被引入的,它主要是通过一个计数器来实现的,当在初始化该类的构造函数时,会事先传入一个状态值,之后在执行await方法后,

    在这个状态值为0之前,当前线程(指的是调用await的线程)会一直等待。它内部使用了AQS来实现的,且是共享锁,具体怎么实现,待会看看它的实现原理。

    它的应用场景:

      一般在于在执行当前线程之前,要完成n个线程的任务,才能执行当前线程。这种场景适合用countdownLatch。

   二、源码解析

      先来看看该类的构造,如下图

      

      如图,红色框选中的是该类的一个内部类,该内部类实现了抽象类AQS,具体锁的获取和释放是由该内部类实现的。

     由上图知countdownLatch只有一个构造函数,    

1    public CountDownLatch(int count) {
2         if (count < 0) throw new IllegalArgumentException("count < 0");
3         this.sync = new Sync(count);
4     }

    很明显,它有一个参数,这个参数,被用在哪里呢,请看下面

1      Sync(int count) {
2             setState(count);
3         }

     这个参数最终用在了状态值上,由此可猜测,这个状态值决定这锁什么时候释放。

      1、内部类Sync

        

   private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);//设置状态值的大小
        }

        int getCount() {
            return getState();//获取状态值
        }
    //当状态值为0才返回1,否则返回-1,也用来判断线程是否拥有该锁,值大于0,不拥有,小于0,则拥有
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
     //对状态值进行操作,每一次成功,则状态值-1,     //也知道只有状态值为1,然后再执行该方法,才会返回true,否则其它情况全是返回false
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {//这个无限循环是为了保证在进行有其他线程也在操作状态值,导致失败之后就不操作了
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))//对状态值递减,若有其他线程也在操作,则可通过for的无限循环来保证一定能递减成功
                    return nextc == 0;
            }
        }
    }

  该类重写了AQS的tryAcquireShared(int)和tryReleaseShared(int)两个方法, 

  下面来看看这个CountDownLatch类常用的方法

     2、await()方法

 1   public void await() throws InterruptedException {
 2         sync.acquireSharedInterruptibly(1);
 3     }
 4
 5
 6     public final void acquireSharedInterruptibly(int arg)
 7             throws InterruptedException {
 8         if (Thread.interrupted())
 9             throw new InterruptedException();
10         if (tryAcquireShared(arg) < 0)
11             doAcquireSharedInterruptibly(arg);
12     }

  在调用await方法时,再用sync去调用AQS的内部方法acquireSharedInterruptibly(因为sync类没重写该方法),会先判断当前线程是否被中断(中断一般是由外部条件引起的),若中断直接抛出异常,否则,获取通过tryAcquireShared方法来判断当前线程是否拥有该共享锁,当值小于0,则拥有,大于0,则不拥有,继续下一步,若有锁,则再执行doAcquireSharedInterruptibly方法,

  

 1   private void doAcquireSharedInterruptibly(int arg)
 2         throws InterruptedException {
 3         final Node node = addWaiter(Node.SHARED);//对当前线程进行一个包装,同时也初始化了等待队列,即head->node->...->tail
 4         boolean failed = true;
 5         try {
 6             for (;;) {
 7                 final Node p = node.predecessor();//获取该node节点的前一个节点,一般首次调用时,该前一个节点就是head节点。
 8                 if (p == head) {
 9                     int r = tryAcquireShared(arg);//再次获取锁的状态,
10                     if (r >= 0) {//若状态值为0,则进入
11                         setHeadAndPropagate(node, r);
12                         p.next = null; // help GC
13                         failed = false;
14                         return;
15                     }
16                 } //shouldParakAfterFailedAcquire方法主要是针对node节点的状态进行操作,若为signal,则挂起,若为0或PROPAGATE,则转换成signal,为cancelled,则放弃,寻找前一个不是该状态值的节点
17                 if (shouldParkAfterFailedAcquire(p, node) &&
18                     parkAndCheckInterrupt())//挂起线程
19                     throw new InterruptedException();
20             }
21         } finally {
22             if (failed)//若failed为true,一般是出现了异常,或者线程被中断
23                 cancelAcquire(node);
24         }
25     }

    从上述分析来看,只有当状态值为0的时候,才会调用setHeadAndPropagate(node,int)方法,否则会无限等待,当前线程也会被挂起,该方法源码如下

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        setHead(node);//将node节点设置为头结点,对比前面的doAcquireSharedInterruptibly方法,也就是头结点的下一个节点,且该节点的状态为shared
        //对propagate值,头结点和状态,进行判断
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;//获取node节点的下一个节点
         //对node节点的下一个节点进行判断,是否为null,和状态值是否为shared
            if (s == null || s.isShared())
          //该方法作用为了释放当前锁,即线程阻塞
                doReleaseShared();
        }
    }

    上面说的是执行await方法后,发生的一系列操作,也知道了只有当状态值为0,才会使线程通行,下面来看一看怎么使状态值为0的。

  3、countDown方法

    在调用tryReleaseShared方法,每调用一次,state值就会减一,但除了某个时刻当state值减一后恰好为0,才会返回true,否则返回false,为0时刻,也表明锁被其它线程给释放了。

 1 public void countDown() {
 2         sync.releaseShared(1);
 3     }
 4
 5 public final boolean releaseShared(int arg) {
 6      //尝试获取锁的状态
 7         if (tryReleaseShared(arg)) {
 8             doReleaseShared();//此时,状态值已经为0,执行doReleasseShared方法,
 9             return true;
10         }
11         return false;
12     }

    也许会有人有疑问,说,为什么在执行await方法后的一些类操作中,也执行了doReleaseShared方法,这岂不是要释放两次?

   其实不然,主要是怕doAcquireSharedInterruptibly方法执行后,由于某种原因,当前线程为挂起(即阻塞了),不再执行了,这时只有通过releaseShared方法来唤醒线程,下面看看doReleaseShared方法的实现

  

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//若头结点状态为signal,则进入,头结点初始化时的状态值为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//若交换失败,则终止此步操作,继续下一轮循环
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//这是释放锁的关键
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // 若head节点被改变了,则继续循环,否则,跳出循环
                break;
        }
    }

  unparkSuccessor(node)分析如下,该方法作用是为了释放node节点的后一个节点中的线程,在这里,node节点就是head节点

private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;      //下一个节点为null或状态值为cancelled,
        if (s == null || s.waitStatus > 0) {
            s = null;        //由后往前搜索,节点状态值小于或等于0的节点(即状态值不是cancelled值),搜索到的结果一定是最靠近node节点的,且状态值<=0.        //至于为什么不从前往后搜索,原因不太清楚!!!
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//释放
    }

  三、总结 

    countdownLatch在初始化构造函数时,会先将参数设置为状态state值,之后执行await方法后,会进行这一系列的步骤
    1、将shared和当前线程包装成一个node节点,(在第一次调用还会初始化等待队列)在队列中,有这样的队列 head->node,其中node就是被包装成share的节点

    2、之后在doAcquireSharedInterruptibly方法中,执行了shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法, 若顺利,则head节点的状态值会变为signal,并且当前线程会通过执行park方法进行挂起。

      
    3、在方法tryReleaseShared中,会一直操作state值,使之减1,一直到state的值,减为0时,在这之前,当前线程一直会被阻塞。当为0时,会执行doReleaseShared方法 对当前线程执行unparkSuccessor方法,进行放行。



  以上就是我对countdownLatch类的理解,若有不足之处,还望指正!

原文地址:https://www.cnblogs.com/qm-article/p/8320177.html

时间: 2025-01-13 14:27:22

CountDownLatch源码解析的相关文章

死磕 java同步系列之CountDownLatch源码解析

??欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)CountDownLatch是什么? (2)CountDownLatch具有哪些特性? (3)CountDownLatch通常运用在什么场景中? (4)CountDownLatch的初始次数是否可以调整? 简介 CountDownLatch,可以翻译为倒计时器,但是似乎不太准确,它的含义是允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作. Cou

Java并发编程之CountDownLatch源码解析

一.导语 最近在学习并发编程原理,所以准备整理一下自己学到的知识,先写一篇CountDownLatch的源码分析,之后希望可以慢慢写完整个并发编程. 二.什么是CountDownLatch CountDownLatch是java的JUC并发包里的一个工具类,可以理解为一个倒计时器,主要是用来控制多个线程之间的通信.比如有一个主线程A,它要等待其他4个子线程执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了. 三.简单使用 public static void mai

死磕 java同步系列之StampedLock源码解析

问题 (1)StampedLock是什么? (2)StampedLock具有什么特性? (3)StampedLock是否支持可重入? (4)StampedLock与ReentrantReadWriteLock的对比? 简介 StampedLock是java8中新增的类,它是一个更加高效的读写锁的实现,而且它不是基于AQS来实现的,它的内部自成一片逻辑,让我们一起来学习吧. StampedLock具有三种模式:写模式.读模式.乐观读模式. ReentrantReadWriteLock中的读和写都是

死磕 java同步系列之CyclicBarrier源码解析——有图有真相

问题 (1)CyclicBarrier是什么? (2)CyclicBarrier具有什么特性? (3)CyclicBarrier与CountDownLatch的对比? 简介 CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行.它与CountDownLatch很类似,但又不同,CountDownLatch需要调用countDown()方法触发事件,而CyclicBarrier不需要,它就像一个栅栏一样,当一组线程都到达了栅栏处才继续往下走. 使用方法 pu

死磕 java同步系列之Phaser源码解析

问题 (1)Phaser是什么? (2)Phaser具有哪些特性? (3)Phaser相对于CyclicBarrier和CountDownLatch的优势? 简介 Phaser,翻译为阶段,它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务. 这种场景虽然使用CyclicBarrier或者CountryDownLatch也可以实现,但是要复杂的多.首先,具体需要多少个阶段是可能会变的,其次,每个阶

AQS源码解析

文大篇幅引用自HongJie大佬的一行一行源码分析清楚AbstractQueuedSynchronizer,这只是一篇简单的个人整理思路和总结(倒垃圾),如果觉得有些难懂的话,不要犹豫也不要疑惑,很明显是我这篇文章的问题,不是你的问题,这时你最好直接转去看HongJie大佬的原文,那个会好懂很多.还是看不懂的话建议隔一段时间再看,然后像我一样写(复制)一篇总结捋一下思路,加油! AQS 结构 属性 private transient volatile Node head; private tra

ChrisRenke/DrawerArrowDrawable源码解析

转载请注明出处http://blog.csdn.net/crazy__chen/article/details/46334843 源码下载地址http://download.csdn.net/detail/kangaroo835127729/8765757 这次解析的控件DrawerArrowDrawable是一款侧拉抽屉效果的控件,在很多应用上我们都可以看到(例如知乎),控件的github地址为https://github.com/ChrisRenke/DrawerArrowDrawable

五.jQuery源码解析之jQuery.extend(),jQuery.fn.extend()

给jQuery做过扩展或者制作过jQuery插件的人这两个方法东西可能不陌生.jQuery.extend([deep],target,object1,,object2...[objectN]) jQuery.fn.extend([deep],target,object1,,object2...[objectN])这两个属性都是用于合并两个或多个对象的属性到target对象.deep是布尔值,表示是否进行深度合并,默认是false,不执行深度合并.通过这种方式可以在jQuery或jQuery.fn

eclipse中导入jdk源码、SpringMVC注解@RequestParam、SpringMVC文件上传源码解析、ajax上传excel文件

eclipse中导入jdk源码:http://blog.csdn.net/evolly/article/details/18403321, http://www.codingwhy.com/view/799.html. ------------------------------- SpringMVC注解@RequestParam:http://825635381.iteye.com/blog/2196911. --------------------------- SpringMVC文件上传源