ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析

ExecutorService是JDK并发工具包提供的一个核心接口,相当于一个线程池,提供执行任务和管理生命周期的方法。ExecutorService接口中的大部分API都是比较容易上手使用的,本文主要介绍下invokeAll和invokeAll方法的特性和使用。

package tasks;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class SleepSecondsCallable implements Callable<String>
{
	private String name;

	private int seconds;

	public SleepSecondsCallable(String name, int seconds)
	{
		this.name = name;
		this.seconds = seconds;
	}

	public String call() throws Exception
	{
		System.out.println(name + ",begin to execute");

		try
		{
			TimeUnit.SECONDS.sleep(seconds);
		} catch (InterruptedException e)
		{
			System.out.println(name + " was disturbed during sleeping.");
			e.printStackTrace();
			return name + "_SleepSecondsCallable_failed";
		}

		System.out.println(name + ",success to execute");

		return name + "_SleepSecondsCallable_succes";
	}

}

这是一个通过睡眠来模拟的耗时任务,该任务是可中断/可终止的任务,能够响应中断请求。

package tasks;

import java.util.concurrent.Callable;

public class ExceptionCallable implements Callable<String>
{

	private String name = null;

	public ExceptionCallable()
	{

	}

	public ExceptionCallable(String name)
	{
		this.name = name;
	}

	@Override
	public String call() throws Exception
	{
		System.out.println("begin to ExceptionCallable.");

		System.out.println(name.length());

		System.out.println("end to ExceptionCallable.");

		return name;
	}

}

这是一个可能会在执行过程中,抛出空指针异常的任务。

package tasks;

import java.util.Random;
import java.util.concurrent.Callable;

public class RandomTenCharsTask implements Callable<String>
{

	@Override
	public String call() throws Exception
	{
		System.out.println("RandomTenCharsTask begin to execute...");

		StringBuffer content = new StringBuffer();

		String base = "abcdefghijklmnopqrstuvwxyz0123456789";

		Random random = new Random();

		for (int i = 0; i < 10; i++)
		{
			int number = random.nextInt(base.length());
			content.append(base.charAt(number));
		}

		System.out.println("RandomTenCharsTask complete.result=" + content);
		return content.toString();
	}

}

这是一个正常的短时的任务,产生10个随机字符组成的字符串。

一、测试invokeAny()

/**
 * 提交的任务集合,一旦有1个任务正常完成(没有抛出异常),会终止其他未完成的任务
 */
public static void invokeAny1() throws Exception
{
	ExecutorService executorService = Executors.newFixedThreadPool(3);

	List<Callable<String>> tasks = new ArrayList<Callable<String>>();

	tasks.add(new SleepSecondsCallable("t1", 2));
	tasks.add(new SleepSecondsCallable("t2", 1));

	String result = executorService.invokeAny(tasks);

	System.out.println("result=" + result);

	executorService.shutdown();
}

程序的执行结果是:返回t2线程的执行结果t2_SleepSecondsCallable_succes,同时t1抛出java.lang.InterruptedException: sleep interrupted。

也就说:一旦有1个任务正常完成(执行过程中没有抛异常),线程池会终止其他未完成的任务

第二种情况,向线程池提交3个异常任务ExceptionCallable

/**
* 没有1个正常完成的任务,invokeAny()方法抛出ExecutionException,封装了任务中元素的异常
*
*/
public static void invokeAny2() throws Exception
{
	ExecutorService executorService = Executors.newFixedThreadPool(3);

	List<Callable<String>> tasks = new ArrayList<Callable<String>>();

	tasks.add(new ExceptionCallable());
	tasks.add(new ExceptionCallable());
	tasks.add(new ExceptionCallable());

	String result = executorService.invokeAny(tasks);

	System.out.println("result=" + result);

	executorService.shutdown();
}

程序执行结果是:调用invokeAny()报错 java.util.concurrent.ExecutionException: java.lang.NullPointerException。

也就是说:如果提交的任务列表中,没有1个正常完成的任务,那么调用invokeAny会抛异常,究竟抛的是哪儿个任务的异常,无关紧要

第三种情况:先提交3个异常任务,再提交1个正常的耗时任务

/**
* 有异常的任务,有正常的任务,invokeAny()不会抛异常,返回最先正常完成的任务
*/
public static void invokeAny3() throws Exception
{
	ExecutorService executorService = Executors.newFixedThreadPool(3);

	List<Callable<String>> tasks = new ArrayList<Callable<String>>();

	tasks.add(new ExceptionCallable());
	tasks.add(new ExceptionCallable());
	tasks.add(new ExceptionCallable());
	tasks.add(new ExceptionCallable());

	tasks.add(new SleepSecondsCallable("t1", 2));

	String result = executorService.invokeAny(tasks);

	System.out.println("result=" + result);
	executorService.shutdown();
}

程序执行结果是:不会抛出任何异常,打印出t2任务的返回结果。也就是说:invokeAny()和任务的提交顺序无关,只是返回最早正常执行完成的任务

第四种情况,测试下使用限时版本的invokeAny(),主要功能与不限时版本的差别不大

/**
 * 还没有到超时之前,所以的任务都已经异常完成,抛出ExecutionException<br>
 * 如果超时前满,还没有没有完成的任务,抛TimeoutException
 */
public static void invokeAnyTimeout() throws Exception
{
	ExecutorService executorService = Executors.newFixedThreadPool(3);

	List<Callable<String>> tasks = new ArrayList<Callable<String>>();

	tasks.add(new ExceptionCallable());
	tasks.add(new ExceptionCallable());
	tasks.add(new ExceptionCallable());
	tasks.add(new ExceptionCallable());

	String result = executorService.invokeAny(tasks, 2, TimeUnit.SECONDS);

	System.out.println("result=" + result);

	executorService.shutdown();
}

程序执行结果是:抛出ExecutionException。这个其实很合理,也很好理解。如果在超时之前,所有任务已经都是异常终止,那就没有必要在等下去了;如果超时之后,仍然有正在运行或等待运行的任务,那么会抛出TimeoutException。

最后我们来看下,JDK源码中ExecutorService.invokeAny的方法签名和注释

/**
     * Executes the given tasks, returning the result
     * of one that has completed successfully (i.e., without throwing
     * an exception), if any do. Upon normal or exceptional return,
     * tasks that have not completed are cancelled.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if tasks or any of its elements
     *         are <tt>null</tt>
     * @throws IllegalArgumentException if tasks is empty
     * @throws ExecutionException if no task successfully completes
     * @throws RejectedExecutionException if tasks cannot be scheduled
     *         for execution
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

与我们测试结果一致,invokeAny()返回最先正常完成(without throwing exception)的任务直接结果;一旦有任务正常完成或者调用出现异常,线程池都会终止正在运行或等待运行(tasks that have not completed are cancelled)的任务。

二、测试invokeAll()

这个方法相对来说比较好理解,就是执行任务列表中的所有任务,并返回与每个任务对应的Futue。也就是说,任务彼此之间不会相互影响,可以通过future跟踪每一个任务的执行情况,比如是否被取消,是正常完成,还是异常完成,这主要使用Future类提供的API。

public static void testInvokeAll() throws Exception
{
	ExecutorService executorService = Executors.newFixedThreadPool(5);

	List<Callable<String>> tasks = new ArrayList<Callable<String>>();
	tasks.add(new SleepSecondsCallable("t1", 2));
	tasks.add(new SleepSecondsCallable("t2", 2));
	tasks.add(new RandomTenCharsTask());
	tasks.add(new ExceptionCallable());

	// 调用该方法的线程会阻塞,直到tasks全部执行完成(正常完成/异常退出)
	List<Future<String>> results = executorService.invokeAll(tasks);

	// 任务列表中所有任务执行完毕,才能执行该语句
	System.out.println("wait for the result." + results.size());

	executorService.shutdown();

	for (Future<String> f : results)
	{
		// isCanceled=false,isDone=true
		System.out.println("isCanceled=" + f.isCancelled() + ",isDone="
				+ f.isDone());

		// ExceptionCallable任务会报ExecutionException
		System.out.println("task result=" + f.get());
	}
}

程序的执行结果和一些结论,已经直接写在代码注释里面了。invokeAll是一个阻塞方法,会等待任务列表中的所有任务都执行完成。不管任务是正常完成,还是异常终止,Future.isDone()始终返回true。通过Future.isCanceled()可以判断任务是否在执行的过程中被取消。通过Future.get()可以获取任务的返回结果,或者是任务在执行中抛出的异常。

第二种情况,测试限时版本的invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)

/**
 * 可以通过Future.isCanceled()判断任务是被取消,还是完成(正常/异常)<br>
 * Future.isDone()总是返回true,对于invokeAll()的调用者来说,没有啥用
 */
public static void testInvokeAllTimeout() throws Exception
{
	ExecutorService executorService = Executors.newFixedThreadPool(5);

	List<Callable<String>> tasks = new ArrayList<Callable<String>>();
	tasks.add(new SleepSecondsCallable("t1", 2));
	tasks.add(new SleepSecondsCallable("t2", 2));
	tasks.add(new SleepSecondsCallable("t3", 3));
	tasks.add(new RandomTenCharsTask());

	List<Future<String>> results = executorService.invokeAll(tasks, 1,
			TimeUnit.SECONDS);

	System.out.println("wait for the result." + results.size());

	for (Future<String> f : results)
	{
		System.out.println("isCanceled=" + f.isCancelled() + ",isDone="
				+ f.isDone());
	}

	executorService.shutdown();

}
执行结果是: 

wait for the result.4
 isCanceled=true,isDone=true
 isCanceled=true,isDone=true
 isCanceled=true,isDone=true
 isCanceled=false,isDone=true

也就是说给定的超时期满,还没有完成的任务会被取消,即Future.isCancelled()返回true;在超时期之前,无论是正常完成还是异常终止的任务,Future.isCancelled()返回false。

第三种情况,测试在等待invokeAll执行完成之前,线程被中断

/**
 * 如果线程在等待invokeAll()执行完成的时候,被中断,会抛出InterruptedException<br>
 * 此时线程池会终止没有完成的任务,这主要是为了减少资源的浪费.
 */
public static void testInvokeAllWhenInterrupt() throws Exception
{
	final ExecutorService executorService = Executors.newFixedThreadPool(5);

	// 调用invokeAll的线程
	Thread invokeAllThread = new Thread() {

		@Override
		public void run()
		{
			List<Callable<String>> tasks = new ArrayList<Callable<String>>();
			tasks.add(new SleepSecondsCallable("t1", 2));
			tasks.add(new SleepSecondsCallable("t2", 2));
			tasks.add(new RandomTenCharsTask());

			// 调用线程会阻塞,直到tasks全部执行完成(正常完成/异常退出)
			try
			{
				List<Future<String>> results = executorService
						.invokeAll(tasks);
				System.out.println("wait for the result." + results.size());
			} catch (InterruptedException e)
			{
				System.out
						.println("I was wait,but my thread was interrupted.");
				e.printStackTrace();
			}

		}
	};

	invokeAllThread.start();

	Thread.sleep(200);

	invokeAllThread.interrupt();

	executorService.shutdown();

}

invokeAllThread 线程调用了ExecutorService.invokeAll(),在等待任务执行完成的时候,invokeAllThread被别的线程中断了。这个时候,

ExecutorService.invokeAll()会抛出Java.lang.InterruptedException,任务t1和t2都被终止抛出java.lang.InterruptedException: sleep interrupted。

也就是说一旦ExecutorService.invokeAll()方法产生了异常,线程池中还没有完成的任务会被取消执行

参见:http://blog.csdn.net/baidu_23086307/article/details/51740852

http://blog.csdn.net/lmj623565791/article/details/27250059

时间: 2024-10-12 17:15:20

ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析的相关文章

ExecutorService——invokeAny(Collection&lt;? extends Callable&lt;T&gt;&gt; tasks)

类似于InvokeAll方法,但是返回的条件不一样. invokeAny方法执行后,只要有一个任务完成(执行成功,没有抛出异常的那种).它就会返回这个任务的执行结果作为返回值. 而且,一旦invokeAny方法正常返回或者抛出异常,那些没有完成的任务将被取消. 有什么用途呢,如果其他任务都被取消了,不会影响吗? 答案就是,这些任务的目的都是一个,只要有一个完成就好了.比如:我想要让我同学早点脱单,于是我同时给他介绍了好几个对象(好几个任务),只要他跟其中一个好上了,其他女生就没有必要再勾搭了.

ExecutorService 的理解与使用

接口 Java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行.壹個 ExecutorService 实例因此特别像壹個线程池.事实上,在 java.util.concurrent 包中的 ExecutorService 的实现就是壹個线程池的实现. ExecutorService 样例 这里有壹個简单的使用Java 实现的 ExectorService 样例: [java] view plain copy ExecutorServ

Java并发编程 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable

一.Exectuor框架简介 Java从1.5版本开始,为简化多线程并发编程,引入全新的并发编程包:java.util.concurrent及其并发编程框架(Executor框架). Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等.他们的关系为 在Executor框架中,使用执行器(Exectuo

Android学习笔记之ExecutorService线程池的应用....

PS:转眼间就开学了...都不知道这个假期到底是怎么过去的.... 学习内容: ExecutorService线程池的应用... 1.如何创建线程池... 2.调用线程池的方法,获取线程执行完毕后的结果... 3.关闭线程...   首先我们先了解一下到底什么是线程池,只有了解了其中的道理,我们才能够进行应用...java.util.concurrent.ExecutorService表述了异步执行的机制   首先我们简单的举一个例子... package executor; import ja

JDK 源码解析 —— Executors ExecutorService ThreadPoolExecutor 线程池

零. 简介 Executors 是 Executor.ExecutorService.ThreadFactory.Callable 类的工厂和工具方法. 一. 源码解析 创建一个固定大小的线程池:通过重用共享无界队列里的线程来减少线程创建的开销.当所有的线程都在执行任务,新增的任务将会在队列中等待,直到一个线程空闲.由于在执行前失败导致的线程中断,如果需要继续执行接下去的任务,新的线程会取代它执行.线程池中的线程会一直存在,除非明确地 shutdown 掉. public static Exec

JDK8 并发包学习 Executor/ExecutorService(一)

本文介绍jdk8并发包中的Executor/ExecutorService这两个接口. Executor接口 概述 该类提供一个提交任务的方法,提交的任务可以在提交程序本线程运行,也可以在不同的线程运行,也可以在一个线程池中的线程运行,全看如何使用. However, the {@code Executor} interface does not strictly require that execution be asynchronous. In the simplest case, an e

Java线程池 ExecutorService了解一下

本篇主要涉及到的是java.util.concurrent包中的ExecutorService.ExecutorService就是Java中对线程池的实现. 一.ExecutorService介绍 ExecutorService是Java中对线程池定义的一个接口,它java.util.concurrent包中,在这个接口中定义了和后台任务执行相关的方法:  Java API对ExecutorService接口的实现有两个,所以这两个即是Java线程池具体实现类: 1. ThreadPoolExe

ExecutorService的十个使用技巧

ExecutorService] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent /ExecutorService.html)这个接口从Java 5开始就已经存在了.这得追溯到2004年了.这里小小地提醒一下,官方已经不再支持Java 5, Java 6了,Java 7[在半年后也将停止支持 .我之所以会提起ExecutorService这么旧的一个接口是因为,大多数Java程序员并没有搞清楚它的工作原理.关于它可以

ExecutorService线程池

ExecutorService 建立多线程的步骤: 1.定义线程类 class Handler implements Runnable{} 2.建立ExecutorService线程池 ExecutorService executorService = Executors.newCachedThreadPool(); 或者 int cpuNums = Runtime.getRuntime().availableProcessors();                //获取当前系统的CPU 数