并发工具类(二)同步屏障CyclicBarrier

前言

??JDK中为了处理线程之间的同步问题,除了提供锁机制之外,还提供了几个非常有用的并发工具类:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;

??CountDownLatch、CyclicBarrier、Semphore、Phaser 这四个工具类提供一种并发流程的控制手段;而Exchanger工具类则提供了在线程之间交换数据的一种手段。

简介

??CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

构造方法摘要

方法名称 说明
CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

方法摘要

方法名称 说明
public int await() throws
InterruptedException, BrokenBarrierException
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
返回:到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,零指示最后一个到达的线程.
public int await(long timeout,TimeUnit unit) throws
InterruptedException,BrokenBarrierException,
ITimeoutException
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
public void reset() 将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。注意,在由于其他原因造成损坏之后,实行重置可能会变得很复杂;
public boolean isBroken() 查询此屏障是否处于损坏状态。
public int getNumberWaiting() 返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。
public int getParties() 返回要求启动此 barrier 的参与者数目。

注意:

  • 对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
  • 内存一致性效果:线程中调用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。

@ Example1 屏障操作的例子
public static void main(String[] args) {
    //设置5个屏障,并且有屏障操作
    CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() {
        @Override
        public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+"执行了屏障操作");
        }
    });

    for(int i=0;i<5;i++){
       //创建5个线程
        Thread thread = new Thread(new MyRunable(barrier),"thread_"+i);
        thread.start();
    }
}
class MyRunable implements Runnable{

    CyclicBarrier barrier;
    public MyRunable(CyclicBarrier barrier ){
         this.barrier = barrier;
    }

    @Override
    public void run() {
        //一系列操作...
         System.out.println("线程 "+Thread.currentThread().getName()+" 到达了屏障点!");
         try {
             int index = barrier.await();
            if(index== (barrier.getParties()-1)){
                //第一个到达屏障点的线程,执行特殊操作....
                System.out.println("所有线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是第一个到达屏障点");
            }else if(index == 0){//最后一个到达屏障点的线程
                System.out.println("所有线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是最后一个到达屏障点");
            }else{
                System.out.println("所有线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!");
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

线程 thread_1 到达了屏障点!

线程 thread_4 到达了屏障点!

线程 thread_3 到达了屏障点!

线程 thread_0 到达了屏障点!

线程 thread_2 到达了屏障点!

线程thread_3执行了屏障操作

所有线程到达屏障点,线程 thread_3 被唤醒!!此线程是最后一个到达屏障点

所有线程到达屏障点,线程 thread_0 被唤醒!!

所有线程到达屏障点,线程 thread_4 被唤醒!!

所有线程到达屏障点,线程 thread_1 被唤醒!!此线程是第一个到达屏障点

所有线程到达屏障点,线程 thread_2 被唤醒!!

??上面的例子,使用了传入屏障操作的Runable参数的构造方法,屏障操作是由最后一个到达屏障点的线程执行的,这是不可以改变的。然而,在实际使用中,可能会出现由第n个到达屏障点的线程执行特殊的操作(或者说 屏障操作),那么就可以使用 CyclicBarrier.await()进行判断,如上面的例子,第一个和最后一个到达屏障点的线程都执行特殊的操作。

?? 顺便说一下,可能会对本例子中前5个输出的顺序 有所疑惑:thread_3 通过awiat()方法返回的索引值,可知 thread_3 是最后一个到达屏障点的,但为什么输出的顺序却是第三个,而不是最后一个;在这就要真正理解CyclicBarrier,CyclicBarrier 本质上是一把锁,多个线程在使用CyclicBarrier 对象时,是需要先获取锁,即需要互斥访问,所以调用await( )方法不一定能够马上获取锁。上面的例子,是先打印输出,再去获取锁,所以输出顺序不是到达屏障点的顺序。


@ Example2 应用场景

?? 下面的例子是:CyclicBarrier用于多线程计算数据,最后合并计算结果的场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

public class BankWaterService implements Runnable {

    //创建4个屏障,处理完后执行当前类的run方法
    private CyclicBarrier barrier = new CyclicBarrier(4,this);

    //假设只有4个sheet,所以只启动4个线程
    private Executor excutor = Executors.newFixedThreadPool(4);

    //保存每个sheet计算出的结果
    private ConcurrentHashMap< String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    private void count(){
        for(int i=0;i<4;i++){
            excutor.execute(new Runnable() {

                @Override
                public void run() {
                    //计算过程.....
                    //存储计算结果
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    try {
                        //计算完成,插入屏障
                        barrier.await();
                        //后续操作,将会使用到四个线程的运行结果....
                        System.out.println("线程"+Thread.currentThread().getName()+"运行结束,最终的计算结果:"+sheetBankWaterCount.get("result"));
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    @Override
    public void run() {
        int result = 0;
        for(Entry<String, Integer> item : sheetBankWaterCount.entrySet()){
            result += item.getValue();
        }
        sheetBankWaterCount.put("result", result);
    }

    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();
    }
}

运行结果:

线程pool-1-thread-4运行结束,最终的计算结果:4

线程pool-1-thread-2运行结束,最终的计算结果:4

线程pool-1-thread-1运行结束,最终的计算结果:4

线程pool-1-thread-3运行结束,最终的计算结果:4


CyclicBarrier和CountDownLatch的区别

  • CountDownLatch: 一个线程(或者多个线程), 等待另外N个线程完成某个事情之后才能执行。而这N个线程通过调用CountDownLatch.countDown()方法 来告知“某件事件”完成,即计数减一。而一个线程(或者多个线程)则通过CountDownLatch.awiat( ) 进入等待状态,直到 CountDownLatch的计数为0时,才会全部被唤醒
  • CyclicBarrier : N个线程相互等待,任何一个线程完成某个事情之前,所有的线程都必须等待。

    CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.

    而CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.

  • CountDownLatch只能使用一次,CyclicBarrier则可以通过reset( )方法重置后,重新使用。所以CyclicBarrier可以用于更复杂的业务场景。例如:计算错误,可以重置计数器,并让线程重新执行一次。


文献:

原文地址:https://www.cnblogs.com/jinggod/p/8494193.html

时间: 2024-10-11 16:24:15

并发工具类(二)同步屏障CyclicBarrier的相关文章

Java并发工具类之同步屏障CyclicBarrier

CyclicBarrier的字面意思是可以循环使用的Barrier,它要做的事情是让一个线程到达一个Barrier的时候被阻塞,直到最后一个线程到达Barrier,屏障才会放开,所有被Barrier拦截的线程才会继续运行. CyclicBarrier的默认的构造器是CyclicBarrier(int parties),参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞.示例代码如下: public class CyclicBa

并发工具类:CountDownLatch、CyclicBarrier、Semaphore

在多线程的场景下,有些并发流程需要人为来控制,在JDK的并发包里提供了几个并发工具类:CountDownLatch.CyclicBarrier.Semaphore. 一.CountDownLatch 1 import java.util.concurrent.CountDownLatch; 2 3 4 public class CountDownLatchTest 5 { //设置N为2 6 static CountDownLatch c = new CountDownLatch(2); 7 p

Java并发工具类CyclicBarrier

CyclicBarrier同步屏障 java并发工具类中有一个叫做CyclicBarrier的类,与CountDownLatch类似,都可以实现线程间的同步,但是差别是CyclicBarrier是可重置的同步屏障. 想象一个场景,有N个人不同时间走到一扇门,因为门需要N个人合力才能推开,所以人不足N个时,只能阻塞在此,等到N个人都到了之后,可以推开门,继续进行之前的工作.CyclicBarrier就是这扇门. 看看下面的代码,定义了一个线程数为2的,CyclicBarrier,并在主线程和另外一

JDK并发工具类

在JDK的并发包里提供了几个非常有用的并发工具类.CountDownLatch.CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段. 1等待多线程完成的CountDownLatch CountDownLatch:CountDownLatch允许一个或多个线程等待其他线程完成操作.与thread.join方法类似但功能更多.该计数器只能使用一次 CountDownLatch的构造函数接收一个int类型的参数作

Java学习笔记—多线程(并发工具类,java.util.concurrent.atomic包)

在JDK的并发包里提供了几个非常有用的并发工具类.CountDownLatch.CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段.本章会配合一些应用场景来介绍如何使用这些工具类. CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作.假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,

并发工具类(四)线程间的交换数据 Exchanger

前言 ??JDK中为了处理线程之间的同步问题,除了提供锁机制之外,还提供了几个非常有用的并发工具类:CountDownLatch.CyclicBarrier.Semphore.Exchanger.Phaser: ??CountDownLatch.CyclicBarrier.Semphore.Phaser 这四个工具类提供一种并发流程的控制手段:而Exchanger工具类则提供了在线程之间交换数据的一种手段. 简介 ?? Exchanger的功能是使2个线程之间交换数据(有不少文章的说法是"传输数

25.大白话说java并发工具类-CountDownLatch,CyclicBarrier,Semaphore,Exchanger

1. 倒计时器CountDownLatch 在多线程协作完成业务功能时,有时候需要等待其他多个线程完成任务之后,主线程才能继续往下执行业务功能,在这种的业务场景下,通常可以使用Thread类的join方法,让主线程等待被join的线程执行完之后,主线程才能继续往下执行.当然,使用线程间消息通信机制也可以完成.其实,java并发工具类中为我们提供了类似"倒计时"这样的工具类,可以十分方便的完成所说的这种业务场景. 为了能够理解CountDownLatch,举一个很通俗的例子,运动员进行跑

第八章 Java中的并发工具类

等待多线程完成的CountDownLatch countDownLatch允许一个或多个线程等待其他线程完成操作. public class CountDownLatchTest { static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException{ new Thread(new Runnable() {

四、线程的并发工具类

线程的并发工具类 一.CountDownLatch [1]CountDownLatch是什么? CountDownLatch,英文翻译为倒计时锁存器,是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. 闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行: 确保某个计算在其需要的所有资源都被初始化之后才继续执行; 确保某个服务在其依赖的所有其他服务都已经启动之后才启动; 等待直到某个操作所有参与者都准备就绪再继续执行