Java并发——Callable和Future

Callable和Future

Executor框架将工作单元划分为任务,即任务是逻辑上的工作单元,而线程是任务异步执行的机制。Runnable是任务的一个抽象,并且理想状态下任务是独立的执行,但是Runnable的run( )不能返回一个结果或者抛出一个受检查的异常,这与我们有些实际任务是不相符的。在通过线程或者executor执行Runnable任务中,不仅仅是不能返回任务的执行结果,有时我们希望可以控制某个任务,或取消或终止,但在executor中一旦提交任务,我们将很难单一的控制任务的生命周期,虽然ExecutorService扩展了Executor接口,添加了生命周期的控制,但是基于线程池的,针对的是所有任务,是无法单一的控制某个任务的。

JDK还提供了另外一种更佳的任务抽象Callable,它和Runnable十分的相似,但也存在一些差异。Callable任务可以返回一个执行结果,但我们向executor提交一个Callable任务时,就会得到一个Future对象,这就像这个Callable提交到executor之后的一个发票回执,利用这个回执,在以后我们能获取任务的执行结果,或者当我们想取消该任务时,也可以利用这个任务提交时得到的Future对象去取消这个任务,而且利用这个Future我们还能在提交后的任意时间得到任务的状态(是否被取消,是否完成)。

Callable和Runnable的异同点:

  • Callable定义的方法是call( ),而Runnable定义的方法是run( )。
  • Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
  • Callable的call方法可抛出受检查的异常,而Runnable的run方法不能抛出异常。

在工具类Executors中有一些工具方法可以把Runnable任务转成Callable。你可以使用executor去执行一个Callable任务,也可以将Callable转成FutureTask对象,然后交由线程去执行。

Future是异步计算的结果,它描述了任务的生命周期,并提供了相关的方法来获得任务执行的结果、取消任务以及检查任务是否已经完成或者取消。

有多种方式可以创建一个Future。ExecutorService中的所有submit方法都会返回一个Future,利用这个返回的Future你可以获取任务的执行结果,或者取消任务。可以显示将Runnable或者Callable实例化一个FutureTask。

下面的例子演示了Callable和Future的一些方法,程序中定义了两个任务c1和c2,并且模拟c2的执行时间是8秒左右,然后依次调用future的相关方法

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableAndFuture {

	public static void main(String[] args) {

		ExecutorService es = Executors.newFixedThreadPool(5);

		Callable<Integer> c1 = new Target(false);
		Callable<Integer> c2 = new Target(true);

		Future<Integer> f1 = es.submit(c1);
		Future<Integer> f2 = es.submit(c2);

		int res = 0;
		try {
			res = f1.get();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		boolean isCancelled = f1.isCancelled();
		boolean isDone = f1.isDone();

		System.out.println(res);
		System.out.println(isCancelled);
		System.out.println(isDone);

		System.out.println("---------------------------");

		try {
			boolean cancel = f2.cancel(true);
			int res2 = f2.get();
			isCancelled = f1.isCancelled();
			isDone = f1.isDone();

			System.out.println(res2);
			System.out.println(cancel);
			System.out.println(isCancelled);
			System.out.println(isDone);
		} catch (CancellationException e) {
			// TODO Auto-generated catch block
			System.out.println("任务被取消.");
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("任务被中断.");
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			System.out.println("任务执行异常.");
		}

	}

}

class Target implements Callable<Integer> {

	private boolean sleep = false;

	public Target(boolean sleep) {
		// TODO Auto-generated constructor stub
		this.sleep = sleep;
	}

	@Override
	public Integer call() throws Exception {
		// TODO Auto-generated method stub
		if(sleep) {
			Thread.sleep(8000);
		}
		int i = new Random().nextInt(1000);
		return i;
	}

}

任务的执行结果:

982
false
true
---------------------------
任务被取消.

Future接口的相关方法

cancel( )方法可以试图取消任务的执行,如果当前任务已经完成、或已经被取消、或由于某些原因无法取消,则取消操作失败,返回false;如果该任务尚未运行,调用cancel( )方法将会使该任务永不会运行;如果调用cancel( )方法时,该任务已经运行,那么取决于参数boolean的值,如果是true,则表示立即中断该任务的执行,否则,等待该运行的任务结束后,尝试cancel并返回false。

isCancel( ),如果在任务正常完成前将其取消,那么返回true,否则,返回false。

isDone( ) , 如果任务已完成,则返回true,由于正常终止、异常或取消而完成,也会返回true。

get( ) , 如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,get会阻塞直到它完成。如果任务抛出了异常,get会将该异常封装为ExecutionException,然后重新抛出;如果任务被取消,get会抛出CancellationException。

FutureTask

FutureTask类相当于同时实现了Runnable和Future接口,提供了Future接口的具体实现,可以使用FutureTask去包装Callable或Runnable任务。因为FutureTask实现了Runnable接口,所以可以将其交给Executor去执行,或者直接调用run( )去执行。

使用FutureTask的一个示例

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class MyFutureTask {

	public static void main(String[] args) throws Exception {

		Executor executor = Executors.newFixedThreadPool(5);

		Callable<Integer> callable = new MyTarget();
		FutureTask<Integer> ft = new FutureTask<>(callable);

		executor.execute(ft);
		System.out.println(ft.get());

//		直接调用run
//		ft.run();
//		System.out.println(ft.get());

		System.out.println("-----------------------");

		Runnable runnable = new MyRunnableTarget();
		FutureTask<String> ft2 = new FutureTask<String>(runnable, "SUCCESS");
		executor.execute(ft2);
		System.out.println(ft2.get());

	}

}

class MyTarget implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {
		// TODO Auto-generated method stub
		int i = new Random().nextInt(1000);
		return i;
	}

}

class MyRunnableTarget implements Runnable {

	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println("Runnable is invoke...");
	}

}

程序输出:

280
-----------------------
Runnable is invoke...
SUCCESS

CompletionService

有时候我们需要利用executor去执行一批任务,每个任务都有一个返回值,利用Future就可以解决这个问题,为此我们需要保存每个任务提交后的Future,然后依次调用get方法轮询,获得已经执行完毕的任务的结果,这样的过程显得无趣。我们希望一次提交一批任务后,executor执行结束也是返回给我们一个已经执行完毕的Future集合。

CompletionService整合了Executor和BlockingQueue的功能。你可以将一批Callable任务交给它去执行,然后使用类似于队列中的take和poll方法,在结果完成时获得这个结果,就像一个打包的Future。将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者submit方法 执行的任务。使用者
take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。ExecutorCompletionService类是一个实现了CompletionService接口的实现类,它将计算任务交给一个传入的Executor去执行。

下面是一个ExecutorCompletionService的示例

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;

public class TestCompletionService {

	private class Target implements Callable<Integer> {

		@Override
		public Integer call() throws Exception {
			// TODO Auto-generated method stub
			int i = new Random().nextInt(1000);
			return i;
		}

	}

	public static void main(String[] args) throws Exception {
		Executor executor = Executors.newFixedThreadPool(5);
		ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(executor);

		Callable<Integer> c1 = new TestCompletionService().new Target();
		Callable<Integer> c2 = new TestCompletionService().new Target();
		Callable<Integer> c3 = new TestCompletionService().new Target();

		ecs.submit(c1);
		ecs.submit(c2);
		ecs.submit(c3);

		System.out.println(ecs.take().get());
		System.out.println(ecs.poll().get());
		System.out.println(ecs.take().get());

	}

}

这样将Future分离开来,已经完成的任务的Future就会被加入到BlockingQueue中供用户直接获取。

关于poll方法和get方法的区别,poll方法是非阻塞的,有则返回,无则返回NULL,take方法是阻塞的,没有的话则会等待。

批处理与任务执行时限

在有些应用场景中,我们需要同时处理多个任务,并获取结果,使用上面的CompletionService将完成的任务与未完成的任务分隔开似乎能够解决,但是如果其中有一个任务相当耗时,就会影响整个批处理任务的完成速度。比如,在一个页面中,我们需要从多个数据源获取数据,并在页面展示,同时我们希望整个页面的加载过程不超过2秒,那么那些超过2秒没有响应成功的数据源数据则用默认值替换,ExecutorService提供了invokeAll( )来完成这个任务。

下面我们通过一个示例演示invokeAll方法,程序中定义了3个任务,c1、c2、c3模拟执行时间分别为1、2、3秒,程序允许的最大执行时间是2秒,超过2秒的任务就会被取消。

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TestCompletionService {

	private class Target implements Callable<Integer> {

		private int a = 0;

		public Target(int a) {
			// TODO Auto-generated constructor stub
			this.a = a;
		}

		@Override
		public Integer call() throws Exception {
			// TODO Auto-generated method stub
			Thread.sleep(1000*a);
			return a;
		}

	}

	public static void main(String[] args) {
		ExecutorService es = Executors.newFixedThreadPool(5);

		Callable<Integer> c1 = new TestCompletionService().new Target(1);
		Callable<Integer> c2 = new TestCompletionService().new Target(2);
		Callable<Integer> c3 = new TestCompletionService().new Target(3);

		List<Callable<Integer>> list = new ArrayList<>();
		list.add(c1);
		list.add(c2);
		list.add(c3);

		try {
			List<Future<Integer>> res = es.invokeAll(list, 2, TimeUnit.SECONDS);

			Iterator<Future<Integer>> it = res.iterator();
			while(it.hasNext()) {
				Future<Integer> f = it.next();
				int i = f.get();
				System.out.println(i);
			}
		} catch (CancellationException e ) {
			System.out.println("任务取消");
		} catch (InterruptedException e) {
			System.out.println("中断异常");
		} catch (ExecutionException e) {
			System.out.println("执行异常");
		}

	}

}

程序的输出:

1
2
任务取消

需要注意的是,java.util.concurrent中所有的关于时间的方法都将负数作为0处理,不需要额外的处理

时间: 2024-08-29 06:14:03

Java并发——Callable和Future的相关文章

java并发编程之future模式

1.当你想并发去执行一段代码,但是还想获取这段代码的返回结果,那么future多线程模式就可以派上用场了,代码实现如下. public class Client { public Data request() { final FutureData futureData = new FutureData(); new Thread(new Runnable() { @Override public void run() { futureData.setRealData(new RealData()

Java多线程 - Callable和Future

已知的创建多线程的方法有继承Tread类和实现Runnable方法.此外Java还提供了Callable接口,Callable接口也提供了一个call()方法来做为线程执行体.但是call()方法与run()方法有些不同: call()方法可以有返回值 call()方法可以抛出异常 不过Java不允许Callable对象直接作为Thread的target.而且call()方法还有一个返回值--call()方法并不是直接调用,他是做为线程执行体被调用的.Java提供了Future接口来代表Call

Java多线程——Callable与Future

二月份回家过年了,家里没网,所以博客也停了一段时间,上班已经一周了,总的来说还是比较忙吧!周末还是把没总结完的知识点总结一下,方便日后翻阅! Callable与Future Runnable封装一个异步运行的任务,可以把它想象成一个没有参数和返回值的异步方法.Callable与Runnable类似,但是有返回值.Callable接口是一个参数化的类型,只有一个方法call. public interface Callable<V>{ V call() throws Exception; } 类

Java多线程Callable和Future类详解

     public interface Callable<V>    返回结果并且可能抛出异常的任务.实现者定义了一个不带任何参数的叫做 call 的方法      public interface Future<V>      Future 表示异步计算的结果.计算完成后只能使用 get 方法来获取结果 1.线程处理返回结果 一般开发中,使用多线程,最常见的就是:1.实现Runnable接口:2.继承Thread类. 但是run方法是没有返回结果,很难满足我们的需求.这时,常

Java利用Callable、Future进行并行计算求和

内容:在Java中利用Callable进行带返回结果的线程计算,利用Future表示异步计算的结果,分别计算不同范围的Long求和,类似的思想还能够借鉴到需要大量计算的地方. public class Sums { public static class Sum implements Callable<Long> { private final Long from; private final Long to; public Sum(long from, long to) { this.fro

java 使用Callable和Future返回线程执行的结果

我们可能在某些特殊的需求下需要返回一个线程执行的结果,在java 1.5提供了Callable和Futrue就能帮你实现!不解释看代码 package cn.kge.com; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Exe

Java并发编程:Callable、Future和FutureTask(转)

Java并发编程:Callable.Future和FutureTask 在前面的文章中我们讲述了创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果. 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦. 而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果. 今天我们就来讨论一下Callabl

Java并发编程 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable

一.Exectuor框架简介 Java从1.5版本开始,为简化多线程并发编程,引入全新的并发编程包:java.util.concurrent及其并发编程框架(Executor框架). Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等.他们的关系为 在Executor框架中,使用执行器(Exectuo

java并发编程--Runnable Callable及Future

1.Runnable Runnable是个接口,使用很简单: 1. 实现该接口并重写run方法 2. 利用该类的对象创建线程 3. 线程启动时就会自动调用该对象的run方法 通常在开发中结合ExecutorService使用,将任务的提交与任务的执行解耦开,同时也能更好地利用Executor提供的各种特性 ExecutorService executor = Executors.newCachedThreadPool(); executor.submit(new Runnable() { pub