Java fork/join framework小试兼RecursiveAction类的应用

假设要做这么一件事:给你一个double类型数组,让你求这个数组的元素的倒数和,怎么做?当然是先求倒数,然后再求和啦。没错,就是这样,但是如果你有多个cpu core呢?比如四个core, 线性地这么去求是不是有些浪费计算资源,或者说没有充分利用多核的条件以降低执行时间呢?那可不可以把这这个数组拆分成4段,每个核去计算一段的倒数和,然后等都计算完成了,再把结果相加呢?当然可以!就应该这样做,那java中怎么做呢?用fork/join框架就可以了,使用的方法之一是利用RecursiveAction类,我们只要自己新建一个任务类继承这个类,关键是要重写其compute()方法以实现其计算逻辑就可以了。下面是一个一般化地完成这个任务的例子:

package edu.coursera.parallel;

import java.util.concurrent.RecursiveAction;

/**
 * Class wrapping methods for implementing reciprocal array sum in parallel.
 */
public final class ReciprocalArraySum {

    /**
     * Default constructor.
     */
    private ReciprocalArraySum() {
    }

    private static int getNCores() {
        String ncoresStr = System.getenv("COURSERA_GRADER_NCORES");
        if (ncoresStr == null) {
            return Runtime.getRuntime().availableProcessors();
        } else {
            return Integer.parseInt(ncoresStr);
        }
    }

    /**
     * Sequentially compute the sum of the reciprocal values for a given array.
     *
     * @param input Input array
     * @return The sum of the reciprocals of the array input
     */
    protected static double seqArraySum(final double[] input) {
        double sum = 0;

        // Compute sum of reciprocals of array elements
        for (int i = 0; i < input.length; i++) {
            sum += 1 / input[i];
        }

        return sum;
    }

    /**
     * Computes the size of each chunk, given the number of chunks to create
     * across a given number of elements.
     *
     * @param nChunks The number of chunks to create
     * @param nElements The number of elements to chunk across
     * @return The default chunk size
     */
    private static int getChunkSize(final int nChunks, final int nElements) {
        // Integer ceil
        return (nElements + nChunks - 1) / nChunks;
    }

    /**
     * Computes the inclusive element index that the provided chunk starts at,
     * given there are a certain number of chunks.
     *
     * @param chunk The chunk to compute the start of
     * @param nChunks The number of chunks created
     * @param nElements The number of elements to chunk across
     * @return The inclusive index that this chunk starts at in the set of
     *         nElements
     */
    private static int getChunkStartInclusive(final int chunk,
            final int nChunks, final int nElements) {
        final int chunkSize = getChunkSize(nChunks, nElements);
        return chunk * chunkSize;
    }

    /**
     * Computes the exclusive element index that the provided chunk ends at,
     * given there are a certain number of chunks.
     *
     * @param chunk The chunk to compute the end of
     * @param nChunks The number of chunks created
     * @param nElements The number of elements to chunk across
     * @return The exclusive end index for this chunk
     */
    private static int getChunkEndExclusive(final int chunk, final int nChunks,
            final int nElements) {
        final int chunkSize = getChunkSize(nChunks, nElements);
        final int end = (chunk + 1) * chunkSize;
        if (end > nElements) {
            return nElements;
        } else {
            return end;
        }
    }

    /**
     * This class stub can be filled in to implement the body of each task
     * created to perform reciprocal array sum in parallel.
     */
    private static class ReciprocalArraySumTask extends RecursiveAction {
        /**
         * Starting index for traversal done by this task.
         */
        private final int startIndexInclusive;
        /**
         * Ending index for traversal done by this task.
         */
        private final int endIndexExclusive;
        /**
         * Input array to reciprocal sum.
         */
        private final double[] input;
        /**
         * Intermediate value produced by this task.
         */
        private double value;

        /**
         * Constructor.
         * @param setStartIndexInclusive Set the starting index to begin
         *        parallel traversal at.
         * @param setEndIndexExclusive Set ending index for parallel traversal.
         * @param setInput Input values
         */
        ReciprocalArraySumTask(final int setStartIndexInclusive,
                final int setEndIndexExclusive, final double[] setInput) {
            this.startIndexInclusive = setStartIndexInclusive;
            this.endIndexExclusive = setEndIndexExclusive;
            this.input = setInput;
        }

        /**
         * Getter for the value produced by this task.
         * @return Value produced by this task
         */
        public double getValue() {
            return value;
        }

        @Override
        protected void compute() {
            // TODO
            // calculate sequentially
            value = 0;
            for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
                value += 1 / input[i];
            }
        }
    }

    /**
     * TODO: Modify this method to compute the same reciprocal sum as
     * seqArraySum, but use two tasks running in parallel under the Java Fork
     * Join framework. You may assume that the length of the input array is
     * evenly divisible by 2.
     *
     * @param input Input array
     * @return The sum of the reciprocals of the array input
     */
    protected static double parArraySum(final double[] input) {
        assert input.length % 2 == 0;
        int mid = input.length / 2;
        ReciprocalArraySumTask lower = new ReciprocalArraySumTask(0, mid, input);
        ReciprocalArraySumTask high = new ReciprocalArraySumTask(mid, input.length, input);
        lower.fork();
        high.compute();
        lower.join();
        return lower.getValue()+high.getValue();
    }

    /**
     * TODO: Extend the work you did to implement parArraySum to use a set
     * number of tasks to compute the reciprocal array sum. You may find the
     * above utilities getChunkStartInclusive and getChunkEndExclusive helpful
     * in computing the range of element indices that belong to each chunk.
     *
     * @param input Input array
     * @param numTasks The number of tasks to create
     * @return The sum of the reciprocals of the array input
     */
    protected static double parManyTaskArraySum(final double[] input,
            final int numTasks) {
        int taskNum = numTasks;
        if(taskNum>input.length){
            taskNum = input.length;
        }
        ReciprocalArraySumTask[] tasks = new ReciprocalArraySumTask[taskNum];

        int i = 0;
        for(i=0; i<taskNum-1; i++){
            tasks[i] = new ReciprocalArraySumTask(getChunkStartInclusive(i, taskNum, input.length), getChunkEndExclusive(i, taskNum, input.length), input);
            tasks[i].fork();
        }
        tasks[i] = new ReciprocalArraySumTask(getChunkStartInclusive(i, taskNum, input.length), getChunkEndExclusive(i, taskNum, input.length), input);
        tasks[i].compute();

        for(int j=0; j<taskNum-1; j++){
            tasks[j].join();
        }

        double sum = 0;
        for(int j=0; j<taskNum; j++){
            sum += tasks[j].getValue();
        }
        return sum;
    }

    public static void main(String[] args){
        double[] arr = {2,4,8,10};

        double seq = seqArraySum(arr);
        double half = parArraySum(arr);
        double many = parManyTaskArraySum(arr, 4);

        System.out.println(seq);
        System.out.println(half);
        System.out.println(many);
    }
}

  

时间: 2024-10-31 10:43:18

Java fork/join framework小试兼RecursiveAction类的应用的相关文章

Java Concurrency - Fork/Join Framework

Normally, when you implement a simple, concurrent Java application, you implement some Runnable objects and then the corresponding Thread objects. You control the creation, execution, and status of those threads in your program. Java 5 introduced an

浅谈Java Fork/Join并行框架

初步了解Fork/Join框架 Fork/Join 框架是java7中加入的一个并行任务框架,可以将任务分割成足够小的小任务,然后让不同的线程来做这些分割出来的小事情,然后完成之后再进行join,将小任务的结果组装成大任务的结果.下面的图片展示了这种框架的工作模型: 使用Fork/Join并行框架的前提是我们的任务可以拆分成足够小的任务,而且可以根据小任务的结果来组装出大任务的结果,一个最简单的例子是使用Fork/Join框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是

Java Fork/Join框架

fork-join框架 fork操作的作用是把一个大的问题划分成若干个较小的问题.在这个划分过程一般是递归进行的.直到可以直接进行计算.需要恰当地选取子问题的大小.太大的子问题不利于通过并行方式来提高性能,而太小的子问题则会带来较大的额外开销.每个子问题计算完成后,可以得到关于整个问题的部分解.join操作的作用是把这些分解手机组织起来,得到完整解. 在fork/join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行.那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行.

Java fork join ForkJoinPool 用法例子

本例是把一个大的数组求和的计算的大任务分解到在小范围内求和的小任务,然后把这些小任务之和加起来就是所求之结果. 技术:JDK8.0, Javafork-join模式下的RecursiveTask技术,override compute(). [java] view plain copy print? /** * Author: Bigtree * 本例是把一个大的数组求和的计算的大任务分解到在小范围内求和的小任务,然后把这些小任务之和加起来就是所求之结果. * 技术: * java fork-jo

hello java fork/join for java

1,示例 import java.util.concurrent.RecursiveTask; public class Calculator extends RecursiveTask<Integer> { private static final int THRESHOLD = 4; private int start; private int end; public Calculator(int start, int end) { this.start = start; this.end

《java.util.concurrent 包源码阅读》22 Fork/Join框架的初体验

JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果.这样就能使用多线程的方式来执行一个任务. JDK7引入的Fork/Join有三个核心类: ForkJoinPool,执行任务的线程池 ForkJoinWorkerThread,执行任务的工作线程 ForkJoinTask,一个用于ForkJoinPool的任务抽象类. 因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不

《java.util.concurrent 包源码阅读》24 Fork/Join框架之Work-Stealing

仔细看了Doug Lea的那篇文章:A Java Fork/Join Framework 中关于Work-Stealing的部分,下面列出该算法的要点(基本是原文的翻译): 1. 每个Worker线程都维护一个任务队列,即ForkJoinWorkerThread中的任务队列. 2. 任务队列是双向队列,这样可以同时实现LIFO和FIFO. 3. 子任务会被加入到原先任务所在Worker线程的任务队列. 4. Worker线程用LIFO的方法取出任务,也就后进队列的任务先取出来(子任务总是后加入队

JAVA中的Fork/Join框架

看了下Java Tutorials中的fork/join章节,整理下. 什么是fork/join框架 fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能:设计的目的是为了处理那些可以被递归拆分的任务. fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃

使用Java7提供的Fork/Join框架

在Java7中,JDK提供对多线程开发提供了一个非常强大的框架,就是Fork/Join框架.这个是对原来的Executors更 进一步,在原来的基础上增加了并行分治计算中的一种Work-stealing策略,就是指的是.当一个线程正在等待他创建的 子线程运行的时候,当前线程如果完成了自己的任务后,就会寻找还没有被运行的任务并且运行他们,这样就是和 Executors这个方式最大的区别,更加有效的使用了线程的资源和功能.所以非常推荐使用Fork/Join框架. 下面我们以一个例子来说明这个框架如何