多线程-Fork/Join

Fork/Join

Java7提供了Fork/Join来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。

类图

Java7提供了ForkJoinPool来支持将一个任务拆分为多个小任务并行计算,再把多个小任务的结果合并成总的计算结果。ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。

ForkJoinPool(int n)创建一个包含n个并行线程的ForkJoinPool

ForkJoinPool()创建一个Runtime.availableProcessors()返回值个数的并行线程。

ForkJoinTask代表一个可以并行、合并的任务,是一个抽象类,它还有两个抽象子类:RecursiveAction和RecursiveTask。其中RecursiveTask代表有返回值的任务,而RecursiveAction代表没有返回值的任务。

RecursiveAction实例:

package org.github.lujiango;  

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;  

class PrintTask extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    private static final int threshold = 50;
    private int start;
    private int end;  

    public PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }  

    @Override
    protected void compute() {
        if (end - start < threshold) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + " i: " + i);
            }
        } else {
            int middle = (start + end) / 2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            left.fork();
            right.fork();
        }
    }  

}  

public class Test20 {  

    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(new PrintTask(0, 300));
        pool.awaitTermination(2, TimeUnit.SECONDS);
        pool.shutdown();
    }  

}

分解后的任务分别调用fork()方法开始并行执行。

RecursiveTask<T>实例:

如果大任务是有返回值的任务,则可以让任务继承RecursiveTask<T>,其中泛型参数T就代表了该任务的返回值类型。

package org.github.lujiango;  

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;  

class CalTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    private static final int threshold = 20;
    private int[] arr;
    private int start;
    private int end;  

    public CalTask(int[] arr, int start, int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }  

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start < threshold) {
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        } else {
            int middle = (start + end) / 2;
            CalTask left = new CalTask(arr, start, middle);
            CalTask right = new CalTask(arr, middle, end);
            left.fork();
            right.fork();
            return left.join() + right.join();
        }
    }  

}  

public class Test21 {  

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int[] arr = new int[100];
        Random random = new Random();
        int total = 0;
        for (int i = 0, len = arr.length; i < len; i++) {
            int tmp = random.nextInt(20);
            total += (arr[i] = tmp);
        }
        System.out.println(total);  

        ForkJoinPool pool = new ForkJoinPool();
        Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
        System.out.println(future.get());
        pool.shutdown();  

    }  

}

  

时间: 2024-11-06 14:41:49

多线程-Fork/Join的相关文章

Java 多线程 Fork/Join

Fork/Join Fork/Join将大任务切分成小任务来分治运算,fork分join合. 一般直接使用ForkJoinTask的子类RecursiveTask. RecursiveTask的用法 1.新建类A来继承RecursiveTask,实现compute()方法,这个方法就是需要分治的代码.其中,调用fork()方法来表示需要分解计算的内容,调用join()方法来获取结果 2.新建ForkJoinPool,使用ForkJoinPool.submit(A的实例),来提交分治代码,并使用F

Java对多线程~~~Fork/Join同步和异步帧

于Fork/Join骨架,当提交的任务,有两个同步和异步模式.它已被用于invokeAll()该方法是同步的.是任何 务提交后,这种方法不会返回直到全部的任务都处理完了.而还有还有一种方式,就是使用fork方法,这个是异步的.也 就是你提交任务后,fork方法马上返回.能够继续以下的任务. 这个线程也会继续执行. 以下我们以一个查询磁盘的以log结尾的文件的程序样例来说明异步的使用方法. package com.bird.concursey.charpet8; import java.io.Fi

单线程排序和利用Fork/Join进行多线程并行排序的简单对比

Fork/Join框架自从在JDK7中引进之后,对并行计算的设计带来了更多便利. 本文使用java原生的排序方法Array.sort单线程排序,和利用Fork/Join框架进行任务分割设计的快速排序进行对比. 首先,使用以下方法构造一个简单的文件样本,目标是生成一个文本文件,10000000行,每行为一个20000以内的随机数: package sort; import java.io.File; import java.io.FileWriter; import java.io.IOExcep

Java多线程之~~~Fork/Join框架的同步和异步

在Fork/Join框架中,提交任务的时候,有同步和异步两种方式.以前使用的invokeAll()的方法是同步的,也就是任 务提交后,这个方法不会返回直到所有的任务都处理完了.而还有另一种方式,就是使用fork方法,这个是异步的.也 就是你提交任务后,fork方法立即返回,可以继续下面的任务.这个线程也会继续运行. 下面我们以一个查询磁盘的以log结尾的文件的程序例子来说明异步的用法. package com.bird.concursey.charpet8; import java.io.Fil

《java.util.concurrent 包源码阅读》22 Fork/Join框架的初体验

JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果.这样就能使用多线程的方式来执行一个任务. JDK7引入的Fork/Join有三个核心类: ForkJoinPool,执行任务的线程池 ForkJoinWorkerThread,执行任务的工作线程 ForkJoinTask,一个用于ForkJoinPool的任务抽象类. 因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不

使用Java7提供的Fork/Join框架

在Java7中,JDK提供对多线程开发提供了一个非常强大的框架,就是Fork/Join框架.这个是对原来的Executors更 进一步,在原来的基础上增加了并行分治计算中的一种Work-stealing策略,就是指的是.当一个线程正在等待他创建的 子线程运行的时候,当前线程如果完成了自己的任务后,就会寻找还没有被运行的任务并且运行他们,这样就是和 Executors这个方式最大的区别,更加有效的使用了线程的资源和功能.所以非常推荐使用Fork/Join框架. 下面我们以一个例子来说明这个框架如何

Fork/Join编程模型

1.一种并行计算的多线程编程模型 2.开始--任务分割--多线程异步执行---任务合并--阻塞等待合并结果.(分治算法) 3.work-stealing算法: 每个线程维护一个各自的双端的链表,有新任务时之间插入的前端优先执行,前端无任务时,窃取其他线程双端链表的任务加入到自己的尾端进行处理. 通常的情况下,并发的线程池都是维护一个共享的任务队列,新任务到来时插入到队列的尾部,而线程执行任务时取队列的首部任务,而Fork/Join编 模型刚好相反,优先处理新任务,新任务放在最前面优先执行.自己的

JAVA 1.7并发之Fork/Join框架

在之前的博文里有说过executor框架,其实Fork/Join就是继承executor的升级版啦 executor用于创建一个线程池,但是需要手动的添加任务,如果需要将大型任务分治,显然比较麻烦 而Fork/Join则是解决这个问题的计算框架 用户定义部分: 如何分治  (compute(1 , 10)) 定义一个函数用于分治,如果处理区间太大则分开,递归的调用自己 (compute(1 , 5) compute(6 , 10)) 如何计算  (compute(1 , 5))  如果区间合适则

并行计算有向无环图和fork/join 框架

从多任务OS开始,线程主要用来表示IO异步:而今随着4G和多核等的到来,计算密集型又热门起来了. 硬件价格和性能从低到高: PC/Laptop multi core, memory shared PC clusters SuperComputers 假设一个理想并行计算机:每个处理器计算能力相同,忽略调度, static thread 是对一个虚拟处理器的软件层面的抽象; static强调的是线程一直存在,不包含线程的创建和销毁. dynamic multithreded concurrency