Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork就是把一个大任务切分为若干子任务并行的执行。类似MapReduce里面的Map。
Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。类似MapReduce里面的Reduce。
举例说明,统计1~100的和,并行运行, 每个线程计算20个数的,如果当前线程统计的数量多于20,就切分为两个线程运行,切分点为中间数,至少分配的每一个线程的统计数小于或等于20,这个分裂任务的过程就叫做Fork。最后各个线程向上汇报汇总结果,这个汇聚结果的过程就叫做Join。
流程图如下:
代码如下:
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; import java.util.concurrent.atomic.AtomicInteger; class Calculator extends RecursiveTask<Integer> { /** * */ private static final long serialVersionUID = 1L; /** * 线程计数器 */ public static AtomicInteger tcounter = new AtomicInteger(0); /** * 计算阀值 */ private static final int THRESHOLD = 20; /** * 开始值 */ private int start; /** * 结束值 */ private int end; public Calculator(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { tcounter.incrementAndGet(); System.out.println("start:" + start + ", end:" + end); int sum = 0; if((end - start) <= THRESHOLD) { /** * 小于等于阀值,直接计算 */ for(int i = start; i<= end; i++) { sum += i; } } else { /** * 大于阀值,任务分解, 并汇聚结果 */ int middle = (start + end) / 2; Calculator left = new Calculator(start, middle); Calculator right = new Calculator(middle + 1, end); left.fork(); right.fork(); sum = left.join() + right.join(); } return sum; } } public class TestForkJoinPool { public static void main(String[] args) throws InterruptedException, ExecutionException { ForkJoinPool forkJoinPool = new ForkJoinPool(); Future<Integer> result = forkJoinPool.submit(new Calculator(1, 100)); System.out.println("结果: " + result.get() + ", " + Calculator.tcounter + "个线程参与了运算"); } }
执行结果
start:1, end:100 start:1, end:50 start:51, end:100 start:1, end:25 start:1, end:13 start:26, end:50 start:14, end:25 start:51, end:75 start:26, end:38 start:39, end:50 start:51, end:63 start:64, end:75 start:76, end:100 start:76, end:88 start:89, end:100 结果: 5050, 15个线程参与了运算
聊聊核心类,
ForkJoinPool:负责建立一个ForkJoin运行环境
ForkJoinTask: 表示实际运行的任务,一般使用它的两个子类,一个是RecursiveAction(任务不带返回值时使用),另一个是RecursiveTask(任务带返回值时使用)。按照情况选择任意一个, 使用时需要继承该子类,然后实现抽象方法compute【任务的逻辑就在compute方法里编写】。
ForkJoinTask.fork方法表示分裂任务, ForkJoinTask.join表示汇聚分裂任务的compute方法的执行结果。
时间: 2024-11-12 14:54:09