JAVA中的Fork/Join框架

看了下Java Tutorials中的fork/join章节,整理下。

什么是fork/join框架

  fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。

  fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。

  fork/join框架的核心是ForkJoinPool类,该类继承了AbstractExecutorService类。ForkJoinPool实现了工作窃取算法并且能够执行 ForkJoinTask任务。

基本使用方法

  在使用fork/join框架之前,我们需要先对任务进行分割,任务分割代码应该跟下面的伪代码类似:

if (任务足够小){
  直接执行该任务;}else{
  将任务一分为二;
  执行这两个任务并等待结果;}

  首先,我们会在ForkJoinTask的子类中封装以上代码,不过一般我们会使用更加具体的ForkJoinTask类型,如 RecursiveTask(可以返回一个结果)或RecursiveAction

  当写好ForkJoinTask的子类后,创建该对象,该对象代表了所有需要完成的任务;然后将这个任务对象传给ForkJoinPool实例的invoke()去执行即可。

例子-图像模糊

  为了更加直观的理解fork/join框架是如何工作的,可以看一下下面这个例子。假定我们有一个图像模糊的任务需要完成,原始图像数据可以用一个整型数组表示,每一个整型元素包含了一个像素点的颜色值(RBG,存放在整型元素的不同位中)。目标图像同样是由一个整型数组构成,每个整型元素包含RBG颜色信息;

  执行模糊操作需要遍历原始图像整型数组的每个元素,并对其周围的像素点做均值操作(RGB均值),然后将结果存放到目标数组中。由于图像是一个大数组,这个处理操作会花费一定的时间。但是有了fork/join框架,我们可以充分利用多核处理器进行并行计算。如下是一个可能的代码实现(图像做水平方向的模糊操作):

Tips:该例子仅仅是阐述fork/join框架的使用,并不推荐使用该方法做图像模糊,图像边缘处理也没做判断

public class ForkBlur extends RecursiveAction {
    private static final long serialVersionUID = -8032915917030559798L;
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
                bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
            }

            // Re-assemble destination pixel.
            int dpixel = (0xff000000)
                    | (((int) rt) << 16)
                    | (((int) gt) << 8)
                    | (((int) bt) << 0);
            mDestination[index] = dpixel;
        }
    }
...

  现在,我们开始编写compute()的实现方法,该方法分成两部分:直接执行模糊操作和任务的划分;一个数组长度阈值sThreshold可以帮助我们决定任务是直接执行还是进行划分;

    @Override
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }

        int split = mLength / 2;

        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                new ForkBlur(mSource, mStart + split, mLength - split,
                mDestination));
    }

接下来按如下步骤即可完成图像模糊任务啦:

1、创建图像模糊任务

ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

2、创建ForkJoinPool

ForkJoinPool pool = new ForkJoinPool();

3、执行图像模糊任务

pool.invoke(fb);

完整代码如下:

/*
* Copyright (c) 2010, 2013, Oracle and/or its affiliates. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
*   - Redistributions of source code must retain the above copyright
*     notice, this list of conditions and the following disclaimer.
*
*   - Redistributions in binary form must reproduce the above copyright
*     notice, this list of conditions and the following disclaimer in the
*     documentation and/or other materials provided with the distribution.
*
*   - Neither the name of Oracle or the names of its
*     contributors may be used to endorse or promote products derived
*     from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

import java.awt.image.BufferedImage;
import java.io.File;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import javax.imageio.ImageIO;

/**
 * ForkBlur implements a simple horizontal image blur. It averages pixels in the
 * source array and writes them to a destination array. The sThreshold value
 * determines whether the blurring will be performed directly or split into two
 * tasks.
 *
 * This is not the recommended way to blur images; it is only intended to
 * illustrate the use of the Fork/Join framework.
 */
public class ForkBlur extends RecursiveAction {
    private static final long serialVersionUID = -8032915917030559798L;
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
                bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
            }

            // Re-assemble destination pixel.
            int dpixel = (0xff000000)
                    | (((int) rt) << 16)
                    | (((int) gt) << 8)
                    | (((int) bt) << 0);
            mDestination[index] = dpixel;
        }
    }
    protected static int sThreshold = 10000;

    @Override
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }

        int split = mLength / 2;

        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                new ForkBlur(mSource, mStart + split, mLength - split,
                mDestination));
    }

    // Plumbing follows.
    public static void main(String[] args) throws Exception {
        String srcName = "C:\\test6.jpg";
        File srcFile = new File(srcName);
        BufferedImage image = ImageIO.read(srcFile);

        System.out.println("Source image: " + srcName);

        BufferedImage blurredImage = blur(image);

        String dstName = "C:\\test6_out.jpg";
        File dstFile = new File(dstName);
        ImageIO.write(blurredImage, "jpg", dstFile);

        System.out.println("Output image: " + dstName);

    }

    public static BufferedImage blur(BufferedImage srcImage) {
        int w = srcImage.getWidth();
        int h = srcImage.getHeight();

        int[] src = srcImage.getRGB(0, 0, w, h, null, 0, w);
        int[] dst = new int[src.length];

        System.out.println("Array size is " + src.length);
        System.out.println("Threshold is " + sThreshold);

        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");

        ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

        ForkJoinPool pool = new ForkJoinPool();

        long startTime = System.currentTimeMillis();
        pool.invoke(fb);
        long endTime = System.currentTimeMillis();

        System.out.println("Image blur took " + (endTime - startTime) +
                " milliseconds.");

        BufferedImage dstImage =
                new BufferedImage(w, h, BufferedImage.TYPE_INT_ARGB);
        dstImage.setRGB(0, 0, w, h, dst, 0, w);

        return dstImage;
    }
}

测试了一下,执行效果如下:

Source image: C:\test6.jpg
Array size is 120000
Threshold is 10000
4 processors are available
Image blur took 10 milliseconds.
Output image: C:\test6_out.jpg

JDK中使用fork/join的例子

  除了我们上面提到的使用fork/join框架并行执行图像模糊任务之外,在JAVA SE中,也已经利用fork/join框架实现了一些非常有用的特性。其中一个实现是在JAVA SE8 中java.util.Arrays 类的parallelSort()方法。这些方法和sort()方法类似,但是可以通过fork/join框架并行执行。对于大数组排序,在多核处理器系统中,使用并行排序方法比顺序排序更加高效。当然,关于这些排序方法是如何利用fork/join框架不在本篇文章讨论范围,更多信息可以查看JAVA API文档。
  另一个fork/join框架的实现是在JAVA SE8中的java.util.streams包内,与Lambda表达式相关,更多信息,可以查看https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html链接。

参考链接:https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

时间: 2024-10-07 20:31:56

JAVA中的Fork/Join框架的相关文章

Java 并发编程 -- Fork/Join 框架

概述 Fork/Join 框架是 Java7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.下图是网上流传的 Fork Join 的运行流程图,直接拿过来用了: 工作窃取算法 工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行.那么为什么要使用这个算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创

Java并发编程--Fork/Join框架使用

上篇博客我们介绍了通过CyclicBarrier使线程同步,但是上述方法存在一个问题,那就是如果一个大任务跑了2个线程去完成,如果线程2耗时比线程1多2倍,线程1完成后必须等待线程2完成,等待的过程线程1没法复用.现在我们准备解决这个问题,我们希望线程1完成自己的任务后能去帮助线程2完成一部分任务.Java7引如了Fork/Join框架可以很好的解决这个问题. Fork/Join是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最后汇总每个小任务结果后得到大任务结果的框架.fork

Java多线程之~~~Fork/Join框架的同步和异步

在Fork/Join框架中,提交任务的时候,有同步和异步两种方式.以前使用的invokeAll()的方法是同步的,也就是任 务提交后,这个方法不会返回直到所有的任务都处理完了.而还有另一种方式,就是使用fork方法,这个是异步的.也 就是你提交任务后,fork方法立即返回,可以继续下面的任务.这个线程也会继续运行. 下面我们以一个查询磁盘的以log结尾的文件的程序例子来说明异步的用法. package com.bird.concursey.charpet8; import java.io.Fil

《java.util.concurrent 包源码阅读》22 Fork/Join框架的初体验

JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果.这样就能使用多线程的方式来执行一个任务. JDK7引入的Fork/Join有三个核心类: ForkJoinPool,执行任务的线程池 ForkJoinWorkerThread,执行任务的工作线程 ForkJoinTask,一个用于ForkJoinPool的任务抽象类. 因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不

Java并发编程从入门到精通 - 第7章:Fork/Join框架

1.综述:化繁为简,分而治之:递归的分解和合并,直到任务小到可以接受的程度:2.Future任务机制:  Future接口就是对于具体的Runnable或者Callable任务的执行结果进行取消.查询是否完成.获取结果:必要时可以通过get方法获取执行结果,该方法会阻塞直到任务会返回结果:也就是说Future接口提供三种功能:判断任务是否完成.能够中断任务.能够获取任务执行结果:  Future接口里面的常用方法:3.FutureTask:  FutureTask类是Future接口唯一的实现类

Java Fork/Join框架

fork-join框架 fork操作的作用是把一个大的问题划分成若干个较小的问题.在这个划分过程一般是递归进行的.直到可以直接进行计算.需要恰当地选取子问题的大小.太大的子问题不利于通过并行方式来提高性能,而太小的子问题则会带来较大的额外开销.每个子问题计算完成后,可以得到关于整个问题的部分解.join操作的作用是把这些分解手机组织起来,得到完整解. 在fork/join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行.那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行.

JAVA 1.7并发之Fork/Join框架

在之前的博文里有说过executor框架,其实Fork/Join就是继承executor的升级版啦 executor用于创建一个线程池,但是需要手动的添加任务,如果需要将大型任务分治,显然比较麻烦 而Fork/Join则是解决这个问题的计算框架 用户定义部分: 如何分治  (compute(1 , 10)) 定义一个函数用于分治,如果处理区间太大则分开,递归的调用自己 (compute(1 , 5) compute(6 , 10)) 如何计算  (compute(1 , 5))  如果区间合适则

《java.util.concurrent 包源码阅读》26 Fork/Join框架之Join

接下来看看调用ForkJoinTask的join方法都发生了什么: public final V join() { // doJoin方法返回该任务的状态,状态值有三种: // NORMAL, CANCELLED和EXCEPTIONAL // join的等待过程在doJoin方法中进行 if (doJoin() != NORMAL) // reportResult方法针对任务的三种状态有三种处理方式: // NORMAL: 直接返回getRawResult()方法的返回值 // CANCELLE

线程基础:多任务处理(13)——Fork/Join框架(解决排序问题)

============== 接上文< 线程基础:多任务处理(12)--Fork/Join框架(基本使用)> 3. 使用Fork/Join解决实际问题 之前文章讲解Fork/Join框架的基本使用时,所举的的例子是使用Fork/Join框架完成1-1000的整数累加.这个示例如果只是演示Fork/Join框架的使用,那还行,但这种例子和实际工作中所面对的问题还有一定差距.本篇文章我们使用Fork/Join框架解决一个实际问题,就是高效排序的问题. 3-1. 使用归并算法解决排序问题 排序问题是