Java多线程之~~~Fork/Join框架的同步和异步

在Fork/Join框架中,提交任务的时候,有同步和异步两种方式。以前使用的invokeAll()的方法是同步的,也就是任

务提交后,这个方法不会返回直到所有的任务都处理完了。而还有另一种方式,就是使用fork方法,这个是异步的。也

就是你提交任务后,fork方法立即返回,可以继续下面的任务。这个线程也会继续运行。

下面我们以一个查询磁盘的以log结尾的文件的程序例子来说明异步的用法。

package com.bird.concursey.charpet8;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class FolderProcessor extends RecursiveTask<List<String>> {

	private static final long serialVersionUID = 1L;

	private String path;
	private String extension;

	public FolderProcessor(String path, String extension) {
		super();
		this.path = path;
		this.extension = extension;
	}

	@Override
	protected List<String> compute() {
		List<String> list = new ArrayList<String>();
		List<FolderProcessor> tasks = new ArrayList<FolderProcessor>();
		File file = new File(path);
		File content[] = file.listFiles();
		if(content != null) {
			for(int i = 0; i < content.length; i++) {
				if(content[i].isDirectory()) {
					FolderProcessor task = new FolderProcessor(content[i].getAbsolutePath(), extension);
					//异步方式提交任务
					task.fork();
					tasks.add(task);
				}else{
					if(checkFile(content[i].getName())) {
						list.add(content[i].getAbsolutePath());
					}
				}
			}
		}
		if(tasks.size() > 50) {
			System.out.printf("%s: %d tasks ran.\n",file.getAbsolutePath(),tasks.size());
		}

		addResultsFromTasks(list,tasks);
		return list;
	}

	/**
	 * that will add to the list of files
the results returned by the subtasks launched by this task.
	 * @param list
	 * @param tasks
	 */
	private void addResultsFromTasks(List<String> list,
			List<FolderProcessor> tasks) {
		for(FolderProcessor item: tasks) {
			list.addAll(item.join());
		}
	}

	/**
	 * This method compares if the name of a file
passed as a parameter ends with the extension you are looking for
	 * @param name
	 * @return
	 */
	private boolean checkFile(String name) {
		return name.endsWith(extension);
	}

	public static void main(String[] args) {
		ForkJoinPool pool = new ForkJoinPool();
		FolderProcessor system = new FolderProcessor("C:\\Windows", "log");
		FolderProcessor apps = new FolderProcessor("C:\\Program Files", "log");

		pool.execute(system);
		pool.execute(apps);

		pool.shutdown();

		List<String> results = null;
		results = system.join();
		System.out.printf("System: %d files found.\n",results.size());

		results = apps.join();
		System.out.printf("Apps: %d files found.\n",results.size());

	}
}

The key of this example is in the FolderProcessor class. Each task processes the content

of a folder. As you know, this content has the following two kinds of elements:

ff Files

ff Other folders

If the task finds a folder, it creates another Task object to process that folder and sends it to

the pool using the fork() method. This method sends the task to the pool that will execute it

if it has a free worker-thread or it can create a new one. The method returns immediately, so

the task can continue processing the content of the folder. For every file, a task compares its

extension with the one it‘s looking for and, if they are equal, adds the name of the file to the

list of results.

Once the task has processed all the content of the assigned folder, it waits for the finalization

of all the tasks it sent to the pool using the join() method. This method called in a task

waits for the finalization of its execution and returns the value returned by the compute()

method. The task groups the results of all the tasks it sent with its own results and returns

that list as a return value of the compute() method.

The ForkJoinPool class also allows the execution of tasks in an asynchronous way. You

have used the execute() method to send the three initial tasks to the pool. In the Main

class, you also finished the pool using the shutdown() method and wrote information about

the status and the evolution of the tasks that are running in it. The ForkJoinPool class

includes more methods that can be useful for this purpose. See the Monitoring a Fork/Join

pool recipe to see a complete list of those methods.

时间: 2024-10-20 06:57:19

Java多线程之~~~Fork/Join框架的同步和异步的相关文章

Java 并发编程 -- Fork/Join 框架

概述 Fork/Join 框架是 Java7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.下图是网上流传的 Fork Join 的运行流程图,直接拿过来用了: 工作窃取算法 工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行.那么为什么要使用这个算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创

Java并发编程--Fork/Join框架使用

上篇博客我们介绍了通过CyclicBarrier使线程同步,但是上述方法存在一个问题,那就是如果一个大任务跑了2个线程去完成,如果线程2耗时比线程1多2倍,线程1完成后必须等待线程2完成,等待的过程线程1没法复用.现在我们准备解决这个问题,我们希望线程1完成自己的任务后能去帮助线程2完成一部分任务.Java7引如了Fork/Join框架可以很好的解决这个问题. Fork/Join是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最后汇总每个小任务结果后得到大任务结果的框架.fork

JAVA中的Fork/Join框架

看了下Java Tutorials中的fork/join章节,整理下. 什么是fork/join框架 fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能:设计的目的是为了处理那些可以被递归拆分的任务. fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃

《java.util.concurrent 包源码阅读》22 Fork/Join框架的初体验

JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果.这样就能使用多线程的方式来执行一个任务. JDK7引入的Fork/Join有三个核心类: ForkJoinPool,执行任务的线程池 ForkJoinWorkerThread,执行任务的工作线程 ForkJoinTask,一个用于ForkJoinPool的任务抽象类. 因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不

JAVA 1.7并发之Fork/Join框架

在之前的博文里有说过executor框架,其实Fork/Join就是继承executor的升级版啦 executor用于创建一个线程池,但是需要手动的添加任务,如果需要将大型任务分治,显然比较麻烦 而Fork/Join则是解决这个问题的计算框架 用户定义部分: 如何分治  (compute(1 , 10)) 定义一个函数用于分治,如果处理区间太大则分开,递归的调用自己 (compute(1 , 5) compute(6 , 10)) 如何计算  (compute(1 , 5))  如果区间合适则

Java并发编程从入门到精通 - 第7章:Fork/Join框架

1.综述:化繁为简,分而治之:递归的分解和合并,直到任务小到可以接受的程度:2.Future任务机制:  Future接口就是对于具体的Runnable或者Callable任务的执行结果进行取消.查询是否完成.获取结果:必要时可以通过get方法获取执行结果,该方法会阻塞直到任务会返回结果:也就是说Future接口提供三种功能:判断任务是否完成.能够中断任务.能够获取任务执行结果:  Future接口里面的常用方法:3.FutureTask:  FutureTask类是Future接口唯一的实现类

Java Fork/Join框架

fork-join框架 fork操作的作用是把一个大的问题划分成若干个较小的问题.在这个划分过程一般是递归进行的.直到可以直接进行计算.需要恰当地选取子问题的大小.太大的子问题不利于通过并行方式来提高性能,而太小的子问题则会带来较大的额外开销.每个子问题计算完成后,可以得到关于整个问题的部分解.join操作的作用是把这些分解手机组织起来,得到完整解. 在fork/join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行.那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行.

《java.util.concurrent 包源码阅读》26 Fork/Join框架之Join

接下来看看调用ForkJoinTask的join方法都发生了什么: public final V join() { // doJoin方法返回该任务的状态,状态值有三种: // NORMAL, CANCELLED和EXCEPTIONAL // join的等待过程在doJoin方法中进行 if (doJoin() != NORMAL) // reportResult方法针对任务的三种状态有三种处理方式: // NORMAL: 直接返回getRawResult()方法的返回值 // CANCELLE

使用Java7提供的Fork/Join框架

在Java7中,JDK提供对多线程开发提供了一个非常强大的框架,就是Fork/Join框架.这个是对原来的Executors更 进一步,在原来的基础上增加了并行分治计算中的一种Work-stealing策略,就是指的是.当一个线程正在等待他创建的 子线程运行的时候,当前线程如果完成了自己的任务后,就会寻找还没有被运行的任务并且运行他们,这样就是和 Executors这个方式最大的区别,更加有效的使用了线程的资源和功能.所以非常推荐使用Fork/Join框架. 下面我们以一个例子来说明这个框架如何