JDK 源码解析 —— CyclicBarrier

一. 简介

CyclicBarrier 是一个让一系列线程集合互相等待直到一个公共屏障点(barrier point)的同步辅助工具。这个屏障被称为循环屏障,是因为它可以在等待线程释放后被重用。

CyclicBarrier 支持一个可选的 Runnable 命令,在最后一个线程到达后执行一次 Runnable 命令。

二. 简单使用示例

CyclicBarrier(3)
等到 3 个线程都到了,这个对象还可以重用,而 CountDownLatch 则不能重用,从 Cyclic 名字就可以看出这个类对象可以循环使用

public class CyclicBarrierTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final  CyclicBarrier cb = new CyclicBarrier(3);//创建CyclicBarrier对象并设置3个公共屏障点
        for(int i=0;i<3;i++){
            Runnable runnable = new Runnable(){
                    public void run(){
                    try {
                        Thread.sleep((long)(Math.random()*10000));
                        System.out.println("线程" + Thread.currentThread().getName() +
                                "即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");
                        cb.await();//到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行

                        Thread.sleep((long)(Math.random()*10000));
                        System.out.println("线程" + Thread.currentThread().getName() +
                                "即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");
                        cb.await();
                        Thread.sleep((long)(Math.random()*10000));
                        System.out.println("线程" + Thread.currentThread().getName() +
                                "即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");
                        cb.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}

三. CyclicBarrier 作用图示

让所有线程都运行到同一个点(屏障点)后,再继续运行

四. 代码解析

  1. 重要变量
// 每次对栅栏的使用可以表现为一个 generation 实例。当条件 trip 改变或者重置 generation 也会
    // 随之改变。可以有多个 generation 和使用栅栏的线程关联,但是只有一个可以获得锁。
    private static class Generation {
        boolean broken = false;
    }

    /** 守护栅栏入口的锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 等待条件,直到所有线程到达栅栏 */
    private final Condition trip = lock.newCondition();
    /** 要屏障的线程数 */
    private final int parties;
    /* 当线程都到达栅栏,运行的 Runnable */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    //还要等待多少个线程到达。线程到达屏障点就减去 1。
    //每次新建 generation 的时候或者屏障 broken,count重新设置为 parties 参数值
    private int count;

  1. await() 方法:等待到所有参与的线程都到达屏障点。如果当前线程不是最后一个到达的,当前线程停止运行,进入睡眠,直到以下几种情况发生
  • 最后的线程到达
  • 其他线程中断当前线程
  • 其他线程中断中断等待线程中的一条
  • 在等待所有线程到达屏障前有线程超时
  • 其他线程在此屏障中调用 reset(将屏障设置为初始状态)

如果当前线程:

  • 设置了中断状态
  • 在等待时中断

那么,就会抛出 InterruptedException,并且当前线程中断状态被清除。

如果在任何线程等待过程中屏障被重置(即调用 reset() 方法),那么所有的线程都会抛出 BrokenBarrierException,并且这个屏障置于 broken 状态。

如果当前线程是最后一个到达屏障的线程,并且屏障的构造器传入了 Runnable 参数,那么在其他线程执行前,先执行 Runnable。如果在屏障运行中发生了异常,那么异常会在当前线程中被传播,屏障将被置于 broken 状态。

返回值:返回当前线程到达的下标

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();// 加了锁,以下操作为线程安全操作
    try {
        final Generation g = generation;

        if (g.broken)  // 如果屏障状态 broken,则抛出屏障 broken 异常
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       int index = --count;
       if (index == 0) {  // tripped 说明是最后一个到达的线程
           boolean ranAction = false;
           try {
               final Runnable command = barrierCommand;
               if (command != null) // 如果有 Runnable,先执行
                   command.run();
               ranAction = true;
               nextGeneration();// 唤醒 Condition 队列的所有线程,既然是 Cyclic 的,所以也会重置状态以便重用屏障,这是和 CountDownLatch 的区别
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {// 如果不是最后一个到达的线程,就进入循环等待
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

五. 总结

CyclicBarrier 是利用了 Condition 接口,定义了一个叫做 trip 的 Condition,当所有线程到达后线程才能从 Condition 队列中移到 AQS 的等待队列继续运行。关于 Condition,可以参考博主的另一篇博文:http://blog.csdn.net/wenniuwuren/article/details/51447767

六. 参考资料

JDK 7 源码

时间: 2024-08-06 01:50:17

JDK 源码解析 —— CyclicBarrier的相关文章

设计模式-简单工厂Coding+jdk源码解析

前面的软件设计七大原则,目前只有理论这块,因为最近参与项目重构,暂时没有时间把Coding的代码按照设计思路一点点写出来.本周周末会花时间整理出来,然后现在想的是白天更新一点并发编程,晚上回家学习设计模式.非科班出身,脑子也比较笨.博文都是自己根据学习的时候所想的思路,如果能有帮到各位的地方,那十分荣幸.如果有欠缺之处,希望能在评论中指出一起进步.好啦,开始正文了. 本套设计模式的博文,包含各种设计模式的定义.类型.适用场景及优缺点分析.并通过Coding去实际加深理论理解. 简单工厂: 该模式

JDK 源码解析 —— ConcurrentHashMap

零. 概述 ConcurrentHashMap 是将锁的范围细化来实现高效并发的. 基本策略是将数据结构分为一个一个 Segment(每一个都是一个并发可读的 hash table, 即分段锁)作为一个并发单元. 为了减少开销, 除了一处 Segment 是在构造器初始化的, 其他都延迟初始化(详见 ensureSegment). 并使用 volatile 关键字来保证 Segment 延迟初始化的可见性问题. HashMap 不是线程安全的, 故多线程情况下会出现 infinit loop.

JDK 源码解析 —— HashSet

零. 简介 这个类实现了 Set 接口,内部是由 HashMap 实例辅助实现的.它不保证元素的顺序,数据允许为 null. 假如 hash 方法将数据分散地比较合理,比如一个 bucket 一个数据,那么 add.remove.contains.size 性能开销是常数时间. 这个类非线程安全的,如果多线程并发访问,并且至少一个线程在做修改操作,那么必须在外部做好同步处理.例如使用:Set s = Collections.synchronizedSet(new HashSet(...)); 一

JDK 源码解析 —— Executors ExecutorService ThreadPoolExecutor 线程池

零. 简介 Executors 是 Executor.ExecutorService.ThreadFactory.Callable 类的工厂和工具方法. 一. 源码解析 创建一个固定大小的线程池:通过重用共享无界队列里的线程来减少线程创建的开销.当所有的线程都在执行任务,新增的任务将会在队列中等待,直到一个线程空闲.由于在执行前失败导致的线程中断,如果需要继续执行接下去的任务,新的线程会取代它执行.线程池中的线程会一直存在,除非明确地 shutdown 掉. public static Exec

JDK 源码解析 —— Integer

零. 简介 对于 Integer 这个 Java 程序员几乎天天使用的类, 使用上却可以看出普通程序员和优秀程序员区别. 一. 深入代码 在创建数字 1 的对象时, 大多数人会使用 new Integer(1), 而使用 Integer.valueOf(1) 可以使用系统缓存,既减少可能的内存占用,也省去了频繁创建对象的开销. 系统默认只缓存 -128-127 之间的整数.下面我们看一下 Integer.valueOf(int) 方法的代码: public static Integer valu

JDK 源码解析 —— Semaphore

零. 简介 这是一个用来对并发计数的信号量,并发量超过一定数值则只能等待.从概念上来说,semaphore 维持着一组许可证.获取锁的时候,需要先获得 semaphore 的许可才行. 一. 从 Demo 解析源码 package com.wenniuwuren.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concu

【JDK】JDK源码分析-CyclicBarrier

概述 CyclicBarrier 是并发包中的一个工具类,它的典型应用场景为:几个线程执行完任务后,执行另一个线程(回调函数,可选),然后继续下一轮,如此往复. 打个通俗的比方,可以把 CyclicBarrier 的执行流程比作:几个人(类比线程)围着操场跑圈,所有人都到达终点后(终点可理解为“屏障(barrier)”,到达次序可能有先后,对应线程执行任务有快慢),执行某个操作(回调函数),然后再继续跑下一圈(下一次循环),如此往复. 该类与 CountDownLatch 相比,可以把后者理解为

Integer.parseInt不同jdk源码解析

执行以下代码: System.out.println(Integer.parseInt("-123")); System.out.println(Integer.parseInt("+123")); 以下仅提供1.6和1.7两个版本的比较  1.6版本执行结果为:    1.7版本执行结果为: 从两方面去查证结果的原因,分别是:查看API文档 和 查看对应的源代码 [查看API文档]  1.6版本对应的API文档:    1.7版本对应的API文档: 可以看出,对第

【jdk源码分析】ArrayList的size()==0和isEmpty()

先看结果 分析源码 [jdk源码解析]jdk8的ArrayList初始化长度为0 java的基本数据类型默认值 无参构造 size()方法 isEmpty()方法 原文地址:https://www.cnblogs.com/xiaostudy/p/10781148.html