在java多线程编程中,我们经常使用线程池提交任务,并且通过Future来获取任务执行的结果,以此达到异步或者并行执行的效果。在jdk1.7以前,FutureTask是Future唯一的实现类,1.7后加入了ForkJoinTask类。本文主要总结一下我对FutureTask的理解。
Future类
Future接口定义了5个方法,分别是
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
分别介绍一下这五个接口的用途:
- boolean cancel(boolean mayInterruptInRunning) 取消一个正在执行中的任务,并且返回调用结果。如果取消成功则返回true,反之返回false。这里要注意,即使方法返回true,当前任务也未必真的被取消了,后面会介绍。
- boolean isCancelled() 返回当前任务是否被取消。
- Boolean isDone() 返回当前任务是否执行完毕。这里done的概念比较广,包括了futureTask被执行后的任意状态,例如正常执行完毕、执行异常或者任务被取消。
- V get() 这个接口就是用来获取futureTask执行结果,调用这个接口时会被阻塞,直到拿到结果或者异常。
- V get(long timeout, TimeUnit unit) 这个接口多了一个超时时间,如果过了这个时间task仍然没有结果返回,则抛出timeout异常
写个demo便于理解
1 public class FutureDemo { 2 public static void main(String[] args) { 3 ExecutorService executorService = Executors.newCachedThreadPool(); 4 Future future = executorService.submit(new Callable<Object>() { 5 @Override 6 public Object call() throws Exception { 7 Long start = System.currentTimeMillis(); 8 while (true) { 9 Long current = System.currentTimeMillis(); 10 if ((current - start) > 1000) { 11 return 1; 12 } 13 } 14 } 15 }); 16 17 try { 18 Integer result = (Integer)future.get(); 19 System.out.println(result); 20 }catch (Exception e){ 21 e.printStackTrace(); 22 } 23 } 24 }
这里我们模拟了1s钟的CPU空转,当执行future.get()的时候,主线程阻塞了大约一秒后,把结果打印出来:1。
当然我们也可以使用V get(long timeout, TimeUnit unit),这个方法提供了一个超时时间的设置,如果超过当前时间任务线程还未返回,那么就会停止阻塞状态,并且抛出一个timeout异常。如下
1 try { 2 Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS); 3 System.out.println(result); 4 } catch (Exception e) { 5 e.printStackTrace(); 6 }
这里我们设置的超时时间是500毫秒,由于一开始我们模拟了1s的CPU计算时间,这里便会抛出超时异常,打印出堆栈信息
当然,如果我们把超时时间设置的长一些,还是可以得到预期的结果的。
FutureTask内部实现机制
刚我们测试了最常用的两个方法,接下来我们来探一探FutureTask的内部实现机制。首先我们看一下FutureTask的继承结构:
FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable和Future,也就是说FutureTask既可以当做一个Runnable,也可以当做一个Future。
FutureTask内部定义了7个状态,代表了FutureTask当前所处状态。如下
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
当一个任务刚提交的时候,状态为NEW,由FutureTask的构造器可知:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
任务执行正常结束前,state会被设置成COMPLETING,代表任务即将完成,接下来很快就会被设置为NARMAL或者EXCEPTIONAL,这取决于调用Runnable中的call()方法是否抛出了异常。如果没有异常,则state设为NARMAL,反之为EXCEPTIONAL。
如果任务提交后,在任务执行结束之前调用cancel(boolean mayInterruptIfRunning) 取消任务,那么有可能进入到后3个状态。如果传入的参数是false,state会被置为CANCELLED,反之如果传入true,state先被置为INTERRUPTING,后被置为INTERRUPTED。
总结下,FutureTask的状态流转过程,可以出现以下三种状态:
1. 正常执行完毕。 NEW -> COMPLETING -> NORMAL
2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL
3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED
4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED
那么以上状态为什么会这么流转呢?接下来我们一起扒一扒FutureTask的源码。我们从futureTask的方法看起。
1 public void run()
1 public void run() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, 4 null, Thread.currentThread())) 5 return; 6 try { 7 Callable<V> c = callable; 8 if (c != null && state == NEW) { 9 V result; 10 boolean ran; 11 try { 12 result = c.call(); 13 ran = true; 14 } catch (Throwable ex) { 15 result = null; 16 ran = false; 17 setException(ex); 18 } 19 if (ran) 20 set(result); 21 } 22 } finally { 23 // runner must be non-null until state is settled to 24 // prevent concurrent calls to run() 25 runner = null; 26 // state must be re-read after nulling runner to prevent 27 // leaked interrupts 28 int s = state; 29 if (s >= INTERRUPTING) 30 handlePossibleCancellationInterrupt(s); 31 } 32 }
翻译一下,这个方法经历了以下几步
1. 校验Task状态和当前线程引用runner,如果state不为NEW或者runner引用为null,直接返回。
2. 调用runner的call()方法执行主逻辑,并且尝试获得返回值result。
3. 如果抛出异常,调用setException(Throwable t)方法
4. 如果没有异常,调用set(V v)方法
5. 一些扫尾工作
那么setException(Throwable t)和set(V v)做了什么呢?我们看一下源码
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
set(V v) 方法首先做一个CAS操作,将state字段由NEW->COMPLETING,这里的CAS操作读者可以自行百度原理。如果成功,那么把执行结果v赋给成员变量outcome,再把state的值设置为NORMAL,最后做一些清理工作,唤醒所有等待线程并把callable对象置为null。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
同理,setException(Throwable t) 方法大同小异,只不过state字段流转为NEW->COMPLETING->EXCEPTION。同时把异常对象赋予v。
这里我们就清楚了,当一个任务被提交后,状态流转中1、2是怎么来的了。同时我们可以确定,outcome变量,存着是执行结果或者抛出的异常对象。
2 public V get() throws InterruptionException,ExecutionException
get() 和 get(long timeout, TimeUnit unit)方法是获取执行结果的两个方法,我们这里就看get()方法即可。首先贴源码
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
首先检查state值,如果小于COMPLETING,则阻塞,阻塞时可能会抛出异常,这里我们不纠结这个,往下看。如果没有抛出异常,获取执行后返回的state值,最后调用report(s)方法。接着我们看report方法,如果s为NORMAL,返回执行结果outcome,否则抛出异常。结合之前的run()方法,我们这里可以得出,如果主逻辑正常执行完毕,则返回执行结果,如果抛出异常,那么这里会封装该异常为ExecutionException并且抛出。如果任务执行过程中被取消了,则可能抛出CancellationException()。
3 public boolean cancel(boolean mayInterruptIfRunning)
这个方法个人认为是最具争议的方法。这里我们先贴个demo
1 public class FutureDemo { 2 public static void main(String[] args) { 3 ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 4 // 预创建线程 5 executorService.prestartCoreThread(); 6 7 Future future = executorService.submit(new Callable<Object>() { 8 @Override 9 public Object call() { 10 System.out.println("start to run callable"); 11 Long start = System.currentTimeMillis(); 12 while (true) { 13 Long current = System.currentTimeMillis(); 14 if ((current - start) > 1000) { 15 System.out.println("当前任务执行已经超过1s"); 16 return 1; 17 } 18 } 19 } 20 }); 21 22 System.out.println(future.cancel(false)); 23 24 try { 25 Thread.currentThread().sleep(3000); 26 executorService.shutdown(); 27 } catch (Exception e) { 28 //NO OP 29 } 30 } 31 }
我们多次测试后发现,出现了2种打印结果,如图
结果1
结果2
咦,两个结果和预期的都好像不太一样?第一种是任务压根没取消,第二种则是任务压根没提交成功,似乎和方法签名cancel不太一致?
我们先看一下方法签名上的作者注释
/** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed, has already been cancelled, * or could not be cancelled for some other reason. If successful, * and this task has not started when <tt>cancel</tt> is called, * this task should never run. If the task has already started, * then the <tt>mayInterruptIfRunning</tt> parameter determines * whether the thread executing this task should be interrupted in * an attempt to stop the task. * * <p>After this method returns, subsequent calls to {@link #isDone} will * always return <tt>true</tt>. Subsequent calls to {@link #isCancelled} * will always return <tt>true</tt> if this method returned <tt>true</tt>. * * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete * @return <tt>false</tt> if the task could not be cancelled, * typically because it has already completed normally; * <tt>true</tt> otherwise */ 这里我们可以看到,"尝试"取消任务的执行,如果当前任务已经结束或者已经取消,则当前取消操作会失败。如果任务还没开始就被取消,那么任务则不会被执行。这里我们就知道了,如果任务还没开始执行时cancel(false)就被调用,那么这个任务是不会被执行的,这就解释了出现上图结果2的情况。那如果任务已经开始执行,并且调用cancel(false),是不会终止任务的。我们还是从源码去分析cancel()究竟做了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
执行逻辑如下
1. 如果当前futureTask状态不为NEW,直接返回false,表示取消操作失败。
2. 如果传入true,代表可能会引发线程中断。一个CAS操作,把状态由NEW->INTERRUPTING,如果执行失败则直接返回false。设置当前工作线程中断标识为true,然后把futureTask状态设置为INTERRUPTED。
3. 如果传入false,把futureTask状态设置为CANCELLED。
4. 做一些清理工作
可见,cancel()方法仅仅是改变了futureTask的状态位!如果传入的是false,当前任务是不会被终止的,而是会继续执行,直到异常或者执行完毕。如果传入的是true,会调用当前线程的interrupt()方法,把中断标志位设为true。所以cancel()方法其实个人理解是有歧义的,它并不能真正取消一个任务的执行。事实上,除非线程自己停止自己的任务,或者退出JVM,是没有其他方法完全终止一个线程的任务的。cancel(true)方法也只是希望当前线程可以响应中断而已,当线程被阻塞,抛出InterruptedException。同时,由之前的future.get()方法可知,如果一个futureTask被cancel()了,调用get()方法会抛出CancellationException。
总结
理解FutureTask,我们使用Future类才能更加得心应手。这里也只是作者自己的理解,如有不对之处,还望读者批评指正。
作者:mayday芋头
出处:http://www.cnblogs.com/maypattis/
本博客中未标明转载的文章归作者mayday芋头和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利