Java CyclicBarrier

概述



1.CyclicBarrier介绍

2.CyclicBarrier源码分析

3.CyclicBarrier示例

CyclicBarrier介绍

CyclicBarrier翻译过来也叫栅栏,意思很明显,就是一组线程相互等待,均到达栅栏的时候,再运行。CyclicBarrier是可以重复使用的,而之前的CountDownLatch是一次性的。CyclicBarrier允许一组线程相互等待,直到到达某个公共屏障点,屏障点即一组任务执行完毕的时候。

与CountDownLatch的区别:

1.CyclicBarrier是可重复使用的,CountDownLatch是一次性的。

2.使用场景,CountDownLatch用于不同线程之间的等待,比如主线程需要等待子线程执行完毕。而CyclicBarrier用于一组线程内的相互等待,比如5个线程到底某种状态才执行。

CyclicBarrier的应用场景,举个生活相关的例子。 比如跑步比赛,所有人必须到底起跑点才能开始比赛,比赛开始后,所有人各自奔跑,需要等待所有人到底终点后才能结束比赛。

CyclicBarrier是包含了"ReentrantLock"和"Condition对象trip",它是通过独占锁实现的。

/** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();

CyclicBarrier源码分析

首先使用的时候,我们会初始化一个CyclicBarrier对象

 public CyclicBarrier(int parties, Runnable barrierAction) {  //这里运行添加一个Runnable任务,用于到底屏障点执行
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {  //parties代表在栅栏放开之前,parties个数量的线程必须被调用
        this(parties, null);
    }

这时每个线程调用的时候,会调用await()方法

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L); //第一个参数代表是否有超时时间,第二个参数代表超时时间长
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;   //调用Lock,CyclicBarrier并不能实现同步,所有需要通过lock来保证线程安全
        lock.lock();
        try {
            final Generation g = generation;  //当前的generation,generation的数据结构见下面

            if (g.broken)  //如果已损坏,则抛出异常
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {  //如果线程被中断
                breakBarrier();  //方法说明见下面,因为一旦线程被中断了,就没办法实现屏障点
                throw new InterruptedException();
            }

            int index = --count;  //将计算器parties减1
            if (index == 0) {  // tripped  //如果到底屏障点
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;  //获取要执行的Runnable任务
                    if (command != null)
                        command.run();  //如果不为空,则执行任务
                    ranAction = true;
                    nextGeneration(); //更新generation及重置屏障点,唤醒线程
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();  //重置屏障点,唤醒线程
                }
            }

            // loop until tripped, broken, interrupted, or timed out,未到底屏障点
            for (;;) {
                try {
                    if (!timed)  //如果没有设置超时时间,则使用condition将线程等待,为什么使用condition?  因为所有的await都是等待一个条件,即parties变为0
                        trip.await();
                    else if (nanos > 0L)  //否则设置超时时间
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {  //异常处理
                    if (g == generation && ! g.broken) {  //如果generation没有换代,且没有broken,则调用breakBarrier终止CyclicBarrier
                        breakBarrier();
                        throw ie;
                    } else {  //否则中断当前线程
                        // We‘re about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)  //当前generation 已经损坏
                    throw new BrokenBarrierException();

                if (g != generation)  //generation已经换代,返回index数
                    return index;

                if (timed && nanos <= 0L) { //等待超时
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    private static class Generation {  //同一批线程属于同一个generationboolean broken = false;
    }

CyclicBarrier的特点就是要么大家都执行完毕,要么大家都被异常中断,不会其中一个有中断而其他正常执行完毕的现象存在。有点像原子的概念,当然不是一回事。所以这样的需要有一个状态来描述曾经是否有线程被中断过,这样后面执行的线程就知道是否需要继续等待了。Generation就是为了完成这个事的。而多个线程同时竞争Generation,竞争CyclicBarrier的index,这样就需要通过lock来保证安全。
private void breakBarrier() {
        generation.broken = true;  //将generation职位True,代表generation已经损坏
        count = parties;  //重启count数为parties
        trip.signalAll();  //这里使用的condition的singalAll唤醒所有线程
    }
private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();  //产生一个新的generation
    }

CyclicBarrier示例

场景说明,5个跑步者,等待所有人到齐后开始跑步,有跑的快的,有跑的慢的,这里用sleep来替代。 跑步完成后,一块去喝酒,喝酒必须等待所有人到齐后再开始喝,也是有人喝的快,有人喝的慢。 喝完酒后每个人say good bye。 所有人均sayGoodBye后,然后宣布散场。

这里使用了CyclicBarrier和CountDownLatch,从使用里面可以看出二者的主要区别。 CyclicBarrier主要用于5个人之间相互等待。 而CountDownLatch用于5个人与主线程的等待。

public class CyclicBarrierTest1 {

    private static CyclicBarrier cyclicBarrier=new CyclicBarrier(5);  //定义一个CyclicBarrier,可以重复使用
    private static CountDownLatch countDownLatch=new CountDownLatch(5);  //定义一个CountDownLatch,用于最后每个人say bye后打印,bye bye。

    public static void main(String[] args){

        for(int i=1;i<=5;i++){

            new RunningMan("name"+i,Long.valueOf(i*1000),cyclicBarrier,countDownLatch).start();
        }
        try {
            countDownLatch.await();  //主线程等待最后bye bye
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ByeBye");

    }
}

class RunningMan extends Thread{

    String name;
    long speed;
    CyclicBarrier cyclicBarrier;
    CountDownLatch countDownLatch;
    public RunningMan(String name,long speed,CyclicBarrier cyclicBarrier,CountDownLatch countDownLatch){

        this.name=name;
        this.speed=speed;
        this.cyclicBarrier=cyclicBarrier;
        this.countDownLatch=countDownLatch;
    }
    public void running(){  //奔跑

        System.out.println("Waiting "+name);
        try {
            int index=cyclicBarrier.await(); //所有人开始等待人到期,最后一个人到期后就开始跑了

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Running "+speed);
        try {
            Thread.sleep(speed);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void celebrate(){  //庆祝一下

        System.out.println("Celebrate man comming "+name);
        try {
            int index=cyclicBarrier.await();  //等待人到场后开始庆祝喝酒
//            System.out.println("index is "+index);

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Drinking "+speed);
        try {
            Thread.sleep(speed);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void sayGoodBy(){  //say good bye

        try {
            System.out.println(name+":say good bye");

            int index=cyclicBarrier.await();  //等待每个人say good bye
            countDownLatch.countDown();  //CountDownLatch减1

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }

    }

    public void run(){

        running();
        celebrate();
        sayGoodBy();
    }
}

输出结果为:

Waiting name1
Waiting name3
Waiting name4
Waiting name2
Waiting name5
Running 5000
Running 1000
Running 3000
Running 4000
Running 2000
Celebrate man comming name1
Celebrate man comming name2
Celebrate man comming name3
Celebrate man comming name4
Celebrate man comming name5
Drinking 5000
Drinking 2000
Drinking 4000
Drinking 1000
Drinking 3000
name1:say good bye
name2:say good bye
name3:say good bye
name4:say good bye
name5:say good bye
ByeBye

时间: 2024-11-06 11:22:05

Java CyclicBarrier的相关文章

java CyclicBarrier的介绍和使用

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier. 举例说明:银行要进行账目录入,以防一个录入出现录入错误,采用两人对同一账目同时录入,以达到校对的效果 伪代码如下: import java.util.concurrent.BrokenBarrier

java CyclicBarrier同步屏障

CyclicBarrier的字面意思是可循环使用的屏障,它的主要作用是,让一组线程到达一个屏障时被阻塞,知道最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续运行. 1.简介: CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其中参数标识屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞. package com.test; import java.util.concur

java CyclicBarrier的使用

api对CyclicBarrier的描述: 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier.  也就是说他可以使一组线程先等待 然后达到某个条件之后再一起执行,有点map/reduce的感觉. 举个例子: 目前有个int,  分配3个任务线程对他加

Java CyclicBarrier 浅谈

CyclicBarrier 意思是栅栏, 可以让多组线程到达某个点后开始等待, 等指定的线程数量都到达后再恢复线程, 这个CyclicBarrier是可以循环使用的. 又称为循环栅栏. 栗子: public class BarrierDemo { public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(5); final CyclicBarrier barrie

Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例

概要 本章介绍JUC包中的CyclicBarrier锁.内容包括:CyclicBarrier简介CyclicBarrier数据结构CyclicBarrier源码分析(基于JDK1.7.0_40)CyclicBarrier示例 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3533995.html CyclicBarrier简介 CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier p

java路障CyclicBarrier

当所有线程都执行到某行代码,才可已往下执行: package threadLock; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; pub

深入浅出 Java Concurrency (11): 锁机制 part 6 CyclicBarrier[转]

如果说CountDownLatch是一次性的,那么CyclicBarrier正好可以循环使用.它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).所谓屏障点就是一组任务执行完毕的时刻. 清单1 一个使用CyclicBarrier的例子 package xylz.study.concurrency.lock; import java.util.concurrent.BrokenBarrierException;import java.util.concur

Java学习笔记--并发工具Semaphore,CountDownLatch,CyclicBarrier,Exchanger

Semaphore 实现典型的信号量 CountDownLatch 在指定数量的事件发生前一直等待 CyclicBarrier 使一组线程在一个预定义的执行点等待 Exchanger 交换两个线程的数据 1. Semaphore 信号量(Semaphore),是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源 在java中,还可以设置该信号量是否采用公平模式,如果以公平方式执行,则线程将会按到达的顺序(FIFO)执行,如果是非公平,则可以后请求的有可能

java中CyclicBarrier简单入门使用

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier. CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次.若在继续所有参与线程之前更新共享状态,此