当处理器的性能的发展受到各方面因素的限制的时候,计算机产业开始用多处理器结构实现并行计算来提高计算的效率。我们使用多处理器共享存储器的方式实现了多处理器编程,也就是多核编程。当然在这样的系统结构下我们面临着各种各样的挑战,例如如何协调各个处理器之间的数据调度以及现代计算机系统固有的异步特征等等。
在接下来的一系列文章中,我将会介绍一些基础的原理以及并行程序的设计和并发程序的设计及实现,写这篇文章是对近期学习课程的总结,方便自己温故时习,感谢USTC付明老师的《多核并行计算》课程,了解更多推荐《The Art of Multiprocessor Programming, Revised Reprint》。
实例:大数组元素的求和
思想:给出4个线程同时对数组的1/4求和。
- 注意:这是一个低级的算法
- 创建4个线程,每个线程负责部分的工作
- 调用start(),启动每个线程并行的运行
- 使用join()方法等待每个线程运行结束
- 将4个结果相加在一起
class SumThread extends java.lang.Thread {
int lo, int hi, int[] arr; // arguments
int ans = 0; // result
SumThread(int[] a, int l, int h) { … }
public void run(){ … } // override
}
int sum(int[] arr){// can be a static method
int len = arr.length;
int ans = 0;
SumThread[] ts = new SumThread[4];
for(int i=0; i < 4; i++){// do parallel computations
ts[i] = new SumThread(arr,i*len/4,(i+1)*len/4);
ts[i].start(); //必须使用start(),而不是run()方法
}
for(int i=0; i < 4; i++) { // combine results
ts[i].join(); // wait for helper to finish!
ans += ts[i].ans;
}
return ans;
}
join()方法使得调用者阻塞,直到receiver 完成了执行;
线程的启动需要用start(),而不是run()
Fork-join framework
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。
- Fork-join 程序不需要关注线程间的共享内存
EX:求一个数组的最大值。
多线程方法:
public class MyThread implements Runnable {
//下面两个静态成员变量需要通过用关键字synchronized修饰 的方法来访问
private volatile static int max = Integer.MIN_VALUE; //初始最大值
//下面为成员变量,每个线程对象私有
private int[] nums; //待查找数组
private int low; //当前线程对象需要处理的数组开始下标
private int high; //当前线程对象需要处理的数组结束下标
//构造方法,传入待查找数组、开始下标和结束下标
public MyThread(int[] nums, int low, int high){
this.nums = nums;
this.low = low;
this.high = high;
}
@Override
public synchronized void run() {
for(int i=low;i<=high;i++)
{
if(nums[i] > max)
{
MyThread.setMax(nums[i]);
}
}
}
public static synchronized int getMax() {
return max;
}
public static synchronized void setMax(int max) {
MyThread.max = max;
}
}
Fork-join框架:
public class ForkThread extends RecursiveTask<Integer> {
private static final int SEQUENTIAL_THRESHOLD = 5;//子数组到分割最终大小
private final int[] data;
private final int start;
private final int end;
public ForkThread(int[] data, int start, int end) {
this.data = data;
this.start = start;
this.end = end;
}
public ForkThread(int[] data) {
this(data, 0, data.length);
}
@Override
protected Integer compute() {
final int length = end - start;
if (length < SEQUENTIAL_THRESHOLD) {
return computeDirectly();
}
final int split = length / 2;
final ForkThread left = new ForkThread(data, start, start + split);
left.fork();
final ForkThread right = new ForkThread(data, start + split, end);
return Math.max(right.compute(), left.join());
}
private Integer computeDirectly() {
/*System.out.println(Thread.currentThread() + "computing: " + start
+ " to " + end);*/
int max = Integer.MIN_VALUE;
for (int i = start; i < end; i++) {
if (data[i] > max) {
max = data[i];
}
}
return max;
}
public static void main(String[] args) {
// create a random data set
final int[] data = new int[10000];
final Random random = new Random();
for (int i = 0; i < data.length; i++) {
data[i] = (int)Math.floor((random.nextDouble()*100000.0));
System.out.println(data[i]);
}
/*int data[] = new int[10000];
for(int i=0;i<10000;i++)
data[i] =(int) Math.random()*10000;*/
// submit the task to the pool
final ForkJoinPool pool = new ForkJoinPool(4);
final ForkThread finder = new ForkThread(data);
// System.out.println(pool.invoke(finder));
}
}
Main方法线程:
public class SearchMax {
/**
* @param args
*/
//初始化数组
public static int[] InitialArr(){
final int[] data = new int[10000];
final Random random = new Random();
for (int i = 0; i < data.length; i++) {
data[i] = (int)Math.floor((random.nextDouble()*100000.0));
}
return data;
}
//分治法
public static long DivideMax(int arr[]){
int size = arr.length;
long startTime=System.nanoTime();
Thread t1 = new Thread(new MyThread(arr,0,size/2));
Thread t2 = new Thread(new MyThread(arr,size/2+1,size-1));
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime=System.nanoTime();
System.out.println( "分治法: "+MyThread.getMax());
return (endTime-startTime);
}
//顺序查找
public static long OrderMax(int arr[]){
int size = arr.length;
int ret = 0;
long startTime=System.nanoTime();
for(int i=0;i<size;i++){
if(arr[i]>ret)
ret = arr[i];
}
long endTime=System.nanoTime();
System.out.println( "顺序查找: "+ret);
return (endTime-startTime);
}
//Fork-Join框架
public static long ForkMax(int arr[]){
int ret = 0;
long startTime=System.nanoTime();
// submit the task to the pool
final ForkJoinPool pool = new ForkJoinPool(4);
final ForkThread finder = new ForkThread(arr);
ret = pool.invoke(finder);
long endTime=System.nanoTime();
System.out.println( "Fork_join: "+ret);
return (endTime-startTime);
}
public static void main(String[] args) {
int data[] = InitialArr();
System.out.println("分治法花费时间 "+DivideMax(data)+" ns");
System.out.println("顺序查找花费时间 "+OrderMax(data)+" ns");
System.out.println("Fork_Join花费时间 "+ForkMax(data)+" ns");
}
}
运行结果:
设计一个Fork/Join框架的步骤:
第一步:分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割的子任务足够小;
第二步:执行任务并合并结果,分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里分别获取任务执行,子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join框架使用两个类来完成以上两件事:
(1)ForkJoinTask:我们要使用Fork Join框架,必须首先创建一个Fork Join任务,它提供在任务中执行fork() 和join()操作的机制,通常情况下我们不需要继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供以下两个子类:
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask:用于有返回结果的任务。
(2)ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
Fork/Join框架的异常处理:
Fork JoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以Fork JoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过Fork JoinTask的getException方法获取异常,使用如下代码:
if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException,如果任务没有完成或者没有抛出异常则返回null。
EX:大数组求和
class SumArray extends RecursiveTask<Integer> {
int lo; int hi; int[] arr; // arguments
SumArray(int[] a, int l, int h) { … }
protected Integer compute(){// return answer
if(hi – lo < SEQUENTIAL_CUTOFF) {
int ans = 0;
for(int i=lo; i < hi; i++)
ans += arr[i];
return ans;
} else {
SumArray left = new SumArray(arr,lo,(hi+lo)/2);
SumArray right= new SumArray(arr,(hi+lo)/2,hi);
left.fork();
int rightAns = right.compute();
int leftAns = left.join();
return leftAns + rightAns;
}
}
}
static final ForkJoinPool fjPool = new ForkJoinPool();
int sum(int[] arr){
return fjPool.invoke(new SumArray(arr,0,arr.length));
}