Java并发包中CyclicBarrier的工作原理、使用示例

1. CyclicBarrier的介绍与源码分析

CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。线程进入屏障通过CyclicBarrier的await()方法。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction这个Runnable对象,方便处理更复杂的业务场景。

构造函数

public CyclicBarrier(int parties) {
    this(parties, null);
}

public int getParties() {
    return parties;
}

实现原理:在CyclicBarrier的内部定义了一个Lock对象,每当一个线程调用CyclicBarrier的await方法时,将剩余拦截的线程数减1,然后判断剩余拦截数是否为0,如果不是,进入Lock对象的条件队列等待。如果是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程会依次的获取锁、释放锁,接着先从await方法返回,再从CyclicBarrier的await方法中返回。

await源码

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

dowait源码

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We‘re about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

当最后一个线程到达屏障点,也就是执行dowait方法时,会在return 0 返回之前调用finally块中的breakBarrier方法。

breakBarrier源代码

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

CyclicBarrier主要用于一组线程之间的相互等待,而CountDownLatch一般用于一组线程等待另一组些线程。实际上可以通过CountDownLatch的countDown()和await()来实现CyclicBarrier的功能。即 CountDownLatch中的countDown()+await() = CyclicBarrier中的await()。注意:在一个线程中先调用countDown(),然后调用await()。

其它方法:CycliBarrier对象可以重复使用,重用之前应当调用CyclicBarrier对象的reset方法。

reset源码

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

2. 使用示例

package javalearning;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierDemo {
	private CyclicBarrier cb = new CyclicBarrier(4);
	private Random rnd = new Random();

	class TaskDemo implements Runnable{
		private String id;
		TaskDemo(String id){
			this.id = id;
		}
		@Override
		public void run(){
			try {
				Thread.sleep(rnd.nextInt(1000));
				System.out.println("Thread " + id + " will wait");
				cb.await();
				System.out.println("-------Thread " + id + " is over");
			} catch (InterruptedException e) {
			} catch (BrokenBarrierException e) {
			}
		}
	}

	public static void main(String[] args){
		CyclicBarrierDemo cbd = new CyclicBarrierDemo();
		ExecutorService es = Executors.newCachedThreadPool();
		es.submit(cbd.new TaskDemo("a"));
		es.submit(cbd.new TaskDemo("b"));
		es.submit(cbd.new TaskDemo("c"));
		es.submit(cbd.new TaskDemo("d"));
		es.shutdown();
	}
}

在这个示例中,我们创建了四个线程a、b、c、d,这四个线程提交给了线程池。四个线程不同时间到达cb.await()语句,当四个线程都输出“Thread x will wait”以后才会输出“Thread x is over”。

运行结果

Thread d will wait

Thread a will wait

Thread c will wait

Thread b will wait

-------Thread b is over

-------Thread d is over

-------Thread a is over

-------Thread c is over

3.参考内容

[1] http://ifeve.com/concurrency-cyclicbarrier/

时间: 2024-10-20 11:42:53

Java并发包中CyclicBarrier的工作原理、使用示例的相关文章

Java并发包中Semaphore的工作原理、源码分析及使用示例

1. 信号量Semaphore的介绍 我们以一个停车场运作为例来说明信号量的作用.假设停车场只有三个车位,一开始三个车位都是空的.这时如果同时来了三辆车,看门人允许其中它们进入进入,然后放下车拦.以后来的车必须在入口等待,直到停车场中有车辆离开.这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复. 在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用.信号量是一个非负整数,表示了当前公共资源的可用数目(在上面的

Java并发包中CountDownLatch的工作原理、使用示例

1. CountDownLatch的介绍 CountDownLatch是一个同步工具,它主要用线程执行之间的协作.CountDownLatch 的作用和 Thread.join() 方法类似,让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒.在直接创建线程的年代(Java 5.0 之前),我们可以使用 Thread.join().在线程池出现后,因为线程池中的线程不能直接被引用,所以就必须使用 CountDownLatch 了. CountDownLatch主要有两个方法,当一个或多个线程调

Java并发包中Lock的实现原理

Lock 的简介及使用 Lock是java 1.5中引入的线程同步工具,它主要用于多线程下共享资源的控制.本质上Lock仅仅是一个接口(位于源码包中的java\util\concurrent\locks中),它包含以下方法 //尝试获取锁,获取成功则返回,否则阻塞当前线程 void lock(); //尝试获取锁,线程在成功获取锁之前被中断,则放弃获取锁,抛出异常 void lockInterruptibly() throws InterruptedException; //尝试获取锁,获取锁成

JavaScript中this的工作原理以及注意事项

在JavaScript中,this 的概念比较复杂.除了在面向对象编程中,this 还是随处可用的.这篇文章介绍了this 的工作原理,它会造成什么样的问题以及this 的相关例子. 要根据this 所在的位置来理解它,情况大概可以分为3种: 1.在函数中:this 通常是一个隐含的参数. 2.在函数外(顶级作用域中):在浏览器中this 指的是全局对象:在Node.js中指的是模块(module)的导出(exports). 3.传递到eval()中的字符串:如果eval()是被直接调用的,th

java按照集合中元素的属性进行排序示例代码

public class Student { private String name; private int age; private int id; public Student() {  super(); } public Student(String name, int age, int id) {  super();  this.name = name;  this.age = age;  this.id = id; } public String getName() {  retur

Java中GC的工作原理

转文: 一个优秀的Java程序员必须了解GC的工作原理.如何优化GC的性能.如何与GC进行有限的交互,有一些应用程序对性能要求较高,例如嵌入式系统.实时系统等,只有全面提升内存的管理效率,才能提高整个应用程序的性能.本文将从GC的工作原理.GC的几个关键问题进行探讨,最后提出一些Java程序设计建议,如何从GC角度提高Java程序的性能. 一.GC的基本原理: GC是什么? 为什么要有GC呢? GC是垃圾收集的意思(Garbage Collection),内存处理是编程人员容易出现问题的地方,忘

Tomcat中JSP引擎工作原理

http://blog.csdn.net/linjiaxingqqqq/article/details/7164449 JSP运行环境: 执行JSP代码需要在服务器上安装JSP引擎,比较常见的引擎有WebLogic和Tomcat.把这些支持JSP的web服务器配置好后.就可以再客户端通过浏览器来访问JSP页面了.默认端口一般是7001. JSP生命周期: JSP处理请求的方法就是把这些请求都统一看做Servlet.由于这个原因,JSP的很多功能和生命周期,都由Java Servlet技术标准定义

Java web每天学之Servlet工作原理详情解析

上篇文章中我们介绍了Servlet的实现方式以及Servlet的生命周期,我们这篇文章就来介绍一下常用对象. 点击回顾:<Java Web每天学之Servlet的工作原理解析>:<Java Web每天学之Servlet的工作原理解析(二)> 一.HttpServletRequest对象 1.介绍HttpServletRequest对象:主要作用是用来接收客户端发送过来的请求信息,例如:请求的参数,发送的头信息等都属于客户端发来的信息,service()方法中形参接收的是HttpSe

C++中虚函数工作原理和(虚)继承类…

转载请标明出处,原文地址:http://blog.csdn.net/hackbuteer1/article/details/7883531 一.虚函数的工作原理 虚函数的实现要求对象携带额外的信息,这些信息用于在运行时确定该对象应该调用哪一个虚函数.典型情况下,这一信息具有一种被称为 vptr(virtual table pointer,虚函数表指针)的指针的形式.vptr 指向一个被称为 vtbl(virtual table,虚函数表)的函数指针数组,每一个包含虚函数的类都关联到 vtbl.当