【Java多线程】Java的MapReduce框架ForkJoin

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

Fork就是把一个大任务切分为若干子任务并行的执行。类似MapReduce里面的Map。

Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。类似MapReduce里面的Reduce。

举例说明,统计1~100的和,并行运行, 每个线程计算20个数的,如果当前线程统计的数量多于20,就切分为两个线程运行,切分点为中间数,至少分配的每一个线程的统计数小于或等于20,这个分裂任务的过程就叫做Fork。最后各个线程向上汇报汇总结果,这个汇聚结果的过程就叫做Join。

流程图如下:

代码如下:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

class Calculator extends RecursiveTask<Integer> 
{  
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    /**
     * 线程计数器
     */
    public static AtomicInteger tcounter = new AtomicInteger(0);
    /**
     * 计算阀值
     */
    private static final int THRESHOLD = 20;  
    /**
     * 开始值
     */
    private int start;
    /**
     * 结束值
     */
    private int end;  
  
    public Calculator(int start, int end) 
    {  
        this.start = start;  
        this.end = end;  
    }  
  
    @Override  
    protected Integer compute() 
    { 
    	tcounter.incrementAndGet();
    	System.out.println("start:" + start + ", end:" + end);
    
        int sum = 0;  
        if((end - start) <= THRESHOLD)
        {  
        	/**
        	 * 小于等于阀值,直接计算
        	 */
            for(int i = start; i<= end; i++)
            {  
                sum += i;  
            }
        }
        else
        {  
        	/**
        	 * 大于阀值,任务分解, 并汇聚结果
        	 */
            int middle = (start + end) / 2;  
            Calculator left = new Calculator(start, middle);  
            Calculator right = new Calculator(middle + 1, end);  

            left.fork();
            right.fork();
           
            sum = left.join() + right.join();  
        } 
        
        return sum;  
    }  
  
}  

public class TestForkJoinPool 
{
	public static void main(String[] args) throws InterruptedException, ExecutionException 
	{
	    ForkJoinPool forkJoinPool = new ForkJoinPool();  
	    Future<Integer> result = forkJoinPool.submit(new Calculator(1, 100));  
	    System.out.println("结果: " + result.get() + ", " + Calculator.tcounter + "个线程参与了运算");
	}
}

执行结果

start:1, end:100
start:1, end:50
start:51, end:100
start:1, end:25
start:1, end:13
start:26, end:50
start:14, end:25
start:51, end:75
start:26, end:38
start:39, end:50
start:51, end:63
start:64, end:75
start:76, end:100
start:76, end:88
start:89, end:100
结果: 5050, 15个线程参与了运算

聊聊核心类,

ForkJoinPool:负责建立一个ForkJoin运行环境

ForkJoinTask: 表示实际运行的任务,一般使用它的两个子类,一个是RecursiveAction(任务不带返回值时使用),另一个是RecursiveTask(任务带返回值时使用)。按照情况选择任意一个, 使用时需要继承该子类,然后实现抽象方法compute【任务的逻辑就在compute方法里编写】。

ForkJoinTask.fork方法表示分裂任务, ForkJoinTask.join表示汇聚分裂任务的compute方法的执行结果。

时间: 2024-11-12 14:54:09

【Java多线程】Java的MapReduce框架ForkJoin的相关文章

Java多线程-Java多线程概述

第一章 Java多线程概述 线程的启动 线程的暂停 线程的优先级 线程安全相关问题 1.1 进程与线程 进程:可以将运行在内存中的程序(如exe文件)理解为进程,进程是受操作系统管理的基本的运行单元. 线程:可以理解为进程中独立运行的子任务.如果QQ.exe运行时的好友视频线程.下载文件线程.数据传输线程.发送消息线程等. 使用多线程可以更好的利用计算机的资源如CPU.线程被调用的时机是随机的. 1.2 Java多线程实现方式 1.2.1 继承Thread类 public class Threa

Java多线程之~~~Fork/Join框架的同步和异步

在Fork/Join框架中,提交任务的时候,有同步和异步两种方式.以前使用的invokeAll()的方法是同步的,也就是任 务提交后,这个方法不会返回直到所有的任务都处理完了.而还有另一种方式,就是使用fork方法,这个是异步的.也 就是你提交任务后,fork方法立即返回,可以继续下面的任务.这个线程也会继续运行. 下面我们以一个查询磁盘的以log结尾的文件的程序例子来说明异步的用法. package com.bird.concursey.charpet8; import java.io.Fil

java多线程 -- ForkJoinPool 分支/ 合并框架 工作窃取

Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总. Fork/Join 框架与线程池的区别 采用 "工作窃取"模式(work-stealing):当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中. 相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般

Java多线程-Java内存模型

以下内容转自http://ifeve.com/java-memory-model-6/: Java内存模型规范了Java虚拟机与计算机内存是如何协同工作的.Java虚拟机是一个完整的计算机的一个模型,因此这个模型自然也包含一个内存模型——又称为Java内存模型. 如果你想设计表现良好的并发程序,理解Java内存模型是非常重要的.Java内存模型规定了如何和何时可以看到由其他线程修改过后的共享变量的值,以及在必须时如何同步的访问共享变量. 原始的Java内存模型存在一些不足,因此Java内存模型在

Java多线程-Java同步块

以下内容转自http://ifeve.com/synchronized-blocks/: Java 同步块(synchronized block)用来标记方法或者代码块是同步的.Java同步块用来避免竞争.本文介绍以下内容: Java同步关键字(synchronzied) 实例方法同步 静态方法同步 实例方法中同步块 静态方法中同步块 Java同步示例 Java 同步关键字(synchronized) Java中的同步块用synchronized标记.同步块在Java中是同步在某个对象上.所有同

java多线程---------java.util.concurrent并发包----------ThreadPoolExecutor

ThreadPoolExecutor线程池 一.三个构造方法 ThreadPoolExecutor(int corePoolSize,int MaxmumPoolSize,long KeepAliveTime,,TimeUnit  unit,BolokingQueue<Runnable> workQueue) ThreadPoolExecutor(int corePoolSize,int MaxmumPoolSize,long KeepAliveTime,,TimeUnit  unit,Bol

java多线程与内存可见性

一.java多线程 JAVA多线程实现的三种方式: http://blog.csdn.net/aboy123/article/details/38307539 二.内存可见性 1.什么是JAVA 内存模型 共享变量 :如果一个变量在多个线程的工作内存中都存在副本,那么这个变量就是这几个线程的共享变量. Java Memory Model (JAVA 内存模型)描述线程之间如何通过内存(memory)来进行交互,描述了java程序中各种变量(线程共享变量)的访问规则,以及在JVM中将变量存储到内存

一、Java多线程基础

一.简介 1.操作系统 在早起的裸机时代,计算机非常地昂贵,而且也没有操作系统的概念,计算机从头到尾只能执行一个程序.如果程序在执行一个耗时的操作,那么在这个过程中,计算机就有大量的资源闲置在那里,这是非常浪费的. 而这个时候,操作系统的概念被提出了.在操作系统的控制下,一个计算机可以执行很多的程序.计算机的资源由操作系统进行分配,程序之间获得计算机资源并执行各自的任务,相互独立.操作系统的出现使得计算机资源的利用率大大增加. 你也可以将操作系统理解为,运行程序的程序.类比AI是一种产生算法的算

【Java多线程】两种基本实现框架

Java多线程学习1——两种基本实现框架 一.前言 当一个Java程序启动的时候,一个线程就立刻启动,改程序通常也被我们称作程序的主线程.其他所有的子线程都是由主线程产生的.主线程是程序开始就执行的,并且程序最终是以主线程的结束而结束的. Java编写程序都运行在在Java虚拟机(JVM)中,在JVM的内部,程序的多任务是通过线程来实现的.每用Java命令启动一个Java应用程序,就会启动一个JVM进程.在同一个JVM进程中,有且只有一个进程,就是它自己.在这个JVM环境中,所有程序代码的运行都