使用Java7提供的Fork/Join框架

在Java7中,JDK提供对多线程开发提供了一个非常强大的框架,就是Fork/Join框架。这个是对原来的Executors更

进一步,在原来的基础上增加了并行分治计算中的一种Work-stealing策略,就是指的是。当一个线程正在等待他创建的

子线程运行的时候,当前线程如果完成了自己的任务后,就会寻找还没有被运行的任务并且运行他们,这样就是和

Executors这个方式最大的区别,更加有效的使用了线程的资源和功能。所以非常推荐使用Fork/Join框架。

下面我们以一个例子来说明这个框架如何使用,主要就是创建一个含有10000个资源的List,分别去修改他的内容。

package com.bird.concursey.charpet8;

/**
 * store the name and price of a product
 * @author bird 2014年10月7日 下午11:23:14
 */
public class Product {

	private String name;
	private double price;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public double getPrice() {
		return price;
	}

	public void setPrice(double price) {
		this.price = price;
	}

}
package com.bird.concursey.charpet8;

import java.util.ArrayList;
import java.util.List;

/**
 * generate a list of random products
 * @author bird
 * 2014年10月7日 下午11:24:47
 */
public class ProductListGenerator {

	public List<Product> generate(int size) {
		List<Product> list = new ArrayList<Product>();
		for(int i = 0 ; i < size; i++) {
			Product product = new Product();
			product.setName("Product" + i);
			product.setPrice(10);
			list.add(product);
		}
		return list;
	}
}
package com.bird.concursey.charpet8;

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class Task extends RecursiveAction {

	private static final long serialVersionUID = 1L;
	// These attributes will determine the block of products this task has to
	// process.
	private List<Product> products;
	private int first;
	private int last;
	// store the increment of the price of the products
	private double increment;

	public Task(List<Product> products, int first, int last, double increment) {
		super();
		this.products = products;
		this.first = first;
		this.last = last;
		this.increment = increment;
	}

	/**
	 * If the difference between the last and first attributes is greater than
	 * or equal to 10, create two new Task objects, one to process the first
	 * half of products and the other to process the second half and execute
	 * them in ForkJoinPool using the invokeAll() method.
	 */
	@Override
	protected void compute() {
		if (last - first < 10) {
			updatePrices();
		} else {
			int middle = (first + last) / 2;
			System.out.printf("Task: Pending tasks:%s\n", getQueuedTaskCount());
			Task t1 = new Task(products, first, middle + 1, increment);
			Task t2 = new Task(products, middle + 1, last, increment);
			invokeAll(t1, t2);
		}
	}

	private void updatePrices() {
		for (int i = first; i < last; i++) {
			Product product = products.get(i);
			product.setPrice(product.getPrice() * (1 + increment));
		}
	}

	public static void main(String[] args) {
		ProductListGenerator productListGenerator = new ProductListGenerator();
		List<Product> products = productListGenerator.generate(10000);
		Task task = new Task(products, 0, products.size(), 0.2);

		ForkJoinPool pool = new ForkJoinPool();
		pool.execute(task);

		do {
			System.out.printf("Main: Thread Count: %d\n",
					pool.getActiveThreadCount());
			System.out.printf("Main: Thread Steal: %d\n", pool.getStealCount());
			System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
			try {
				TimeUnit.MILLISECONDS.sleep(5);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		} while (!task.isDone());

		pool.shutdown();

		if(task.isCompletedNormally()) {
			System.out.printf("Main: The process has completed normally.\n");
		}

		for(Product product : products) {
			if(product.getPrice() != 12) {
				System.out.printf("Product %s: %f\n",product.getName(),product.getPrice());
			}
		}

		System.out.println("Main: End of the program.\n");
	}

}

In this example, you have created a ForkJoinPool object and a subclass of the

ForkJoinTask class that you execute in the pool. To create the ForkJoinPool object,

you have used the constructor without arguments, so it will be executed with its default

configuration. It creates a pool with a number of threads equal to the number of processors

of the computer. When the ForkJoinPool object is created, those threads are created and

they wait in the pool until some tasks arrive for their execution.

Since the Task class doesn‘t return a result, it extends the RecursiveAction class. In the

recipe, you have used the recommended structure for the implementation of the task. If the

task has to update more than 10 products, it divides those set of elements into two blocks,

creates two tasks, and assigns a block to each task. You have used the first and last

attributes in the Task class to know the range of positions that this task has to update in the

list of products. You have used the first and last attributes to use only one copy of the

products list and not create different lists for each task.

To execute the subtasks that a task creates, it calls the invokeAll() method. This is a

synchronous call, and the task waits for the finalization of the subtasks before continuing

(potentially finishing) its execution. While the task is waiting for its subtasks, the worker thread

that was executing it takes another task that was waiting for execution and executes it. With

this behavior, the Fork/Join framework offers a more efficient task management than the

Runnable and Callable objects themselves.

The invokeAll() method of the ForkJoinTask class is one of the main differences

between the Executor and the Fork/Join framework. In the Executor framework, all the tasks

have to be sent to the executor, while in this case, the tasks include methods to execute and

control the tasks inside the pool. You have used the invokeAll() method in the Task class,

that extends the RecursiveAction class that extends the ForkJoinTask class.

You have sent a unique task to the pool to update all the list of products using the execute()

method. In this case, it‘s an asynchronous call, and the main thread continues its execution.

You have used some methods of the ForkJoinPool class to check the status and the

evolution of the tasks that are running. The class includes more methods that can be useful

for this purpose. See the Monitoring a Fork/Join pool recipe for a complete list of

those methods.

Finally, like with the Executor framework, you should finish ForkJoinPool using the

shutdown() method.

时间: 2024-10-05 03:14:11

使用Java7提供的Fork/Join框架的相关文章

使用Java7提供Fork/Join框架

在Java7在.JDK它提供了多线程开发提供了一个非常强大的框架.这是Fork/Join框架.这是原来的Executors更多 进一步,在原来的基础上添加了并行分治计算中的一种Work-stealing策略.就是指的是. 当一个线程正在等待他创建的 子线程执行的时候,当前线程假设完毕了自己的任务后,就会寻找还没有被执行的任务而且执行他们,这样就是和 Executors这个方式最大的差别,更加有效的使用了线程的资源和功能.所以很推荐使用Fork/Join框架. 以下我们以一个样例来说明这个框架怎样

Fork/Join框架介绍(转)

1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算1+2+..+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果

聊聊并发(八)——Fork/Join框架介绍

作者 方腾飞 发布于 2013年12月23日 | 被首富的“一个亿”刷屏?不如定个小目标,先把握住QCon上海的优惠吧!2 讨论 分享到:微博微信FacebookTwitter有道云笔记邮件分享 稍后阅读 我的阅读清单 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大

第五章 Fork/Join框架

Java 7 并发编程实战手册目录 代码下载(https://github.com/Wang-Jun-Chao/java-concurrency) 第五章 Fork/Join框架 5.1简介 通常,使用Java来开发一个简单的并发应用程序时,会创建一些Runnable对象,然后创建对应的Thread对象来控制程序中这些线程的创建.执行以及线程的状态.自从Java 5开始引入了 Executor和ExecutorService接口以及实现这两个接口的类(比如ThreadPoolExecutor)之

Java并发编程--Fork/Join框架使用

上篇博客我们介绍了通过CyclicBarrier使线程同步,但是上述方法存在一个问题,那就是如果一个大任务跑了2个线程去完成,如果线程2耗时比线程1多2倍,线程1完成后必须等待线程2完成,等待的过程线程1没法复用.现在我们准备解决这个问题,我们希望线程1完成自己的任务后能去帮助线程2完成一部分任务.Java7引如了Fork/Join框架可以很好的解决这个问题. Fork/Join是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最后汇总每个小任务结果后得到大任务结果的框架.fork

Fork/Join框架介绍

转http://www.infoq.com/cn/articles/fork-join-introduction/ 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算

Fork/Join 框架框架使用

1.介绍 Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.在多核计算机中正确使用可以很好的发挥cpu的作用,提高程序的执行效率.框架采用工作窃取算法,当有子任务线程处理完当前任务时,它会从其他线程执行的任务队列里窃取任务来执行,从而提高整体的执行效率.为了减少线程间的任务资源竞争,队列通常使用双端队列,别窃取任务线程永远从啥UN广大UN队列的呕吐不获取任务执行,而窃取任务的线程永远从双端

Java 并发编程 -- Fork/Join 框架

概述 Fork/Join 框架是 Java7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.下图是网上流传的 Fork Join 的运行流程图,直接拿过来用了: 工作窃取算法 工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行.那么为什么要使用这个算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创

Java Fork/Join框架

fork-join框架 fork操作的作用是把一个大的问题划分成若干个较小的问题.在这个划分过程一般是递归进行的.直到可以直接进行计算.需要恰当地选取子问题的大小.太大的子问题不利于通过并行方式来提高性能,而太小的子问题则会带来较大的额外开销.每个子问题计算完成后,可以得到关于整个问题的部分解.join操作的作用是把这些分解手机组织起来,得到完整解. 在fork/join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行.那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行.