Java锁--CyclicBarrier

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3533995.html

CyclicBarrier简介

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

注意比较CountDownLatchCyclicBarrier
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

CyclicBarrier函数列表

CyclicBarrier(int parties)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

int await()
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit)
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
int getNumberWaiting()
返回当前在屏障处等待的参与者数目。
int getParties()
返回要求启动此 barrier 的参与者数目。
boolean isBroken()
查询此屏障是否处于损坏状态。
void reset()
将屏障重置为其初始状态。

CyclicBarrier数据结构

CyclicBarrier的UML类图如下:

CyclicBarrier是包含了"ReentrantLock对象lock"和"Condition对象trip",它是通过独占锁实现的。下面通过源码去分析到底是如何实现的。

CyclicBarrier源码分析(基于JDK1.7.0_40)

CyclicBarrier完整源码(基于JDK1.7.0_40)

 

CyclicBarrier是通过ReentrantLock(独占锁)和Condition来实现的。下面,我们分析CyclicBarrier中3个核心函数: 构造函数, await()作出分析。

1. 构造函数

CyclicBarrier的构造函数共2个:CyclicBarrier 和 CyclicBarrier(int parties, Runnable barrierAction)。第1个构造函数是调用第2个构造函数来实现的,下面第2个构造函数的源码。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties表示“必须同时到达barrier的线程个数”。
    this.parties = parties;
    // count表示“处在等待状态的线程个数”。
    this.count = parties;
    // barrierCommand表示“parties个线程到达barrier时,会执行的动作”。
    this.barrierCommand = barrierAction;
}

2. 等待函数

CyclicBarrier.java中await()方法如下:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}

说明:await()是通过dowait()实现的。

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 获取“独占锁(lock)”
    lock.lock();
    try {
        // 保存“当前的generation”
        final Generation g = generation;

        // 若“当前generation已损坏”,则抛出异常。
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       // 将“count计数器”-1
       int index = --count;
       // 如果index=0,则意味着“有parties个线程到达barrier”。
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               // 如果barrierCommand不为null,则执行该动作。
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 唤醒所有等待线程,并更新generation。
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,
        // 当前线程才继续执行。
        for (;;) {
            try {
                // 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待过程中,线程被中断,则执行下面的函数。
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 如果“当前generation已经损坏”,则抛出异常。
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果“generation已经换代”,则返回index。
            if (g != generation)
                return index;

            // 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放“独占锁(lock)”
        lock.unlock();
    }
}

说明:dowait()的作用就是让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。
(01) generation是CyclicBarrier的一个成员遍历,它的定义如下:

private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代。
当有parties个线程到达barrier,generation就会被更新换代。

(02) 如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier。breakBarrier()的源码如下:

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

breakBarrier()会设置当前中断标记broken为true,意味着“将该Generation中断”;同时,设置count=parties,即重新初始化count;最后,通过signalAll()唤醒CyclicBarrier上所有的等待线程。

(03) 将“count计数器”-1,即--count;然后判断是不是“有parties个线程到达barrier”,即index是不是为0。
当index=0时,如果barrierCommand不为null,则执行该barrierCommand,barrierCommand就是我们创建CyclicBarrier时,传入的Runnable对象。然后,调用nextGeneration()进行换代工作,nextGeneration()的源码如下:

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

首先,它会调用signalAll()唤醒CyclicBarrier上所有的等待线程;接着,重新初始化count;最后,更新generation的值。

(04) 在for(;;)循环中。timed是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待。

CyclicBarrier的使用示例

示例1
新建5个线程,这5个线程达到一定的条件时,它们才继续往后运行。

 1 import java.util.concurrent.CyclicBarrier;
 2 import java.util.concurrent.BrokenBarrierException;
 3
 4 public class CyclicBarrierTest1 {
 5
 6     private static int SIZE = 5;
 7     private static CyclicBarrier cb;
 8     public static void main(String[] args) {
 9
10         cb = new CyclicBarrier(SIZE);
11
12         // 新建5个任务
13         for(int i=0; i<SIZE; i++)
14             new InnerThread().start();
15     }
16
17     static class InnerThread extends Thread{
18         public void run() {
19             try {
20                 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
21
22                 // 将cb的参与者数量加1
23                 cb.await();
24
25                 // cb的参与者数量等于5时,才继续往后执行
26                 System.out.println(Thread.currentThread().getName() + " continued.");
27             } catch (BrokenBarrierException e) {
28                 e.printStackTrace();
29             } catch (InterruptedException e) {
30                 e.printStackTrace();
31             }
32         }
33     }
34 }

运行结果

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,这些线程才继续运行!

示例2

新建5个线程,当这5个线程达到一定的条件时,执行某项任务。

 1 import java.util.concurrent.CyclicBarrier;
 2 import java.util.concurrent.BrokenBarrierException;
 3
 4 public class CyclicBarrierTest2 {
 5
 6     private static int SIZE = 5;
 7     private static CyclicBarrier cb;
 8     public static void main(String[] args) {
 9
10         cb = new CyclicBarrier(SIZE, new Runnable () {
11             public void run() {
12                 System.out.println("CyclicBarrier‘s parties is: "+ cb.getParties());
13             }
14         });
15
16         // 新建5个任务
17         for(int i=0; i<SIZE; i++)
18             new InnerThread().start();
19     }
20
21     static class InnerThread extends Thread{
22         public void run() {
23             try {
24                 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
25
26                 // 将cb的参与者数量加1
27                 cb.await();
28
29                 // cb的参与者数量等于5时,才继续往后执行
30                 System.out.println(Thread.currentThread().getName() + " continued.");
31             } catch (BrokenBarrierException e) {
32                 e.printStackTrace();
33             } catch (InterruptedException e) {
34                 e.printStackTrace();
35             }
36         }
37     }
38 }

运行结果

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
CyclicBarrier‘s parties is: 5
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,执行新建cb时注册的Runnable任务。

原文地址:https://www.cnblogs.com/kexianting/p/8552578.html

时间: 2024-10-07 07:35:42

Java锁--CyclicBarrier的相关文章

有关Java 锁原理

锁 锁是用来锁东西的,让别人打不开也看不到!在线程中,用这个“锁”隐喻来说明一个线程在“操作”一个目标(如一个变量)的时候,如果变量是被锁住的,那么其他线程就对这个目标既“操作”不了(挂起)也无法看到目标的内容!对Java并发包,锁的实现基本在java.util.concurrent.locks包中,说“基本”是因为,在java.util.concurrent中还有CountDownLatch(可以看成带计数器的锁),CyclicBarrier,Semaphore(类似于信号量,但是也可以看成一

Java锁--框架

根据锁的添加到Java中的时间,Java中的锁,可以分为"同步锁"和"JUC包中的锁". 同步锁 即通过synchronized关键字来进行同步,实现对竞争资源的互斥访问的锁.Java 1.0版本中就已经支持同步锁了. 同步锁的原理是,对于每一个对象,有且仅有一个同步锁:不同的线程能共同访问该同步锁.但是,在同一个时间点,该同步锁能且只能被一个线程获取到.这样,获取到同步锁的线程就能进行CPU调度,从而在CPU上执行:而没有获取到同步锁的线程,必须进行等待,直到获取

java锁的种类以及辨析(转载)

java锁的种类以及辨析(一):自旋锁 锁作为并发共享数据,保证一致性的工具,在JAVA平台有多种实现(如 synchronized 和 ReentrantLock等等 ) .这些已经写好提供的锁为我们开发提供了便利,但是锁的具体性质以及类型却很少被提及.本系列文章将分析JAVA下常见的锁名称以及特性,为大家答疑解惑. 1.自旋锁 自旋锁是采用让当前线程不停地的在循环体内执行实现的,当循环的条件被其他线程改变时 才能进入临界区.如下 01 public class SpinLock { 02  

java 锁!

问题:如何实现死锁. 关键: 1 两个线程ta.tb 2 两个对象a.b 3 ta拥有a的锁,同时在这个锁定的过程中,需要b的锁:tb拥有b的锁,同时在这个锁定的过程中,需要a的锁: 关键的实现难点是3, —— 所以说,死锁也不是那么容易出现的吧.. 实现方式synchronized.Lock 等等 死锁例子1  采用了不同类的两个对象. 原理是: 两个线程尝试进入同一个需要对象锁的方法 package basic.thread; public class DL { public static

java锁和同步

Java 语言设计中的一大创新就是:第一个把跨平台线程模型和锁模型应用到语言中去,Java 语言包括了跨线程的关键字synchronized 和 volatile,使用关键字和java类库就能够简单的实现线程间的同步.在简化与平台无关的并发程序开发时,它没有使并发程序的编写工作变得繁琐,反而使它变得更容易了. 在这一章,我们详细介绍锁的技术和概念,java中提供了两种锁,一个是使用关键字的锁,还有一种类库提供的锁. synchronized关键字锁 synchronized关键字能够作为函数的修

java 锁2

并发,其实是多线程才有的场景... java 多线程? 锁? 现在看来,即使已经工作了4.5年,这仍然不是一个简单的问题. 其实java 本身有提供锁的机制. 比如 Object对象的 wait .notify 方法.synchronized 的原理不过是直接调用对应的对象的 wait方法罢了! 看tomcat 源码的时候,多线程的地方就是直接用到了 wait .notify等方法 —— 这些方法真高级, 一般哪里会用得着???!!! 1 synchronized 其实这个算是容易学的东西了,相

Java 锁的学习

个人学习整理,所有资料均来源于网络,非原创. 死锁的四个必要条件:互斥条件(Mutual exclusion):资源不能被共享,只能由一个进程使用.请求与保持条件(Hold and wait):已经得到资源的进程可以再次申请新的资源.一个进程因请求资源而阻塞时,对已获得的资源保持不放.非剥夺条件(No pre-emption):已经分配的资源不能从相应的进程中被强制地剥夺.循环等待条件(Circular wait):系统中若干进程组成环路,该环路中每个进程都在等待相邻进程正占用的资源. 不难看出

java锁Lock的应用

package jun.lock; public class BankCard { protected String cardId;//银行卡号 protected int balance = 8000;//余额 public String getCardId() { return cardId; } public void setCardId(String cardId) { this.cardId = cardId; } public int getBalance() { return ba

Java锁(一)之内存模型

想要了解Java锁机制.引发的线程安全问题以及数据一致性问题,有必要了解内存模型,机理机制了解清楚了,这些问题也就应声而解了. 一.主内存和工作内存 Java内存模型分为主内存和工作内存,所有的变量都存储在主内存中.每条线程还有自己的工作内存,线程的工作内存中保存了被该线程使用到变量的主内存副本拷贝,线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存中的变量.不同的线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递均需要主内存来完成. 二.线程.工作内存和主内存 下面是