上一节介绍了CountDownLatch的使用方法。对于同步多个线程之间的协作关系,Java还提供了一个更高级的工具类,
这个类就是CyclicBarrier。它可以让多个分支线程在完成各自的工作后调用await方法进入等待;当所有分支线程
都到达屏障后,会自动执行一个汇总任务的run方法。这个汇总任务是一个实现了Runnable接口的对象,在实例化
CyclicBarrier的时候作为构造参数传入,并由最后一个到达屏障的线程执行。
下面我们用代码来说明。代码实现的效果是在一个二维数组中统计我们要查找的数字出现的次数:先启动五个
线程,分别从不同的起始行开始统计各自负责的行,最后由Grouper任务归并各线程的计算结果,得到最终结果。
package com.bird.concursey.charpet4; import java.util.Random; /** * This class will generate a random matrix of numbers between one and 10 where the threads are going to look for a number. * @author bird * 2014年9月22日 上午9:43:49 */ public class MatrixMock { private int data[][]; /** * Fill the matrix with random numbers. Each time you generate a number, compare it with the number you are going to look for. If they are equal, increment the counter. * @param size * @param length * @param number */ public MatrixMock(int size, int length, int number) { int counter = 0; data = new int[size][length]; Random random = new Random(); for(int i = 0; i < size; i++) { for(int j = 0; j < length; j++) { data[i][j] = random.nextInt(10); if(data[i][j] == number) { counter++; } } } System.out.printf("Mock: There are %d ocurrences of number in generated data.\n",counter,number); } /** * This method receives an int parameter with the number of a row in the matrix and returns the row if it exists, and returns null if it doesn't exist. * @param row * @return */ public int[] getRow(int row) { if(row > 0 && row > data.length) { return data[row]; } return null; } }
package com.bird.concursey.charpet4; /** * Results. This class will store, in an array, the number of occurrences of the searched number in each row of the matrix. * @author bird * 2014年9月22日 上午9:52:05 */ public class Results { private int[] data; public Results(int size) { data = new int[size]; } /** * This method receives a position in the array and a value as parameters, and establishes the value of that position in the array. * @param position * @param value */ public void setData(int position, int value) { data[position] = value; } public int[] getData() { return data; } }
package com.bird.concursey.charpet4;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Worker task that counts, for each row in its assigned range of the matrix,
 * how many cells equal the searched number, stores each row's count in the
 * shared {@link Results}, and then waits on the barrier.
 */
public class Searcher implements Runnable {

    /** First row (inclusive) this task processes. */
    private final int firstRow;
    /** End of the range (exclusive). */
    private final int lastRow;
    /** Matrix to read rows from. */
    private final MatrixMock mock;
    /** Where the per-row counts are written. */
    private final Results results;
    /** The number being searched for. */
    private final int number;
    /** Barrier this task waits on once its rows are processed. */
    private final CyclicBarrier cycliBarrier;

    /**
     * @param firstRow     first row to process (inclusive)
     * @param lastRow      row at which to stop (exclusive)
     * @param mock         matrix to search
     * @param results      destination for the per-row counts
     * @param number       number to look for
     * @param cycliBarrier barrier to wait on after processing
     */
    public Searcher(int firstRow, int lastRow, MatrixMock mock,
            Results results, int number, CyclicBarrier cycliBarrier) {
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.mock = mock;
        this.results = results;
        this.number = number;
        this.cycliBarrier = cycliBarrier;
    }

    /**
     * Counts the occurrences of the number in rows {@code firstRow}
     * (inclusive) to {@code lastRow} (exclusive), then waits on the barrier.
     */
    @Override
    public void run() {
        final String threadName = Thread.currentThread().getName();
        System.out.printf("%s: Processing lines from %d to %d.\n", threadName, firstRow, lastRow);

        for (int i = firstRow; i < lastRow; i++) {
            int[] row = mock.getRow(i);
            int counter = 0;
            for (int value : row) {
                if (value == number) {
                    counter++;
                }
            }
            results.setData(i, counter);
        }

        System.out.printf("%s: Lines processed.\n", threadName);

        try {
            cycliBarrier.await();
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; set it
            // again so code further up the stack can still see the interrupt.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
package com.bird.concursey.charpet4;

import java.util.concurrent.CyclicBarrier;

/**
 * Task that sums the per-row counts stored in a {@link Results} object and
 * prints the total. In {@link #main(String[])} it is handed to the
 * {@link CyclicBarrier} as its barrier action.
 */
public class Grouper implements Runnable {

    /** Source of the per-row counts to add up. */
    private final Results results;

    /**
     * @param results object holding the per-row counts
     */
    public Grouper(Results results) {
        this.results = results;
    }

    /**
     * Adds up every entry of the results array and prints the total.
     */
    @Override
    public void run() {
        int finalResult = 0;
        System.out.printf("Grouper: Processing results...\n");
        for (int number : results.getData()) {
            finalResult += number;
        }
        System.out.printf("Grouper: Total result: %d.\n", finalResult);
    }

    /**
     * Builds the matrix, starts one {@link Searcher} thread per participant
     * over consecutive, equally sized row ranges, and registers a
     * {@code Grouper} as the barrier action.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int ROWS = 10000;
        final int NUMBERS = 1000;
        final int SEARCH = 5;
        final int PARTICIPANTS = 5;
        // Derived rather than hard-coded so the row ranges always follow
        // ROWS and PARTICIPANTS (10000 / 5 = 2000, same as before).
        final int LINES_PARTICIPANT = ROWS / PARTICIPANTS;

        MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
        Results result = new Results(ROWS);
        Grouper grouper = new Grouper(result);
        CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);

        for (int i = 0; i < PARTICIPANTS; i++) {
            int firstRow = i * LINES_PARTICIPANT;
            int lastRow = firstRow + LINES_PARTICIPANT;
            // Pass SEARCH (not a separate literal) so the searchers look for
            // the same number the mock counted while generating the data.
            Searcher searcher = new Searcher(firstRow, lastRow, mock, result, SEARCH, barrier);
            new Thread(searcher).start();
        }

        System.out.printf("Main: The main thread has finished.\n");
    }
}
时间: 2024-10-08 12:05:07