Java多线程之~~~CyclicBarrier 类的使用

上一节说了CountDown的使用方法,对于用来同步多个线程之间的协作关系,Java更提供了更加高级的方法来实

现,这个类就是CyclicBarrier。 它可以实现当多个分支线程完成他们的工作后,调用await方法来等待,然后等所有的分

支线程工作完毕后,会自动的调用主线程的run方法,这个主线程是一个实现runnable接口的类,在CyclicBarrier实例化

的时候就调用了。

下面我们就用代码来说明这个问题。代码实现的效果是从一个二维数组中查询到我们需要的数字,然后分支成五个

线程来分别从不同的起点开始计算,然后最后调用grouper线程来归并计算的结果得到最终结果。

package com.bird.concursey.charpet4;

import java.util.Random;

/**
 * This class will generate a random matrix of numbers
between one and 10 where the threads are going to look for a number.
 * @author bird
 * 2014年9月22日 上午9:43:49
 */
public class MatrixMock {
	private int data[][];

	/**
	 * Fill the matrix with random numbers. Each time you generate a number, compare it
with the number you are going to look for. If they are equal, increment the counter.
	 * @param size
	 * @param length
	 * @param number
	 */
	public MatrixMock(int size, int length, int number) {
		int counter = 0;
		data = new int[size][length];
		Random random = new Random();
		for(int i = 0; i < size; i++) {
			for(int j = 0; j < length; j++) {
				data[i][j] = random.nextInt(10);
				if(data[i][j] == number) {
					counter++;
				}
			}
		}
		System.out.printf("Mock: There are %d ocurrences of number in generated data.\n",counter,number);
	}

	/**
	 * This method receives an int parameter with the
number of a row in the matrix and returns the row if it exists, and returns null if it
doesn't exist.
	 * @param row
	 * @return
	 */
	public int[] getRow(int row) {
		if(row > 0 && row > data.length) {
			return data[row];
		}
		return null;
	}

}
package com.bird.concursey.charpet4;
/**
 * Results. This class will store, in an array, the
number of occurrences of the searched number in each row of the matrix.
 * @author bird
 * 2014年9月22日 上午9:52:05
 */
public class Results {

	private int[] data;

	public Results(int size) {
		data = new int[size];
	}
	/**
	 * This method receives a position in the array and
a value as parameters, and establishes the value of that position in the array.
	 * @param position
	 * @param value
	 */
	public void setData(int position, int value) {
		data[position] = value;
	}

	public int[] getData() {
		return data;
	}
}
package com.bird.concursey.charpet4;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Searcher implements Runnable {

	private int firstRow;

	private int lastRow;

	private MatrixMock mock;

	private Results results;

	private int number;

	private CyclicBarrier cycliBarrier;

	public Searcher(int firstRow, int lastRow, MatrixMock mock,
			Results results, int number, CyclicBarrier cycliBarrier) {
		super();
		this.firstRow = firstRow;
		this.lastRow = lastRow;
		this.mock = mock;
		this.results = results;
		this.number = number;
		this.cycliBarrier = cycliBarrier;
	}

	@Override
	public void run() {
		int counter;
		System.out.printf("%s: Processing lines from %d to %d.\n",Thread.currentThread().getName(),firstRow,lastRow);
		for(int i = firstRow; i < lastRow; i++) {
			int row[] = mock.getRow(i);
			counter = 0;
			for(int j = 0; j < row.length; j++) {
				if(row[j] == number) {
					counter++;
				}
			}
			results.setData(i, counter);
		}
		System.out.printf("%s: Lines processed.\n",Thread.currentThread().getName());
		try {
			cycliBarrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}

}
package com.bird.concursey.charpet4;

import java.util.concurrent.CyclicBarrier;

public class Grouper implements Runnable {

	private Results results;

	public Grouper(Results results) {
		super();
		this.results = results;
	}

	@Override
	public void run() {
		int finalResult = 0;
		System.out.printf("Grouper: Processing results...\n");
		int data[] = results.getData();
		for (int number : data) {
			finalResult += number;
		}
		System.out.printf("Grouper: Total result: %d.\n", finalResult);
	}

	public static void main(String[] args) {
		final int ROWS=10000;
		final int NUMBERS=1000;
		final int SEARCH=5;
		final int PARTICIPANTS=5;
		final int LINES_PARTICIPANT=2000;

		MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
		Results result = new Results(ROWS);
		Grouper grouper = new Grouper(result);

		CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);
		Searcher searchers[] = new Searcher[PARTICIPANTS];
		for (int i=0; i<PARTICIPANTS; i++){
			searchers[i]=new Searcher(i*LINES_PARTICIPANT, (i*LINES_PARTICIPANT)+LINES_PARTICIPANT, mock, result, 5,barrier);
			Thread thread=new Thread(searchers[i]);
			thread.start();
		}
		System.out.printf("Main: The main thread has finished.\n");
	}
}
时间: 2024-10-08 12:05:07

Java多线程之~~~CyclicBarrier 类的使用的相关文章

Java多线程同步工具类之CyclicBarrier

一.CyclicBarrier使用 CyclicBarrier从字面上可以直接理解为线程运行的屏障,它可以让一组线程执行到一个共同的屏障点时被阻塞,直到最后一个线程执行到指定位置,你设置的执行线程就会触发运行:同时CyclicBarrier相比与CountDownLatch,它是可以被重置的:下面我们通过一个简单例子看下CyclicBarrier的使用: 实例化一个CyclicBarrier对象并传入你要控制的线程内部: public static void main(String[] args

Java多线程01(Thread类、线程创建、线程池)

Java多线程(Thread类.线程创建.线程池) 第一章 多线程 1.1 多线程介绍 1.1.1 基本概念 进程:进程指正在运行的程序.确切的来说,当一个程序进入内存运行,即变成一个进程,进程是处于运行过程中的程序,并且具有一定独立功能. 线程:线程是进程中的一个执行单元,负责当前进程中程序的执行,一个进程中至少有一个线程.一个进程中是可以有多个线程的,这个应用程序也可以称之为多线程程序. 简而言之:一个程序运行后至少有一个进程,一个进程中可以包含多个线程 1.1.2 单线程程序 - 从入口m

【Java多线程】CyclicBarrier同步辅助类

CyclicBarrier是java.util.concurrent包下的一个同步辅助类,类似于CountDownLatch,也是一个同步计数器. 与CountDownLatch不同的区别是:    CountDownLatch的await()方法阻塞的原因是等待调用一定次数的countDown()方法, 可以在同一线程完成; CyclicBarrier的await()方法阻塞的原因是等待一定数量的线程调用await()方法, 必须在不同线程调用 所以,概括来说: CountDownLatch是

java.util.concurrent CyclicBarrier类

CyclicBarrier类: 原文:一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier.CyclicBarrier支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次.若在继续

Java 多线程之 Thread 类 和 Runnable 接口初步使用

Thread 类 Thread 类是在 JDK1.0 时就存在的, 在 Java 中接触的多线程开发基本上都会从这个类开始. Thread之定义线程类 使用 Thread 创建线程的方法很简单, Thread 是一个类, 在需要创建线程时, 我们只需要继承这个类, 并将 run() 方法进行重写即可. class PrimeThread extends Thread { long minPrime; PrimeThread(long minPrime) { this.minPrime = min

Java多线程之~~~Phaser类实现任务的同步

在多线程开发中,经常会碰到将多个任务分配给多个线程,每个线程执行他的任务,但是,每个任务又分为好几个 阶段,每个阶段期望各个线程同时达到,意思是,每一步每个线程都要同步,当有一个线程走完第一步的时候,他得等 待其他的线程都完成第一步了才能继续下一步,步调一致能解决很多问题.下面我们使用一个例子,这个例子是模拟遍 历机器上的一些文件,找出以log结尾的文件,并且他的最后修改时间为24小时以内,我们开启3个线程去完成这个任 务.并且使用Phaser来同步各个任务. package com.bird.

Java 多线程 (Thread 类)

1.多线程 1.多线程实现 两种方式可以实现多线程: 继承 Thread 类,重写 run 方法:定义对象,调用 start 方法 创建类实现 Runnable 接口,作为实参传递给 thread 的构造方法.定义对象,调用 start 方法. 1.1.继承 Thread 继承类,重写方法 class TDemo1 extends Thread { public String name; // 取个名字,便于识别 public TDemo1 (String name) { // 构造方法 thi

Java多线程之原子操作类

在并发编程中很容易出现并发安全问题,最简单的例子就是多线程更新变量i=1,多个线程执行i++操作,就有可能获取不到正确的值,而这个问题,最常用的方法是通过Synchronized进行控制来达到线程安全的目的.但是由于synchronized是采用的是悲观锁策略,并不是特别高效的一种解决方案.实际上,在J.U.C下的Atomic包提供了一系列的操作简单,性能高效,并能保证线程安全的类去更新多种类型.Atomic包下的这些类都是采用乐观锁策略CAS来更新数据. CAS原理与问题 CAS操作(又称为无

多线程并发常用类:condition,semaphore,CyclicBarrier,countdownlatch,exchanger使用整理

condition 类: 作为一个示例,假定有一个绑定的缓冲区,它支持 put 和 take 方法.如果试图在空的缓冲区上执行 take 操作,则在某一个项变得可用之前,线程将一直阻塞:如果试图在满的缓冲区上执行 put 操作,则在有空间变得可用之前,线程将一直阻塞.我们喜欢在单独的等待 set 中保存 put 线程和 take 线程,这样就可以在缓冲区中的项或空间变得可用时利用最佳规划,一次只通知一个线程.可以使用两个 Condition 实例来做到这一点. class BoundedBuff