Java的Fork/Join任务

当我们需要执行大量的小任务时,有经验的Java开发人员都会采用线程池来高效执行这些小任务。然而,有一种任务,例如,对超过1000万个元素的数组进行排序,这种任务本身可以并发执行,但如何拆解成小任务需要在任务执行的过程中动态拆分。这样,大任务可以拆成小任务,小任务还可以继续拆成更小的任务,最后把任务的结果汇总合并,得到最终结果,这种模型就是Fork/Join模型。

Java7引入了Fork/Join框架,我们通过RecursiveTask这个类就可以方便地实现Fork/Join模式。

例如,对一个大数组进行并行求和的RecursiveTask,就可以这样编写:

 1 public class SumTask extends RecursiveTask<Long> {
 2     static final int THRESHOLD = 100;
 3     long[] array;
 4     int start;
 5     int end;
 6
 7     SumTask(long[] array, int start, int end) {
 8         this.array = array;
 9         this.start = start;
10         this.end = end;
11     }
12
13     @Override
14     protected Long compute() {
15         if (end - start <= THRESHOLD) {
16             // 如果任务足够小,直接计算:
17             long sum = 0;
18             for (int i = start; i < end; i++) {
19                 sum += array[i];
20             }
21             try {
22                 Thread.sleep(1000);
23             } catch (InterruptedException e) {
24             }
25             System.out.println(String.format("compute %d~%d = %d", start, end, sum));
26             return sum;
27         }
28         // 任务太大,一分为二:
29         int middle = (end + start) / 2;
30         System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
31         SumTask subtask1 = new SumTask(this.array, start, middle);
32         SumTask subtask2 = new SumTask(this.array, middle, end);
33         invokeAll(subtask1, subtask2);
34         Long subresult1 = subtask1.join();
35         Long subresult2 = subtask2.join();
36         Long result = subresult1 + subresult2;
37         System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
38         return result;
39     }
40
41     public static void main(String[] args) {
42         // 创建随机数组成的数组:
43         long[] array = new long[400];
44         fillRandom(array);
45         // fork/join task:
46         ForkJoinPool fjp = new ForkJoinPool(4); // 最大并发数4
47         ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
48         long startTime = System.currentTimeMillis();
49         Long result = fjp.invoke(task);
50         long endTime = System.currentTimeMillis();
51         System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
52     }
53
54     private static void fillRandom(long[] array) {
55         for(int i=0; i<array.length; i++){
56             Random random = new Random();
57             int i1 = random.nextInt(10);
58             array[i] = i1;
59         }
60     }
61 }

编写这个Fork/Join任务的关键在于,在执行任务的compute()方法内部,先判断任务是不是足够小,如果足够小,就直接计算并返回结果(注意模拟了1秒延时),否则,把自身任务一拆为二,分别计算两个子任务,再返回两个子任务的结果之和。

main方法中的关键代码是fjp.invoke(task)来提交一个Fork/Join任务并发执行,然后获得异步执行的结果。

我们设置任务的最小阀值是100,当提交一个400大小的任务时,在4核CPU上执行,会一分为二,再二分为四,每个最小子任务的执行时间是1秒,由于是并发4个子任务执行,整个任务最终执行时间大约为1秒。

新手在编写Fork/Join任务时,往往用搜索引擎搜到一个例子,然后就照着例子写出了下面的代码:

 1 protected Long compute() {
 2     if (任务足够小?) {
 3         return computeDirect();
 4     }
 5     // 任务太大,一分为二:
 6     SumTask subtask1 = new SumTask(...);
 7     SumTask subtask2 = new SumTask(...);
 8     // 分别对子任务调用fork():
 9     subtask1.fork();
10     subtask2.fork();
11     // 合并结果:
12     Long subresult1 = subtask1.join();
13     Long subresult2 = subtask2.join();
14     return subresult1 + subresult2;
15 }

很遗憾,这种写法是错!误!的!这样写没有正确理解Fork/Join模型的任务执行逻辑。

JDK用来执行Fork/Join任务的工作线程池大小等于CPU核心数。在一个4核CPU上,最多可以同时执行4个子任务。对400个元素的数组求和,执行时间应该为1秒。但是,换成上面的代码,执行时间却是两秒。

这是因为执行compute()方法的线程本身也是一个Worker线程,当对两个子任务调用fork()时,这个Worker线程就会把任务分配给另外两个Worker,但是它自己却停下来等待不干活了!这样就白白浪费了Fork/Join线程池中的一个Worker线程,导致了4个子任务至少需要7个线程才能并发执行。

打个比方,假设一个酒店有400个房间,一共有4名清洁工,每个工人每天可以打扫100个房间,这样,4个工人满负荷工作时,400个房间全部打扫完正好需要1天。

Fork/Join的工作模式就像这样:首先,工人甲被分配了400个房间的任务,他一看任务太多了自己一个人不行,所以先把400个房间拆成两个200,然后叫来乙,把其中一个200分给乙。

紧接着,甲和乙再发现200也是个大任务,于是甲继续把200分成两个100,并把其中一个100分给丙,类似的,乙会把其中一个100分给丁,这样,最终4个人每人分到100个房间,并发执行正好是1天。

如果换一种写法:

// 分别对子任务调用fork():
subtask1.fork();
subtask2.fork();

这个任务就分!错!了!

比如甲把400分成两个200后,这种写法相当于甲把一个200分给乙,把另一个200分给丙,然后,甲成了监工,不干活,等乙和丙干完了他直接汇报工作。乙和丙在把200分拆成两个100的过程中,他俩又成了监工,这样,本来只需要4个工人的活,现在需要7个工人才能1天内完成,其中有3个是不干活的。

其实,我们查看JDK的invokeAll()方法的源码就可以发现,invokeAll的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,但是,它还会留一个任务自己执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程。

时间: 2024-08-29 11:18:22

Java的Fork/Join任务的相关文章

Java Concurrency - Fork/Join Framework

Normally, when you implement a simple, concurrent Java application, you implement some Runnable objects and then the corresponding Thread objects. You control the creation, execution, and status of those threads in your program. Java 5 introduced an

Java 多线程 Fork/Join

Fork/Join Fork/Join将大任务切分成小任务来分治运算,fork分join合. 一般直接使用ForkJoinTask的子类RecursiveTask. RecursiveTask的用法 1.新建类A来继承RecursiveTask,实现compute()方法,这个方法就是需要分治的代码.其中,调用fork()方法来表示需要分解计算的内容,调用join()方法来获取结果 2.新建ForkJoinPool,使用ForkJoinPool.submit(A的实例),来提交分治代码,并使用F

Java使用Fork/Join框架来并行执行任务

现代的计算机已经向多CPU方向发展,即使是普通的PC,甚至现在的智能手机.多核处理器已被广泛应用.在未来,处理器的核心数将会发展的越来越多. 虽然硬件上的多核CPU已经十分成熟,但是很多应用程序并未这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势. 为了充分利用多CPU.多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态.为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执

我的Java开发学习之旅------&gt;Java使用Fork/Join框架来并行执行任务

现代的计算机已经向多CPU方向发展,即使是普通的PC,甚至现在的智能手机.多核处理器已被广泛应用.在未来,处理器的核心数将会发展的越来越多. 虽然硬件上的多核CPU已经十分成熟,但是很多应用程序并未这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势. 为了充分利用多CPU.多核CPU的性能优势,级软基软件系统应该可以充分"挖掘"每个CPU的计算能力,决不能让某个CPU处于"空闲"状态.为此,可以考虑把一个任务拆分成多个"小任务",把

并行计算有向无环图和fork/join 框架

从多任务OS开始,线程主要用来表示IO异步:而今随着4G和多核等的到来,计算密集型又热门起来了. 硬件价格和性能从低到高: PC/Laptop multi core, memory shared PC clusters SuperComputers 假设一个理想并行计算机:每个处理器计算能力相同,忽略调度, static thread 是对一个虚拟处理器的软件层面的抽象; static强调的是线程一直存在,不包含线程的创建和销毁. dynamic multithreded concurrency

JDK1.7之Fork/join

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.Fork/Join框架要完成两件事情: 1.任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割 2.执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行.子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据.

初步了解Fork/Join框架

框架介绍 Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个子任务,最终汇总每个子任务的执行结果以得到大任务结果的框架.Fork/Join框架要完成两件事情: 任务分割:Fork/Join框架需要把任务分割成足够小的子任务,如果子任务比较大,就对子任务继续分割: 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行.子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据. 简单

《java.util.concurrent 包源码阅读》22 Fork/Join框架的初体验

JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果.这样就能使用多线程的方式来执行一个任务. JDK7引入的Fork/Join有三个核心类: ForkJoinPool,执行任务的线程池 ForkJoinWorkerThread,执行任务的工作线程 ForkJoinTask,一个用于ForkJoinPool的任务抽象类. 因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不

《java.util.concurrent 包源码阅读》24 Fork/Join框架之Work-Stealing

仔细看了Doug Lea的那篇文章:A Java Fork/Join Framework 中关于Work-Stealing的部分,下面列出该算法的要点(基本是原文的翻译): 1. 每个Worker线程都维护一个任务队列,即ForkJoinWorkerThread中的任务队列. 2. 任务队列是双向队列,这样可以同时实现LIFO和FIFO. 3. 子任务会被加入到原先任务所在Worker线程的任务队列. 4. Worker线程用LIFO的方法取出任务,也就后进队列的任务先取出来(子任务总是后加入队