多线程编程-- part 8 CyclicBarrier

CyclicBarrier简介

  cuclicBarrier允许一组线程互相等待,直到到达某个公共屏障点(common barrier point)。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。

CyclicBarrier函数列表

CyclicBarrier(int parties)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

int await()
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit)
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
int getNumberWaiting()
返回当前在屏障处等待的参与者数目。
int getParties()
返回要求启动此 barrier 的参与者数目。
boolean isBroken()
查询此屏障是否处于损坏状态。
void reset()
将屏障重置为其初始状态。

CyclicBarrier数据结构

可见其包含的对象:

(1)parties:定义多少个等待线程可以启动屏障

(2)count:处于等待状态的线程数量

(3)lock:是ReentrantLock独占锁

(4)trip:condition条件控制,控制线程的等待和激活

(5)barrierCommand:表示当parties个处于等待状态的线程到达屏障时,要执行的动作

(6)generation:记录线程属于那一代,当有parties个线程到达barrier时,generation会被换代

CyclicBarrier核心函数

1.构造函数

(1)CyclicBarrier(int parties), CyclicBarrier(int parties, Runnable barrierAction)

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties表示“必须同时到达barrier的线程个数”。
    this.parties = parties;
    // count表示“处在等待状态的线程个数”。
    this.count = parties;
    // barrierCommand表示“parties个线程到达barrier时,会执行的动作”。
    this.barrierCommand = barrierAction;
}

(2)等待函数

  

CyclicBarrier.java中await()方法如下:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}

说明:await()是通过dowait()实现的。

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 获取“独占锁(lock)”
    lock.lock();
    try {
        // 保存“当前的generation”
        final Generation g = generation;

        // 若“当前generation已损坏”,则抛出异常。
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       // 将“count计数器”-1
       int index = --count;
       // 如果index=0,则意味着“有parties个线程到达barrier”。
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               // 如果barrierCommand不为null,则执行该动作。
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 唤醒所有等待线程,并更新generation。
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,
        // 当前线程才继续执行。
        for (;;) {
            try {
                // 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待过程中,线程被中断,则执行下面的函数。
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 如果“当前generation已经损坏”,则抛出异常。
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果“generation已经换代”,则返回index。
            if (g != generation)
                return index;

            // 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放“独占锁(lock)”
        lock.unlock();
    }
}

说明:dowait()的作用就是让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。
(01) generation是CyclicBarrier的一个成员遍历,它的定义如下:

private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代。
当有parties个线程到达barrier,generation就会被更新换代。

(02) 如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier。breakBarrier()的源码如下:

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

breakBarrier()会设置当前中断标记broken为true,意味着“将该Generation中断”;同时,设置count=parties,即重新初始化count;最后,通过signalAll()唤醒CyclicBarrier上所有的等待线程。

(03) 将“count计数器”-1,即--count;然后判断是不是“有parties个线程到达barrier”,即index是不是为0。
当index=0时,如果barrierCommand不为null,则执行该barrierCommand,barrierCommand就是我们创建CyclicBarrier时,传入的Runnable对象。然后,调用nextGeneration()进行换代工作,nextGeneration()的源码如下:

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

首先,它会调用signalAll()唤醒CyclicBarrier上所有的等待线程;接着,重新初始化count;最后,更新generation的值。

(04) 在for(;;)循环中。timed是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待

CyclicBarrier示例

(1)新建5个线程,都调用await()等待,这些线程都到达屏障,继续往后执行

public class testHello {

    private static int SIZE = 5;
    private static CyclicBarrier cb;
    public static void main(String[] args) {

        cb = new CyclicBarrier(SIZE);

        // 新建5个任务
        for(int i=0; i<SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // 将cb的参与者数量加1
                cb.await();

                // cb的参与者数量等于5时,才继续往后执行
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

(2)新建5个线程,都调用await()等待,这些线程都到达屏障,执行runnable中定义的任务

public class testHello {

    private static int SIZE = 5;
    private static CyclicBarrier cb;
    public static void main(String[] args) {

        cb = new CyclicBarrier(SIZE, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " runnable start");
            }
        });

        // 新建5个任务
        for(int i=0; i<SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // 将cb的参与者数量加1
                cb.await();

                // cb的参与者数量等于5时,才继续往后执行
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

  可以看出当parties个线程到达屏障时,会执行屏障的Runable定义的任务

时间: 2024-10-29 03:49:24

多线程编程-- part 8 CyclicBarrier的相关文章

JAVA读书推荐----《深入分析Java Web技术内幕》--《java多线程编程核心技术》--《大型网站技术架构 核心原理与案例分析》-《Effective Java中文版》

(1)  首先推荐的不是一本书,而是一个博客,也是我们博客园另外一位博友java_my_life. 目前市面上讲解设计模式的书很多,虽然我前面讲了看书是最好的,但是对设计模式感兴趣的朋友们,我推荐的是这个博客.这位博友的设计模式讲得非常非常好,我认为90%的内容都是没有问题且很值得学习的,其讲解设计模式的大体路线是: 1.随便开篇点明该设计模式的定义 2.图文并茂讲解该设计模式中的结构 3.以详细的代码形式写一下该种设计模式的实现 4.补充内容 5.讲解该设计模式的优缺点 对于一个设计模式我们关

汪大神Java多线程编程实战

课程目录:├─1│  ├─Java并发编程.png│  ├─源码+ppt.rar│  ├─高并发编程第一阶段01讲.课程大纲及主要内容介绍.wmv│  ├─高并发编程第一阶段02讲.简单介绍什么是线程.wmv│  ├─高并发编程第一阶段03讲.创建并启动线程.mp4│  ├─高并发编程第一阶段04讲.线程生命周期以及start方法源码剖析.mp4│  ├─高并发编程第一阶段05讲.采用多线程方式模拟银行排队叫号.mp4│  ├─高并发编程第一阶段06讲.用Runnable接口将线程的逻辑执行单元

多线程编程核心技术总结(读周志明书籍的总结)

多线程编程核心技术总结 1.Java多线程基本技能 1.1进程和线程的概念: 进程是独立的程序,线程是在进程中独立运行的子任务. 1.2使用多线程 1.2.1实现方法:继承Thread类,重写Runnable接口. 1.2.2线程安全问题:并发修改公共的实例变量,i++,i-- 1.3线程Thread类的一些方法: currentThread() 放回代码段正在被那个线程调用 isAlive() 判断线程是否处于活动状态 sleep() 使得当前线程退出CPU片段,等待获取锁 1.4停止线程 1

Java基础知识—多线程编程(五)

概述 Java 给多线程编程提供了内置的支持.一个多线程程序包含两个或多个能并发运行的部分.程序的每一部分都称作一个线程,并且每个线程定义了一个独立的执行路径.使用多线程也是为了充分的利用服务器资源,提高工作效率. 线程生命周期 线程是一个动态执行的过程,它也有一个从产生到死亡的过程. 新建状态: 使用 new 关键字和 Thread 类或其子类建立一个线程对象后,该线程对象就处于新建状态.它保持这个状态直到程序 start() 这个线程. 就绪状态: 当线程对象调用了start()方法之后,该

第73课 Qt中的多线程编程

1. QThread类 (1)QThread是一个跨平台的多线程解决方案 (2)QThread以简洁易用的方式实现多线程编程 2. QThread中的关键成员函数 (1)virtual void run() :线程函数,用于定义线程功能(执行流). (2)void start():启动函数,将线程入口地址设置为run函数.启动线程,新线程开始执行run函数. (3)int exec():进入事件循环,直至调用exit().返回线程退出事件循环的返回码. (4)void terminate():强

多线程编程(进程和线程)

多线程编程(进程和线程) 1.进程:指一个内存中运行的应用程序,每个进程都有自己独立的一块内存空间,一个进程可以启动多个线程. 2.线程:指程序中一个执行流程,一个进程中可以运行多个线程. 一.创建线程(两种方式) 二.线程的5种状态( New,Runnable,Running,Block,Dead ): 三.线程的优先级 四.守护线程 /精灵线程/后台线程 五.方法 六.同步代码锁(synchronized) 一.创建线程(两种方式): 方式1:采用继承Thread的方法 第一,继承 Thre

多线程编程基础知识

多线程编程基础知识 http://www.cnblogs.com/cy163/archive/2006/11/02/547428.html 当前流行的Windows操作系统能同时运行几个程序(独立运行的程序又称之为进程),对于同一个程序,它又可以分成若干个独立的执行流,我们称之为线程,线程提供了多任务处理的能力.用进程和线程的观点来研究软件是当今普遍采用的方法,进程和线程的概念的出现,对提高软件的并行性有着重要的意义.现在的大型应用软件无一不是多线程多任务处理,单线程的软件是不可想象的.因此掌握

iOS多线程编程

1. 进程,线程, 任务 进程:一个程序在运行时,系统会为其分配一个进程,用以管理他的一些资源. 线程:进程内所包含的一个或多个执行单元称为线程,线程一般情况下不持有资源,但可以使用其所在进程的资源. 任务:进程或线程中要做的事情. 在引入线程的操作系统中,通常把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位. 线程比进程更小,对其调度的开销小,能够提高系统内多个任务的并发执行程度. 一个程序至少有一个进程,一个进程至少有一个线程.一个程序就是一个进程,而一个程序中的多个任

多线程编程1-NSThread

前言 每个iOS应用程序都有个专门用来更新显示UI界面.处理用户触摸事件的主线程,因此不能将其他太耗时的操作放在主线程中执行,不然会造成主线程堵塞(出现卡机现象),带来极坏的用户体验.一般的解决方案就是将那些耗时的操作放到另外一个线程中去执行,多线程编程是防止主线程堵塞,增加运行效率的最佳方法. iOS中有3种常见的多线程编程方法: 1.NSThread 这种方法需要管理线程的生命周期.同步.加锁问题,会导致一定的性能开销 2.NSOperation和NSOperationQueue 是基于OC