concurrent(四)同步屏障 CyclicBarrier & 源码分析

参考文档:
Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例:https://www.cnblogs.com/skywang12345/p/3533995.html
简介
CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier。基于ReentrantLock实现
举个栗子

/**
 * 简单模拟一下对战平台中玩家需要完全准备好了,才能进入游戏的场景。
 *
 * @author BFD_526
 *
 */
public class CyclicBarrierTest {

    public static void main(String[] args) {
        test();
    }
    // 同步屏障
    static void test() {
        ExecutorService service = Executors.newFixedThreadPool(5);
        CyclicBarrier barrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            service.execute(new Player("玩家" + i, barrier));
        }
        service.shutdown();
    }
    // 同步屏障重置
    static void test1() {
        ExecutorService service = Executors.newFixedThreadPool(5);
        CyclicBarrier barrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            service.execute(new Player("玩家" + i, barrier));
        }
        for (int i = 5; i < 10; i++) {
            service.execute(new Player("玩家" + i, barrier));
        }
        service.shutdown();
    }
    // 在同步屏障结束后,启动优先线程
    static void test2() {
        ExecutorService service = Executors.newFixedThreadPool(5);
        CyclicBarrier ba = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有玩家已就位");
            }
        });
        for (int i = 0; i < 5; i++) {
            service.execute(new Player("玩家" + i, ba));
        }
    }
}

class Player implements Runnable {
    private final String name;
    private final CyclicBarrier barrier;

    public Player(String name, CyclicBarrier barrier) {
        this.name = name;
        this.barrier = barrier;
    }

    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3)));
            System.out.println(name + "已准备,等待其他玩家准备...");
            barrier.await();
            TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3)));
            System.out.println(name + "已加入游戏");
        } catch (InterruptedException e) {
            System.out.println(name + "离开游戏");
        } catch (BrokenBarrierException e) {
            System.out.println(name + "离开游戏");
        }
    }
}

源码分析

函数列表

CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作
CyclicBarrier(int parties, Runnable barrierAction):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行
int await():在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待
int await(long timeout, TimeUnit unit):在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间
int getNumberWaiting():返回当前在屏障处等待的参与者数目
int getParties():返回要求启动此 barrier 的参与者数目
boolean isBroken():查询此屏障是否处于损坏状态
void reset():将屏障重置为其初始状态

await()

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 获取“独占锁(lock)”
    lock.lock();
    try {
        // 保存“当前的generation”
        final Generation g = generation;
        // 若“当前generation已损坏”,则抛出异常。
        if (g.broken)
            throw new BrokenBarrierException();
        // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
       // 将“count计数器”-1
       int index = --count;
       // 如果index=0,则意味着“有parties个线程到达barrier”
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               // 如果barrierCommand不为null,则执行该动作
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 唤醒所有等待线程,并更新generation
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }
        // 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,
        // 当前线程才继续执行。
        for (;;) {
            try {
                // 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待过程中,线程被中断,则执行下面的函数
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            // 如果“当前generation已经损坏”,则抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果“generation已经换代”,则返回index
            if (g != generation)
                return index;
            // 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放“独占锁(lock)”
        lock.unlock();
    }
}

generation是CyclicBarrier的一个成员变量,它的定义如下:

private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代
当有parties个线程到达barrier,generation就会被更新换代
换代:

//换代private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

原文地址:https://www.cnblogs.com/amei0/p/9021032.html

时间: 2024-10-27 05:26:55

concurrent(四)同步屏障 CyclicBarrier & 源码分析的相关文章

CyclicBarrier源码分析

概述 CyclicBarrier是一个同步辅助类,它允许一组线程相互等待,直到达到某个公共屏障点.并且在释放等待线程之后,CyclicBarrier是可以重复使用的. 简单使用 下面这段代码利用了CyclicBarrier来使得线程创建后相互等待,直到所有的线程都准备好,以此来使多个线程同时执行. public class CyclicBarrierTest { public static void main(String[] args) { CyclicBarrierTest cyclicBa

2020了你还不会Java8新特性?(四)Collector类源码分析

Collector类源码分析 jdk8是怎么对底层完成支持的.不了解底层,平时用还可以,但是遇到问题的时候就会卡在那里.迟迟灭有解决方案.在学习一门新技术时,先学习怎么去用,不要执着于源码.但是随着用的越来越多,你去了解底层是比较好的一种学习方法. 有多种方法可以实现同一个功能.什么方式更好呢? 越具体的方法越好. 减少自动装箱拆箱操作 collect : 收集器 Collector作为collect方法的参数. Collector作为一个接口.它是一个可变的汇聚操作,将输入元素累计到一个可变的

Apache Shiro学习笔记(四)身份验证源码分析

鲁春利的工作笔记,好记性不如烂笔头 本文出自 "闷葫芦的世界" 博客,请务必保留此出处http://luchunli.blog.51cto.com/2368057/1828405

【JUC】JDK1.8源码分析之CyclicBarrier(四)

一.前言 有了前面分析的基础,现在,接着分析CyclicBarrier源码,CyclicBarrier类在进行多线程编程时使用很多,比如,你希望创建一组任务,它们并行执行工作,然后在进行下一个步骤之前等待,直至所有的任务都完成,和join很类似,下面,开始分析源码. 二.CyclicBarrier数据结构 分析源码可以知道,CyclicBarrier底层是基于ReentrantLock和AbstractQueuedSynchronizer来实现的,所以,CyclicBarrier的数据结构也依托

Java显式锁学习总结之五:ReentrantReadWriteLock源码分析

概述 我们在介绍AbstractQueuedSynchronizer的时候介绍过,AQS支持独占式同步状态获取/释放.共享式同步状态获取/释放两种模式,对应的典型应用分别是ReentrantLock和Semaphore,AQS还可以混合两种模式使用,读写锁ReentrantReadWriteLock就是如此. 设想以下情景:我们在系统中有一个多线程访问的缓存,多个线程都可以对缓存进行读或写操作,但是读操作远远多于写操作,要求写操作要线程安全,且写操作执行完成要求对当前的所有读操作马上可见. 分析

ffplay源码分析6-音频重采样

ffplay是FFmpeg工程自带的简单播放器,使用FFmpeg提供的解码器和SDL库进行视频播放.本文基于FFmpeg工程4.1版本进行分析,其中ffplay源码清单如下: https://github.com/FFmpeg/FFmpeg/blob/n4.1/fftools/ffplay.c 在尝试分析源码前,可先阅读如下参考文章作为铺垫: [1]. 雷霄骅,视音频编解码技术零基础学习方法 [2]. 视频编解码基础概念 [3]. 色彩空间与像素格式 [4]. 音频参数解析 [5]. FFmpe

vscode源码分析【八】加载第一个画面

第一篇: vscode源码分析[一]从源码运行vscode 第二篇:vscode源码分析[二]程序的启动逻辑,第一个窗口是如何创建的 第三篇:vscode源码分析[三]程序的启动逻辑,性能问题的追踪 第四篇:vscode源码分析[四]程序启动的逻辑,最初创建的服务 第五篇:vscode源码分析[五]事件分发机制 第六篇:vscode源码分析[六]服务实例化和单例的实现 第七篇:vscode源码分析[七]主进程启动消息通信服务 先复习一下! 在第一节中,我们提到: app.ts(src\vs\co

vscode源码分析【九】窗口里的主要元素

第一篇: vscode源码分析[一]从源码运行vscode 第二篇:vscode源码分析[二]程序的启动逻辑,第一个窗口是如何创建的 第三篇:vscode源码分析[三]程序的启动逻辑,性能问题的追踪 第四篇:vscode源码分析[四]程序启动的逻辑,最初创建的服务 第五篇:vscode源码分析[五]事件分发机制 第六篇:vscode源码分析[六]服务实例化和单例的实现 第七篇:vscode源码分析[七]主进程启动消息通信服务 第八篇:vscode源码分析[八]加载第一个画面 在上一节中,我们讲到

【JDK源码分析】通过源码分析CyclicBarrier

前言 CyclicBarrier它是什么?一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点.类似于朋友之间联系要在中午聚个会,几个朋友全部到齐后才开始喝酒吃菜. 源码 CyclicBarrier属性和构造器 public class CyclicBarrier { // 互斥锁 private final ReentrantLock lock = new ReentrantLock(); // 条件等待 private final Condition trip = lock.new