Java多线程同步工具类之CyclicBarrier

一、CyclicBarrier使用

CyclicBarrier从字面上可以直接理解为线程运行的屏障,它可以让一组线程执行到一个共同的屏障点时被阻塞,直到最后一个线程执行到指定位置,你设置的执行线程就会触发运行;同时CyclicBarrier相比与CountDownLatch,它是可以被重置的;下面我们通过一个简单例子看下CyclicBarrier的使用;

实例化一个CyclicBarrier对象并传入你要控制的线程内部;

    public static void main(String[] args) {

        CyclicBarrier cb = new CyclicBarrier(3, new Runnable() {
            public void run() {
                System.out.println("所有线程集合");
            }
        });
        for (int i = 0; i < 3; i++) {
            new CyclicBarrierThread(i + "", cb).start();
        }

    }

计数线程代码,每当计数到偶数时调用CyclicBarrier的await()方法

public class CyclicBarrierThread extends Thread{

    private CyclicBarrier barrier;

    private String name;

    private int count;

    public CyclicBarrierThread(String name,CyclicBarrier barrier) {
        this.name=name;
        this.barrier=barrier;
        this.count=0;
    }

    public void run() {
        try {
            for(int i=0;i<10;i++) {

                Thread.sleep(100);
                count++;
                System.out.println(name+"号线程---"+Thread.currentThread().getName()+"开始计数:"+count);
                if(count%2==0) {//每计数到偶数次时集合一次
                    barrier.await();
                    System.out.println(name+"号线程---"+Thread.currentThread().getName()+"集合完毕,继续计数");
                }
            }

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

查看代码输出

2号线程---Thread-2开始计数:1
0号线程---Thread-0开始计数:1
1号线程---Thread-1开始计数:1
2号线程---Thread-2开始计数:2
1号线程---Thread-1开始计数:2
0号线程---Thread-0开始计数:2
所有线程集合
2号线程---Thread-2集合完毕,继续计数
1号线程---Thread-1集合完毕,继续计数
0号线程---Thread-0集合完毕,继续计数
2号线程---Thread-2开始计数:3
1号线程---Thread-1开始计数:3
0号线程---Thread-0开始计数:3
2号线程---Thread-2开始计数:4
0号线程---Thread-0开始计数:4
1号线程---Thread-1开始计数:4
所有线程集合
1号线程---Thread-1集合完毕,继续计数
2号线程---Thread-2集合完毕,继续计数
0号线程---Thread-0集合完毕,继续计数
0号线程---Thread-0开始计数:5
2号线程---Thread-2开始计数:5
1号线程---Thread-1开始计数:5
0号线程---Thread-0开始计数:6
1号线程---Thread-1开始计数:6
2号线程---Thread-2开始计数:6
所有线程集合
2号线程---Thread-2集合完毕,继续计数
0号线程---Thread-0集合完毕,继续计数
1号线程---Thread-1集合完毕,继续计数
0号线程---Thread-0开始计数:7
1号线程---Thread-1开始计数:7
2号线程---Thread-2开始计数:7
1号线程---Thread-1开始计数:8
0号线程---Thread-0开始计数:8
2号线程---Thread-2开始计数:8
所有线程集合
2号线程---Thread-2集合完毕,继续计数
0号线程---Thread-0集合完毕,继续计数
1号线程---Thread-1集合完毕,继续计数
0号线程---Thread-0开始计数:9
1号线程---Thread-1开始计数:9
2号线程---Thread-2开始计数:9
1号线程---Thread-1开始计数:10
0号线程---Thread-0开始计数:10
2号线程---Thread-2开始计数:10
所有线程集合
1号线程---Thread-1集合完毕,继续计数
2号线程---Thread-2集合完毕,继续计数
0号线程---Thread-0集合完毕,继续计数

通过输出结果可以看到,计数线程每计数到偶数次时使用CyclicBarrier的await()方法,线程都会进入阻塞等待的状态,直到最后一个线程到达屏障点时,触发你定义的执行线程,而且CyclicBarrier的await()方法是可以重复使用的。

二、CyclicBarrier源码分析

下面我们就对CyclicBarrier内部的源码实现进行一些分析与总结

1、CyclicBarrier的构造

首先看下CyclicBarrier的构造函数

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        //拦截的线程数量
        this.parties = parties;
        //用于计数的count值,每有一个线程执行到屏障点,就会递减1
        this.count = parties;
        //定义的拦截线程
        this.barrierCommand = barrierAction;
    }

CyclicBarrier的构造函数很简单就是接收你要拦截的线程数量与定义的执行线程。

2、await方法

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

我们看下具体实现dowait方法的实现

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        //获取可重入锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //CyclicBarrier内部定义的一个Generation类
            final Generation g = generation;

            //判断Generation的broken状态
            if (g.broken)
                throw new BrokenBarrierException();

            //如果线程被中断
            if (Thread.interrupted()) {
                //Generation的broken置为true,count值重置,并唤醒所有线程
                breakBarrier();
                throw new InterruptedException();
            }

            //count值减一
            int index = --count;
            if (index == 0) {  // 如果conunt为0,说明最后一个线程到大屏障
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();//执行你传入的线程
                    ranAction = true;
                    nextGeneration();//唤醒所有阻塞的线程,同时重置count值与Generation
                    return 0;
                } finally {
                    if (!ranAction)
                        //拦截线程没有正常执行,唤醒所有线程,同时重置count值,Generation的broken置为true
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    //是否设置阻塞的超时时间
                    if (!timed)
                        //释放当前锁
                        trip.await();//false 表示不设置,一直阻塞
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//true 设置阻塞的超时时间
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We‘re about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //释放锁
            lock.unlock();
        }
    }

dowait方法的实现流程是很清晰的,通过ReentrantLock的Condition接口与count值相互配合,主要完成以下功能:

1、当需要拦截的线程到达屏障点调用await方法后获取ReentrantLock锁,保证线程安全;

2、检查count值是否为0,判断是否是最后一个线程到达屏障,如果是的话执行需要触发执行的线程,调用Condition的signalAll方法唤醒所有阻塞的线程,并重置count值与Generation类,保障CyclicBarrier的重复可用;

3、如果不是最后一个线程的话,根据传入的参数调用Condition的await方法释放锁资源并进入阻塞等待,直到被唤醒;

3、reset方法

可以用来主动重置CyclicBarrier的状态

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //generation.broken设置为true,唤醒所有线程,count值重置
            breakBarrier();
            nextGeneration();
        } finally {
            lock.unlock();
        }
    }

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

breakBarrier()与nextGeneration(),这两个方法的主要区别就在于前者会把generation.broken设置为true,也就是说如果调用reset方法主动重置CyclicBarrier类的状态,当前正在使用CyclicBarrier类同步的线程都会被唤醒或抛出异常;

4、getNumberWaiting方法

    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

很明显getNumberWaiting方法使用来获取当前已经运行至屏蔽点并阻塞等待的线程数量的;

三、总结

通过上面分析可以看到CyclicBarrier的实现原理相对还是比较简单与清晰的,主要是基于ReentrantLock与计数器相结合来实现多个线程的同步控制的。以上就是对CyclicBarrier类的使用与内部实现进行的分析,其中如有不足与不正确的地方还望指出与海涵。

关注微信公众号,查看更多技术文章。

原文地址:https://www.cnblogs.com/dafanjoy/p/11110575.html

时间: 2024-08-24 06:15:58

Java多线程同步工具类之CyclicBarrier的相关文章

同步工具类 CountDownLatch 和 CyclicBarrier

在开发中,一些异步操作会明显加快执行速度带来更好的体验,但同时也增加了开发的复杂度,想了用好多线程,就必须从这些方面去了解 线程的 wait() notify() notifyall() 方法 线程异步返回 Future ThreadLocal 类 线程池 ThreadPoolExecutor 同步工具类 CountDownLatch,CyclicBarrier,Semaphore,Phaser,Exchanger 估计上面每一个对于 2~3 年的 java 同学来说都是恶梦,比较难以理解,本文

JAVA 并发编程-线程同步工具类(十二)

本文主要介绍一些java线程同步工具类,并不进行具体讲解,当有需要时,可以再去结合实例学习. 信号灯(Semaphore) 应用场景举例: 例如公司的打卡系统,如果有一个打卡机,那么一次就只能有一个人打卡,其余的人就被阻塞住,打卡完以后就可由下一个人打卡.如果有3个打卡机,那么一次就允许3个人或者少于三个人打卡,其余的人就得等待打卡机空闲下来才能继续打卡. 结果: 已进入1个线程,还可进入2个 已进入2个线程,还可进入1个 已进入3个线程,还可进入0个 空余出1个 已进入4个线程,还可进入0个

《java并发编程实战》读书笔记4--基础构建模块,java中的同步容器类&amp;并发容器类&amp;同步工具类,消费者模式

上一章说道委托是创建线程安全类的一个最有效策略,只需让现有的线程安全的类管理所有的状态即可.那么这章便说的是怎么利用java平台类库的并发基础构建模块呢? 5.1 同步容器类 包括Vector和Hashtable,此外还包括在JDK1.2中添加的一些功能相似的类,这些同步的封装器类由Collections.synchronizedXxx等工厂方法创建的.这些类实现线程安全的方式是:将他们的状态封装起来,并对每个共有方法都进行同步,使得每次只能有一个线程能访问容器的状态. 关于java中的Vect

java多线程--同步屏障CyclicBarrier的使用

CyclicBarrier的概念理解: CyclicBarrier的字面上的意思是可循环的屏障,是java并发包java.util.concurrent 里的一个同步工具类,在我下载的JDK1.6的中文文档里对他的解释是: 大体意思就是:让一组线程到达一个屏障,一个集合点时,被阻塞,直到所有的线程都到了这个集合点时,屏障才会打开,然后线程才能继续往下执行.举个简单的例子就是:旅游团带着一帮人参观景点,规定在下一个景点A处集合,于是导游就在景点A等着大家,导游就是这个集合点或者说屏障,直到所有的游

Java 并发编程(四)常用同步工具类

同步工具类可以使任何一种对象,只要该对象可以根据自身的状态来协调控制线程的控制流.阻塞队列可以作为同步工具类,其他类型的同步工具类还包括:信号量(Semaphore).栅栏(Barrier)以及闭锁(Latch). 闭锁 首先我们来介绍闭锁. 闭锁作用相当于一扇门:在闭锁到达某一状态之前,这扇门一直是关闭的,所有的线程都会在这扇门前等待(阻塞).只有门打开后,所有的线程才会同时继续运行. 闭锁可以用来确保某些活动直到其它活动都完成后才继续执行,例如: 1.确保某个计算在其所有资源都被初始化之后才

线程:CyclicBarrier同步工具类

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点.比如公司组织活动出去玩,需要在公司门口一起搭车去.每个人从家里出发去公司门口,到达的时间肯定先后不一样,所以公司的车要一直等待,等所有人到齐后,才开车出发.CyclicBarrier就类似这样的功能,当所有线程到达"屏蔽点"的时候,才往下走. 具体等待多少根线程到达,可以在构造方法里指定CyclicBarrier(int parties). 当你的parties设为3的时候,假设只有2根线程到达此处,那程序会一直在此等待.

Java并发(基础知识)——显示锁和同步工具类

显示锁                                                                                     Lock接口是Java 5.0新增的接口,该接口的定义如下: public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long

【JAVA并发】同步工具类

同步工具类主要包括闭锁(如CountDownLatch),栅栏(如CyclicBarrier),信号量(如Semaphore)和阻塞队列(如LinkedBlockingQueue)等: 使用同步工具类可以协调线程的控制流: 同步工具类封装了一些状态,这些状态决定线程是继续执行还是等待,此外同步工具类还提供了修改状态的方法: 下面将简单介绍以上同步工具类: 闭锁 可以让一个线程等待一组事件发生后(不一定要线程结束)继续执行: 以CountDownLatch为例,内部包含一个计数器,一开始初始化为一

Java并发:线程间同步-条件队列和同步工具类

转载请注明出处: jiq?钦's technical Blog - 季义钦 线程之间的同步,除了互斥(前面介绍的互斥锁)之外,还存在协作关系,下面我们就介绍一下java线程间常见的一些协作方式. 一.内置条件队列 正如每个Java对象都可以作为一个内置锁,每个对象也可以作为一个条件队列,称为内置条件队列,Object.wait().notify()/notifyAll()构成了内置条件队列的API. 需要注意的是,调用任何对象X的内置条件队列的API都必须要先获得该对象X的内置锁. 1.API介