浅谈Java Fork/Join并行框架

初步了解Fork/Join框架

Fork/Join 框架是java7中加入的一个并行任务框架,可以将任务分割成足够小的小任务,然后让不同的线程来做这些分割出来的小事情,然后完成之后再进行join,将小任务的结果组装成大任务的结果。下面的图片展示了这种框架的工作模型:

使用Fork/Join并行框架的前提是我们的任务可以拆分成足够小的任务,而且可以根据小任务的结果来组装出大任务的结果,一个最简单的例子是使用Fork/Join框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是寻找一个大数组中的最大/最小值,我们可以将一个大数组拆成很多小数组,然后分别求解每个小数组中的最大/最小值,然后根据这些任务的结果组装出最后的最大最小值,下面的代码展示了如何通过Fork/Join求解数组的最大值:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
/**
* Created by hujian06 on 2017/9/28.
*
* fork/join demo
*/
public class ForkJoinDemo {
 
 /**
  * how to find the max number in array by Fork/Join
  */
 private static class MaxNumber extends RecursiveTask<Integer> {
 
   private int threshold = 2;
 
   private int[] array; // the data array
 
   private int index0 = 0;
   private int index1 = 0;
 
   public MaxNumber(int[] array, int index0, int index1) {
     this.array = array;
     this.index0 = index0;
     this.index1 = index1;
   }
 
   @Override
   protected Integer compute() {
     int max = Integer.MIN_VALUE;
     if ((index1 - index0) <= threshold) {
 
       for (int i = index0;i <= index1; i ++) {
         max = Math.max(max, array[i]);
       }
 
     } else {
       //fork/join
       int mid = index0 + (index1 - index0) / 2;
       MaxNumber lMax = new MaxNumber(array, index0, mid);
       MaxNumber rMax = new MaxNumber(array, mid + 1, index1);
 
       lMax.fork();
       rMax.fork();
 
       int lm = lMax.join();
       int rm = rMax.join();
 
       max = Math.max(lm, rm);
 
     }
 
     return max;
   }
 }
 
 public static void main(String ... args) throws ExecutionException, InterruptedException, TimeoutException {
 
   ForkJoinPool pool = new ForkJoinPool();
 
   int[] array = {100,400,200,90,80,300,600,10,20,-10,30,2000,1000};
 
   MaxNumber task = new MaxNumber(array, 0, array.length - 1);
 
   Future<Integer> future = pool.submit(task);
 
   System.out.println("Result:" + future.get(1, TimeUnit.SECONDS));
 
 }
 
}

可以通过设置不同的阈值来拆分成小任务,阈值越小代表拆出来的小任务越多。

工作窃取算法

Fork/Join在实现上,大任务拆分出来的小任务会被分发到不同的队列里面,每一个队列都会用一个线程来消费,这是为了获取任务时的多线程竞争,但是某些线程会提前消费完自己的队列。而有些线程没有及时消费完队列,这个时候,完成了任务的线程就会去窃取那些没有消费完成的线程的任务队列,为了减少线程竞争,Fork/Join使用双端队列来存取小任务,分配给这个队列的线程会一直从头取得一个任务然后执行,而窃取线程总是从队列的尾端拉取task。

Frok/Join框架的实现细节

在上面的示例代码中,我们发现Fork/Join的任务是通过ForkJoinPool来执行的,所以框架的一个核心是任务的fork和join,然后就是这个ForkJoinPool。关于任务的fork和join,我们可以想象,而且也是由我们的代码自己控制的,所以要分析Fork/Join,那么ForkJoinPool最值得研究。

上面的图片展示了ForkJoinPool的类关系图,可以看到本质上它就是一个Executor。在ForkJoinPool里面,有两个特别重要的成员如下:

volatile WorkQueue[] workQueues;
final ForkJoinWorkerThreadFactory factory;

workQueues 用于保存向ForkJoinPool提交的任务,而具体的执行有ForkJoinWorkerThread执行,而ForkJoinWorkerThreadFactory可以用于生产出ForkJoinWorkerThread。可以看一些ForkJoinWorkerThread,可以发现每一个ForkJoinWorkerThread会有一个pool和一个workQueue,和我们上面描述的是一致的,每个线程都被分配了一个任务队列,而执行这个任务队列的线程由pool提供。

下面我们看一下当我们fork的时候发生了什么:

public final ForkJoinTask<V> fork() {
  Thread t;
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
    ForkJoinPool.common.externalPush(this);
  return this;
}

看上面的fork代码,可以看到首先取到了当前线程,然后判断是否是我们的ForkJoinPool专用线程,如果是,则强制类型转换(向下转换)成ForkJoinWorkerThread,然后将任务push到这个线程负责的队列里面去。如果当前线程不是ForkJoinWorkerThread类型的线程,那么就会走else之后的逻辑,大概的意思是首先尝试将任务提交给当前线程,如果不成功,则使用例外的处理方法,关于底层实现较为复杂,和我们使用Fork/Join关系也不太大,如果希望搞明白具体原理,可以看源码。

下面看一下join的流程:

public final V join() {
   int s;
   if ((s = doJoin() & DONE_MASK) != NORMAL)
     reportException(s);
   return getRawResult();
 }
  
   private int doJoin() {
   int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
   return (s = status) < 0 ? s :
     ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
     (w = (wt = (ForkJoinWorkerThread)t).workQueue).
     tryUnpush(this) && (s = doExec()) < 0 ? s :
     wt.pool.awaitJoin(w, this, 0L) :
     externalAwaitDone();
 }
  
   final int doExec() {
   int s; boolean completed;
   if ((s = status) >= 0) {
     try {
       completed = exec();
     } catch (Throwable rex) {
       return setExceptionalCompletion(rex);
     }
     if (completed)
       s = setCompletion(NORMAL);
   }
   return s;
 }
  
  /**
  * Implements execution conventions for RecursiveTask.
  */
 protected final boolean exec() {
   result = compute();
   return true;
 }

上面展示了主要的调用链路,我们发现最后落到了我们在代码里编写的compute方法,也就是执行它,所以,我们需要知道的一点是,fork仅仅是分割任务,只有当我们执行join的时候,我们的额任务才会被执行。

如何使用Fork/Join并行框架

前文首先展示了一个求数组中最大值得例子,然后介绍了“工作窃取算法”,然后分析了Fork/Join框架的一些细节,下面才是我们最关心的,怎么使用Fork/Join框架呢?

为了使用Fork/Join框架,我们只需要继承类RecursiveTask或者RecursiveAction。前者适用于有返回值的场景,而后者适合于没有返回值的场景。

https://mp.weixin.qq.com/s/vkvYJnKfQyuUeD_BDQy_1g

获取面试经等学习资料,可以扫描下方二维码

原文地址:https://www.cnblogs.com/lemonrel/p/11699728.html

时间: 2024-10-10 02:43:26

浅谈Java Fork/Join并行框架的相关文章

浅谈java类集框架和数据结构(2)

继续上一篇浅谈java类集框架和数据结构(1)的内容 上一篇博文简介了java类集框架几大常见集合框架,这一篇博文主要分析一些接口特性以及性能优化. 一:List接口 List是最常见的数据结构了,主要有最重要的三种实现:ArrayList,Vector,LinkedList,三种List均来自AbstracList的实现,而AbstracList直接实现了List接口,并拓展自AbstractCollection. 在三种实现中,ArrayList和Vector使用了数组实现,可以认为这两个是

浅谈 Java 主流开源类库解析 XML

在大型项目编码推进中,涉及到 XML 解析问题时,大多数程序员都不太会选用底层的解析方式直接编码. 主要存在编码复杂性.难扩展.难复用....,但如果你是 super 程序员或是一个人的项目,也不妨一试. Jdom/Dom4j/Xstream... 基于底层解析方式重新组织封装的开源类库,简洁明了的 API,稳定高效的运行表现. Dom4j 基于 JAXP 解析方式,性能优异.功能强大.极易使用的优秀框架.想了解底层解析方式请翻看:浅谈 Java XML 底层解析方式 Jdom 你细看内部代码,

Java fork/join framework小试兼RecursiveAction类的应用

假设要做这么一件事:给你一个double类型数组,让你求这个数组的元素的倒数和,怎么做?当然是先求倒数,然后再求和啦.没错,就是这样,但是如果你有多个cpu core呢?比如四个core, 线性地这么去求是不是有些浪费计算资源,或者说没有充分利用多核的条件以降低执行时间呢?那可不可以把这这个数组拆分成4段,每个核去计算一段的倒数和,然后等都计算完成了,再把结果相加呢?当然可以!就应该这样做,那java中怎么做呢?用fork/join框架就可以了,使用的方法之一是利用RecursiveAction

!! 浅谈Java学习方法和后期面试技巧

浅谈Java学习方法和后期面试技巧 昨天查看3303回复33 部落用户大酋长 下面简单列举一下大家学习java的一个系统知识点的一些介绍 一.java基础部分:java基础的时候,有些知识点是非常重要的,比如循环系列.For,while,do-while.这方面只要大家用心点基本没什么难点. 二.面向对象:oop面向对象的时候,偏重理论,相信这方面的文章也很多,大家可以多看看,在这就不说了.重点掌握面向对象的三大特征和基本原理. 三.java核心一:这方面主要偏重API,所以在学习了这章的时候,

浅谈 Java Printing

浅谈 Java  Printing 其实怎么说呢?在写这篇博文之前,我对java printing 可以说是一无所知的.以至于我在敲文字时, 基本上是看着api文档翻译过来的.这虽然看起来非常的吃力,但是我相信,有道大哥不会辜负我的.嘻 嘻! Java Printing 技术,也就是我们平时所接触的打印,只不过是说可以用Java实现而已. 一.Java Printing 打印简介 Java Printing API能够使java应用程序实现相关的打印功能,如: 1.打印所有 Java 2D 和

【转】浅谈Java中的equals和==

浅谈Java中的equals和== 在初学Java时,可能会经常碰到下面的代码: 1 String str1 = new String("hello"); 2 String str2 = new String("hello"); 3 4 System.out.println(str1==str2); 5 System.out.println(str1.equals(str2)); 为什么第4行和第5行的输出结果不一样?==和equals方法之间的区别是什么?如果在初

浅谈java异常[Exception]

本文转自:focusJ 一. 异常的定义 在<java编程思想>中这样定义 异常:阻止当前方法或作用域继续执行的问题.虽然java中有异常处理机制,但是要明确一点,决不应该用"正常"的态度来看待异常.绝对一点说异常就是某种意义上的错误,就是问题,它可能会导致程序失败.之所以java要提出异常处理机制,就是要告诉开发人员,你的程序出现了不正常的情况,请注意. 记得当初学习java的时候,异常总是搞不太清楚,不知道这个异常是什么意思,为什么会有这个机制?但是随着知识的积累逐渐也

浅谈Java虚拟机

最近发现MDT推出去的系统的有不同问题,其问题就不说了,主要是策略权限被域继承了.比如我们手动安装的很多东东都是未配置壮态,推的就默认为安全壮态了,今天细找了一下,原来把这个关了就可以了. 浅谈Java虚拟机,布布扣,bubuko.com

浅谈Java中的对象和引用

浅谈Java中的对象和对象引用 在Java中,有一组名词经常一起出现,它们就是"对象和对象引用",很多朋友在初学Java的时候可能经常会混淆这2个概念,觉得它们是一回事,事实上则不然.今天我们就来一起了解一下对象和对象引用之间的区别和联系. 1.何谓对象? 在Java中有一句比较流行的话,叫做"万物皆对象",这是Java语言设计之初的理念之一.要理解什么是对象,需要跟类一起结合起来理解.下面这段话引自<Java编程思想>中的一段原话: "按照通