Java并发编程--Fork/Join框架使用

上篇博客我们介绍了通过CyclicBarrier使线程同步,但是上述方法存在一个问题,那就是如果一个大任务跑了2个线程去完成,如果线程2耗时比线程1多2倍,线程1完成后必须等待线程2完成,等待的过程线程1没法复用。现在我们准备解决这个问题,我们希望线程1完成自己的任务后能去帮助线程2完成一部分任务。Java7引如了Fork/Join框架可以很好的解决这个问题。

Fork/Join是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最后汇总每个小任务结果后得到大任务结果的框架。fork是分叉,join是结合。下面看张图来清晰的认识一下:

其实Fork/Join本质上是分治算法的一种实现。下面我们来看怎么具体使用:

ForkJoinTask:我们要使用Fork/Join框架,必须首先创建一个Fork/Join任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

RecursiveAction:用于没有返回结果的任务。

RecursiveTask :用于有返回结果的任务。

ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

下面,我们同样实现考试系统抽题的例子。

GetQuestionsTask.java

public class GetQuestionsTask extends RecursiveTask<List> {

	//参数map
	private Map map;
	//参数list==只放题型
	private List questionTypeList;

	public GetQuestionsTask(List questionTypeList,Map map) {
		this.questionTypeList = questionTypeList;
		this.map=map;
	}

	@Override
	protected List compute() {
		System.out.println(questionTypeList.size());
		List list = new ArrayList();
		if (questionTypeList.size() < 2) {
			// 抽题
			list = getQuestions(questionTypeList,map);
		} else {
			int size = questionTypeList.size();
			int mid = size / 2;
			GetQuestionsTask task1 = new GetQuestionsTask(
					questionTypeList.subList(0, mid),map);
			GetQuestionsTask task2 = new GetQuestionsTask(
					questionTypeList.subList(mid, size),map);
			invokeAll(task1, task2);
			try {
				list = groupResults(task1.get(), task2.get());
			} catch (InterruptedException e) {

				e.printStackTrace();
			} catch (ExecutionException e) {

				e.printStackTrace();
			}
		}

		return list;
	}

	//合并抽题结果
	private List groupResults(List list1, List list2) {
		System.out.println(Thread.currentThread().getName()+"开始合并结果......");
		// 合并返回结果
		List list = new ArrayList();
		list.addAll(list1);
		list.addAll(list2);
		System.out.println(Thread.currentThread().getName()+"合并结果结束......");
		return list;
	}

	// 抽题
	private List getQuestions(List questTypeList,Map map) {
		List list = new ArrayList();
		for(int i=0;i<questTypeList.size();i++){
			System.out.println(Thread.currentThread().getName()+"开始抽题......"+questionTypeList.get(i).toString());
			//假数据,向list中放试题
			list.add("0");
			list.add("1");
			list.add("2");
			list.add("3");
			list.add("4");
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {

				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName()+"抽题结束..."+questionTypeList.get(i).toString());
		}
		return list;
	}

}

客户端:

//该池的线程数量不会超过0*7fff个(32767)
		//池中维护着ForkJoinWorkerThread对象数组,数组大小由parallelism属性决定,parallelism默认为处理器个数
		ForkJoinPool pool = new ForkJoinPool();
		GetQuestionsTask task = new GetQuestionsTask(questionTypeList, map);
		pool.execute(task);
		// 试题列表=task.get()
		try {
			List finalList = task.get();
			System.out.println("最终结果个数:" + finalList.size());
		} catch (InterruptedException e) {

			e.printStackTrace();
		} catch (ExecutionException e) {

			e.printStackTrace();
		}

总结:

Fork/Join实现了“工作窃取算法”,当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。当然,fork/join框架的使用有一定的约束条件:

1.除了fork()  和  join()方法外,线程不得使用其他的同步工具。线程最好也不要sleep()

2.线程不得进行I/O操作

3.线程不得抛出checked exception。

时间: 2024-12-15 06:54:13

Java并发编程--Fork/Join框架使用的相关文章

Java 并发编程 -- Fork/Join 框架

概述 Fork/Join 框架是 Java7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.下图是网上流传的 Fork Join 的运行流程图,直接拿过来用了: 工作窃取算法 工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行.那么为什么要使用这个算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创

【fork/join】java并发编程-fork/join示例

package com.chinamobile.epic.tako.common.graphite.query.sync.impl; import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Recursiv

JAVA中的Fork/Join框架

看了下Java Tutorials中的fork/join章节,整理下. 什么是fork/join框架 fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能:设计的目的是为了处理那些可以被递归拆分的任务. fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃

Java多线程之~~~Fork/Join框架的同步和异步

在Fork/Join框架中,提交任务的时候,有同步和异步两种方式.以前使用的invokeAll()的方法是同步的,也就是任 务提交后,这个方法不会返回直到所有的任务都处理完了.而还有另一种方式,就是使用fork方法,这个是异步的.也 就是你提交任务后,fork方法立即返回,可以继续下面的任务.这个线程也会继续运行. 下面我们以一个查询磁盘的以log结尾的文件的程序例子来说明异步的用法. package com.bird.concursey.charpet8; import java.io.Fil

Java并发编程之---Lock框架详解

Java 并发开发:Lock 框架详解 摘要: 我们已经知道,synchronized 是Java的关键字,是Java的内置特性,在JVM层面实现了对临界资源的同步互斥访问,但 synchronized 粒度有些大,在处理实际问题时存在诸多局限性,比如响应中断等.Lock 提供了比 synchronized更广泛的锁操作,它能以更优雅的方式处理线程同步问题.本文以synchronized与Lock的对比为切入点,对Java中的Lock框架的枝干部分进行了详细介绍,最后给出了锁的一些相关概念. 一

JAVA并发编程J.U.C学习总结

前言 学习了一段时间J.U.C,打算做个小结,个人感觉总结还是非常重要,要不然总感觉知识点零零散散的. 有错误也欢迎指正,大家共同进步: 另外,转载请注明链接,写篇文章不容易啊,http://www.cnblogs.com/chenpi/p/5614290.html 本文目录如下,基本上涵盖了J.U.C的主要内容: JSR 166及J.U.C Executor框架(线程池. Callable .Future) AbstractQueuedSynchronizer(AQS框架) Locks & C

Java并发编程从入门到精通 - 第7章:Fork/Join框架

1.综述:化繁为简,分而治之:递归的分解和合并,直到任务小到可以接受的程度:2.Future任务机制:  Future接口就是对于具体的Runnable或者Callable任务的执行结果进行取消.查询是否完成.获取结果:必要时可以通过get方法获取执行结果,该方法会阻塞直到任务会返回结果:也就是说Future接口提供三种功能:判断任务是否完成.能够中断任务.能够获取任务执行结果:  Future接口里面的常用方法:3.FutureTask:  FutureTask类是Future接口唯一的实现类

《java.util.concurrent 包源码阅读》22 Fork/Join框架的初体验

JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果.这样就能使用多线程的方式来执行一个任务. JDK7引入的Fork/Join有三个核心类: ForkJoinPool,执行任务的线程池 ForkJoinWorkerThread,执行任务的工作线程 ForkJoinTask,一个用于ForkJoinPool的任务抽象类. 因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不

聊聊并发(八)——Fork/Join框架介绍

作者 方腾飞 发布于 2013年12月23日 | 被首富的“一个亿”刷屏?不如定个小目标,先把握住QCon上海的优惠吧!2 讨论 分享到:微博微信FacebookTwitter有道云笔记邮件分享 稍后阅读 我的阅读清单 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大