Fork/Join 型线程池与 Work-Stealing 算法

  

JDK 1.7 时,标准类库添加了 ForkJoinPool,作为对 Fork/Join 型线程池的实现。Fork 在英文中有 分叉 的意思,而 Join合并 的意思。ForkJoinPool 的功能也是如此:Fork 将大任务分叉为多个小任务,然后让小任务执行,Join 是获得小任务的结果,然后进行合并,将合并的结果作为大任务的结果 —— 并且这会是一个递归的过程 —— 因为任务如果足够大,可以将任务多级分叉直到任务足够小。

由此可见,ForkJoinPool 可以满足 并行 地实现 分治算法(Divide-and-Conquer) 的需要。

ForkJoinPool 的类图如下:

可以看到 ForkJoinPool 实现了 ExecutorService 接口,所以首先 ForkJoinPool 也是一个 线程池。因而 RunnableCallable 类型的任务,ForkJoinPool 也可以通过 submitinvokeAllinvokeAny 等方法来执行。但是标准类库还为 ForkJoinPool 定义了一种新的任务,它就是 ForkJoinTask<V>

ForkJoinTask 相关类图:

ForkJoinTask<V> 用来专门定义 Fork/Join 型任务 —— 完成将大任务分割为小任务以及合并结果的工作。一般我们不需要直接继承 ForkJoinTask<V>,而是继承它的子类 RecursiveActionRecursiveTask 并实现对应的抽象方法 —— compute 。其中,RecursiveAction 是不带返回值的 Fork/Join 型任务,所以使用此类任务并不产生结果,也就不涉及到结果的合并;而 RecursiveTask 是带返回值的 Fork/Join 型任务,使用此类任务需要我们进行结果的合并。通过 fork 方法,我们可以产生子任务并执行;通过 join 方法,我们可以获得子任务的结果。



ForkJoinPool 用三种方法用来执行 ForkJoinTask

invoke 方法:

invoke 方法用来执行一个带返回值的任务(通常继承自RecursiveTask),并且该方法是阻塞的,直到任务执行完毕,该方法才会停止阻塞并返回任务的执行结果。

submit 方法:

除了从 ExecutorService 继承的 submit 方法外,ForkJoinPool 还定义了用来执行 ForkJoinTasksubmit 方法 —— 一般该 submit 方法用来执行带返回值的ForkJoinTask(通常继承自RecursiveTask)。该方法是非阻塞的,调用之后将任务提交给 ForkJoinPool 去执行便立即返回,返回的便是已经提交到 ForkJoinPool 去执行的 task —— 由类图可知 ForkJoinTask 实现了 Future 接口,所以可以直接通过 task 来和已经提交的任务进行交互。

execute 方法:

除了从 Executor 获得的 execute 方法外,ForkJoinPool 也定义了用来执行ForkJoinTaskexecute 方法 —— 一般该 execute 方法用来执行不带返回值的ForkJoinTask(通常继承自RecursiveAction) ,该方法同样是非阻塞的。



现在让我们来实践下 ForkJoinPool 的功能:计算 π 的值。
计算 π 的值有一个通过多项式方法,即:
π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……)
多项式的项数越多,计算出的 π 的值越精确。

首先我们定义用来估算 π 的 PiEstimateTask

static class PiEstimateTask extends RecursiveTask<Double> {

    private final long begin;
    private final long end;
    private final long threshold; // 分割任务的临界值

    public PiEstimateTask(long begin, long end, long threshold) {
        this.begin = begin;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Double compute() {
        if (end - begin <= threshold) {

            int sign = 1; // 符号,取 1 或者 -1
            double result = 0.0;
            for (long i = begin; i < end; i++) {
                result += sign / (i * 2.0 + 1);
                sign = -sign;
            }

            return result * 4;
        }

        // 分割任务
        long middle = (begin + end) / 2;
        PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold);
        PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold);

        leftTask.fork();  // 异步执行 leftTask
        rightTask.fork(); // 异步执行 rightTask

        double leftResult = leftTask.join();   // 阻塞,直到 leftTask 执行完毕返回结果
        double rightResult = rightTask.join(); // 阻塞,直到 rightTask 执行完毕返回结果

        return leftResult + rightResult; // 合并结果
    }

}

然后我们使用 ForkJoinPoolinvoke 执行 PiEstimateTask

public class ForkJoinPoolTest {

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);

        // 计算 10 亿项,分割任务的临界值为 1 千万
        PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);

        double pi = forkJoinPool.invoke(task); // 阻塞,直到任务执行完毕返回结果

        System.out.println("π 的值:" + pi);

        forkJoinPool.shutdown(); // 向线程池发送关闭的指令
    }
}

运行结果:

我们也可以使用 submit 方法异步的执行任务(此处 submit 方法返回的 future 指向的对象即提交任务时的 task):

public static void main(String[] args) throws Exception {
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    Future<Double> future = forkJoinPool.submit(task); // 不阻塞

    double pi = future.get();
    System.out.println("π 的值:" + pi);
    System.out.println("future 指向的对象是 task 吗:" + (future == task));

    forkJoinPool.shutdown(); // 向线程池发送关闭的指令
}

运行结果:



值得注意的是,选取一个合适的分割任务的临界值,对 ForkJoinPool
执行任务的效率有着至关重要的影响。临界值选取过大,任务分割的不够细,则不能充分利用
CPU;临界值选取过小,则任务分割过多,可能产生过多的子任务,导致过多的线程间的切换和加重 GC
的负担从而影响了效率。所以,需要根据实际的应用场景选择一个合适的分割任务的临界值。



ForkJoinPool 相比于 ThreadPoolExecutor,还有一个非常重要的特点(优点)在于,ForkJoinPool具有 Work-Stealing (工作窃取)的能力。所谓 Work-Stealing,在 ForkJoinPool

中的实现为:线程池中每个线程都有一个互不影响的任务队列(双端队列),线程每次都从自己的任务队列的队头中取出一个任务来运行;如果某个线程对应的队列
已空并且处于空闲状态,而其他线程的队列中还有任务需要处理但是该线程处于工作状态,那么空闲的线程可以从其他线程的队列的队尾取一个任务来帮忙运行
—— 感觉就像是空闲的线程去偷人家的任务来运行一样,所以叫 “工作窃取”。

Work-Stealing 的适用场景是不同的任务的耗时相差比较大,即某些任务需要运行较长时间,而某些任务会很快的运行完成,这种情况下用
Work-Stealing 很合适;但是如果任务的耗时很平均,则此时 Work-Stealing
并不适合,因为窃取任务时也是需要抢占锁的,这会造成额外的时间消耗,而且每个线程维护双端队列也会造成更大的内存消耗。所以 ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作为对 ThreadPoolExecutor 的补充。



总结:
ForkJoinPoolThreadPoolExecutor 都是 ExecutorService(线程池),但ForkJoinPool 的独特点在于:

  1. ThreadPoolExecutor 只能执行 RunnableCallable 任务,而 ForkJoinPool 不仅可以执行 RunnableCallable 任务,还可以执行 Fork/Join 型任务 —— ForkJoinTask —— 从而满足并行地实现分治算法的需要;
  2. ThreadPoolExecutor 中任务的执行顺序是按照其在共享队列中的顺序来执行的,所以后面的任务需要等待前面任务执行完毕后才能执行,而 ForkJoinPool 每个线程有自己的任务队列,并在此基础上实现了 Work-Stealing 的功能,使得在某些情况下 ForkJoinPool 能更大程度的提高并发效率。
时间: 2024-10-05 03:09:14

Fork/Join 型线程池与 Work-Stealing 算法的相关文章

《java.util.concurrent 包源码阅读》22 Fork/Join框架的初体验

JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果.这样就能使用多线程的方式来执行一个任务. JDK7引入的Fork/Join有三个核心类: ForkJoinPool,执行任务的线程池 ForkJoinWorkerThread,执行任务的工作线程 ForkJoinTask,一个用于ForkJoinPool的任务抽象类. 因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不

Java Fork/Join框架

fork-join框架 fork操作的作用是把一个大的问题划分成若干个较小的问题.在这个划分过程一般是递归进行的.直到可以直接进行计算.需要恰当地选取子问题的大小.太大的子问题不利于通过并行方式来提高性能,而太小的子问题则会带来较大的额外开销.每个子问题计算完成后,可以得到关于整个问题的部分解.join操作的作用是把这些分解手机组织起来,得到完整解. 在fork/join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行.那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行.

第五章 Fork/Join框架

Java 7 并发编程实战手册目录 代码下载(https://github.com/Wang-Jun-Chao/java-concurrency) 第五章 Fork/Join框架 5.1简介 通常,使用Java来开发一个简单的并发应用程序时,会创建一些Runnable对象,然后创建对应的Thread对象来控制程序中这些线程的创建.执行以及线程的状态.自从Java 5开始引入了 Executor和ExecutorService接口以及实现这两个接口的类(比如ThreadPoolExecutor)之

FutureTask、Fork/Join、 BlockingQueue

我们之前学习创建线程有Thread和Runnable两种方式,但是两种方式都无法获得执行的结果. 而Callable和Future在任务完成后得到结果. Future是一个接口,表示一个任务的周期,并提供了相应的方法来判断是否已经完成或者取消任务,以及获取任务的结果和取消任务. FutureTask可用于异步获取执行结果或取消执行任务的场景.通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的g

初步了解Fork/Join框架

框架介绍 Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个子任务,最终汇总每个子任务的执行结果以得到大任务结果的框架.Fork/Join框架要完成两件事情: 任务分割:Fork/Join框架需要把任务分割成足够小的子任务,如果子任务比较大,就对子任务继续分割: 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行.子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据. 简单

Fork/Join框架介绍(转)

1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算1+2+..+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果

《java.util.concurrent 包源码阅读》24 Fork/Join框架之Work-Stealing

仔细看了Doug Lea的那篇文章:A Java Fork/Join Framework 中关于Work-Stealing的部分,下面列出该算法的要点(基本是原文的翻译): 1. 每个Worker线程都维护一个任务队列,即ForkJoinWorkerThread中的任务队列. 2. 任务队列是双向队列,这样可以同时实现LIFO和FIFO. 3. 子任务会被加入到原先任务所在Worker线程的任务队列. 4. Worker线程用LIFO的方法取出任务,也就后进队列的任务先取出来(子任务总是后加入队

聊聊并发(八)——Fork/Join框架介绍

作者 方腾飞 发布于 2013年12月23日 | 被首富的“一个亿”刷屏?不如定个小目标,先把握住QCon上海的优惠吧!2 讨论 分享到:微博微信FacebookTwitter有道云笔记邮件分享 稍后阅读 我的阅读清单 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大

Java并发编程--Fork/Join框架使用

上篇博客我们介绍了通过CyclicBarrier使线程同步,但是上述方法存在一个问题,那就是如果一个大任务跑了2个线程去完成,如果线程2耗时比线程1多2倍,线程1完成后必须等待线程2完成,等待的过程线程1没法复用.现在我们准备解决这个问题,我们希望线程1完成自己的任务后能去帮助线程2完成一部分任务.Java7引如了Fork/Join框架可以很好的解决这个问题. Fork/Join是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最后汇总每个小任务结果后得到大任务结果的框架.fork