FutureTask与Fork/Join

在学习多线程的过程中,我们形成了一种思维习惯。那就是对于某个耗时操作不再做同步操作,让他分裂成一个线程之后执行下一步,而线程执行耗时操作。并且我们希望在我们需要它返回的时候再去调用它的结果集。好比我们把米饭和水放进了电饭煲,转头就去炒菜了,等到菜完成之后,转头去查看饭是否完成。多线程造成了并行计算的现象,有时候它们是真的多核计算而有时候只是单核的切换。

FutureTask表示的是一种,异步操作的典范。我提交了任务,在未来我要拿到结果。

考虑一种简单的场景,A问B一个问题,B一时回答不了,B要去考虑一段时间,等到有结果了,再告诉A。

这时,我们需要类A,类B。

package Future;

//调用端
public class CallA implements CallBack{
    private CallB b;
    public CallA(CallB b){
        this.b = b;
    }

    public void submitProblem(String problem){
        System.out.println("a 提交问题:"+problem);
        new Thread(){
            public void run(){
                b.execute(CallA.this, problem);
            }
        }.start();
        System.out.println("a 提交问题完毕");
    }

    @Override
    public void result(String result) {
        System.out.println(result);
    }

}
package Future;

//执行处理
public class CallB {
	public void execute(CallBack callBack,String problem){
		System.out.println("接受问题:"+problem);
		System.out.println("开始处理");
		try{
			Thread.sleep(2000);
		}catch (Exception e) {
			e.printStackTrace();
		}
		callBack.result("问题处理结果:abcdefg...");
	}
}

  类的结构是,A中保留它作用对象B的一个引用,在触发询问问题的时候,A向B提交了一个方法调用,并且同时开启了一个线程,这是它不阻塞的原因。

  为“提问题”做一个面向对象的接口。

//回调接口
public interface CallBack {
	public void result(String result);
}	

  他们执行的主流程,十分简单。

public class Main {
    public static void main(String[] args) {
        CallB b = new CallB();
        CallA a = new CallA(b);
        a.submitProblem("英文字母");
    }
}

  熟悉了这个过程,JDK提供了FutureTask的接口。

package Future;

import java.util.concurrent.Callable;

public class RealData implements Callable<String>{
    private String data;
    public RealData(String data){
        this.data = data;
    }
    @Override
    public String call() throws Exception {
        StringBuffer sb = new StringBuffer();
        for(int i=0;i<10;i++){
            sb.append(data);
            try{
                Thread.sleep(1500);
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        return sb.toString();
    }

}

  实现这个Callable接口,重写call方法,在未来调用get的时候将返回运算结果。

package Future;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

//jdk future框架
public class FutureMain {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<String> future = new FutureTask<String>(new RealData("a"));
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(future);
        System.out.println("请求完毕");
        try{
            Thread.sleep(1000);
        }catch (Exception e) {
        }
        System.out.println("future task 返回:"+future.get());
    }
}

多线程的优势体现在并行计算中,虽然某大佬说研究并行计算是在浪费时间,但是作为一种由多线程产生的技术来说,先了解一下特点。

JDK为我们提供了一套Join/Fork框架,考虑下面这个例子。

package ForkAndJoin;

import java.util.concurrent.RecursiveAction;

public class PrintTask extends RecursiveAction{
    private final int Max = 50;
    private int start;
    private int end;
    public PrintTask(int start,int end){
        this.start = start;
        this.end = end;
    }
    @Override
    protected void compute() {
        if((end - start)<Max){
            for(int i=start;i<end;i++){
                System.out.println("当前线程:"+Thread.currentThread().getName()+" i :"+i);
            }
        }else{
            int middle = (start+end)/2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            left.fork();
            right.fork();
        }
    }

}
package ForkAndJoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolTest {
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool forkJoin = new ForkJoinPool();
        forkJoin.submit(new PrintTask(0,200));
        forkJoin.awaitTermination(2, TimeUnit.SECONDS);
        forkJoin.shutdown();
    }
}

在compute方法中写主要的任务处理,这是一个并行计算的小例子。

J/F的模式很像map-reduce模式,将任务分小,然后各个处理。

  

时间: 2024-08-09 21:57:32

FutureTask与Fork/Join的相关文章

FutureTask、Fork/Join、 BlockingQueue

我们之前学习创建线程有Thread和Runnable两种方式,但是两种方式都无法获得执行的结果. 而Callable和Future在任务完成后得到结果. Future是一个接口,表示一个任务的周期,并提供了相应的方法来判断是否已经完成或者取消任务,以及获取任务的结果和取消任务. FutureTask可用于异步获取执行结果或取消执行任务的场景.通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的g

九 fork/join CompletableFuture

1: Fork/join fork/join:  fork是分叉的意思, join是合并的意思. Fork/Join框架:是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. Fokr/Join的适合场景:所处理的元素必须是独立的,数据集要足够大: 并且在并行加速方面,每个元素的处理成本要足够高,这样才能补偿建立fork/join框架所消耗的成本. 这个文章中的例子(http://www.infoq.com/cn/arti

Java并发编程从入门到精通 - 第7章:Fork/Join框架

1.综述:化繁为简,分而治之:递归的分解和合并,直到任务小到可以接受的程度:2.Future任务机制:  Future接口就是对于具体的Runnable或者Callable任务的执行结果进行取消.查询是否完成.获取结果:必要时可以通过get方法获取执行结果,该方法会阻塞直到任务会返回结果:也就是说Future接口提供三种功能:判断任务是否完成.能够中断任务.能够获取任务执行结果:  Future接口里面的常用方法:3.FutureTask:  FutureTask类是Future接口唯一的实现类

线程基础:多任务处理(13)——Fork/Join框架(解决排序问题)

============== 接上文< 线程基础:多任务处理(12)--Fork/Join框架(基本使用)> 3. 使用Fork/Join解决实际问题 之前文章讲解Fork/Join框架的基本使用时,所举的的例子是使用Fork/Join框架完成1-1000的整数累加.这个示例如果只是演示Fork/Join框架的使用,那还行,但这种例子和实际工作中所面对的问题还有一定差距.本篇文章我们使用Fork/Join框架解决一个实际问题,就是高效排序的问题. 3-1. 使用归并算法解决排序问题 排序问题是

Fork/Join框架介绍(转)

1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算1+2+..+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果

《java.util.concurrent 包源码阅读》22 Fork/Join框架的初体验

JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果.这样就能使用多线程的方式来执行一个任务. JDK7引入的Fork/Join有三个核心类: ForkJoinPool,执行任务的线程池 ForkJoinWorkerThread,执行任务的工作线程 ForkJoinTask,一个用于ForkJoinPool的任务抽象类. 因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不

Java Fork/Join框架

fork-join框架 fork操作的作用是把一个大的问题划分成若干个较小的问题.在这个划分过程一般是递归进行的.直到可以直接进行计算.需要恰当地选取子问题的大小.太大的子问题不利于通过并行方式来提高性能,而太小的子问题则会带来较大的额外开销.每个子问题计算完成后,可以得到关于整个问题的部分解.join操作的作用是把这些分解手机组织起来,得到完整解. 在fork/join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行.那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行.

使用Java7提供的Fork/Join框架

在Java7中,JDK提供对多线程开发提供了一个非常强大的框架,就是Fork/Join框架.这个是对原来的Executors更 进一步,在原来的基础上增加了并行分治计算中的一种Work-stealing策略,就是指的是.当一个线程正在等待他创建的 子线程运行的时候,当前线程如果完成了自己的任务后,就会寻找还没有被运行的任务并且运行他们,这样就是和 Executors这个方式最大的区别,更加有效的使用了线程的资源和功能.所以非常推荐使用Fork/Join框架. 下面我们以一个例子来说明这个框架如何

《java.util.concurrent 包源码阅读》24 Fork/Join框架之Work-Stealing

仔细看了Doug Lea的那篇文章:A Java Fork/Join Framework 中关于Work-Stealing的部分,下面列出该算法的要点(基本是原文的翻译): 1. 每个Worker线程都维护一个任务队列,即ForkJoinWorkerThread中的任务队列. 2. 任务队列是双向队列,这样可以同时实现LIFO和FIFO. 3. 子任务会被加入到原先任务所在Worker线程的任务队列. 4. Worker线程用LIFO的方法取出任务,也就后进队列的任务先取出来(子任务总是后加入队