揭密FutureTask

在java多线程编程中,我们经常使用线程池提交任务,并且通过Future来获取任务执行的结果,以此达到异步或者并行执行的效果。在jdk1.7以前,FutureTask是Future唯一的实现类,1.7后加入了ForkJoinTask类。本文主要总结一下我对FutureTask的理解。

Future类

  Future接口定义了5个方法,分别是 

 boolean cancel(boolean mayInterruptIfRunning);
 boolean isCancelled();
 boolean isDone();
 V get() throws InterruptedException, ExecutionException;
 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

分别介绍一下这五个接口的用途:

  • boolean cancel(boolean mayInterruptInRunning) 取消一个正在执行中的任务,并且返回调用结果。如果取消成功则返回true,反之返回false。这里要注意,即使方法返回true,当前任务也未必真的被取消了,后面会介绍。
  • boolean isCancelled() 返回当前任务是否被取消。
  • Boolean isDone() 返回当前任务是否执行完毕。这里done的概念比较广,包括了futureTask被执行后的任意状态,例如正常执行完毕、执行异常或者任务被取消。
  • V get() 这个接口就是用来获取futureTask执行结果,调用这个接口时会被阻塞,直到拿到结果或者异常。
  • V get(long timeout, TimeUnit unit) 这个接口多了一个超时时间,如果过了这个时间task仍然没有结果返回,则抛出timeout异常

写个demo便于理解

 1 public class FutureDemo {
 2     public static void main(String[] args) {
 3         ExecutorService executorService = Executors.newCachedThreadPool();
 4         Future future = executorService.submit(new Callable<Object>() {
 5             @Override
 6             public Object call() throws Exception {
 7                 Long start = System.currentTimeMillis();
 8                 while (true) {
 9                     Long current = System.currentTimeMillis();
10                     if ((current - start) > 1000) {
11                         return 1;
12                     }
13                 }
14             }
15         });
16
17         try {
18             Integer result = (Integer)future.get();
19             System.out.println(result);
20         }catch (Exception e){
21             e.printStackTrace();
22         }
23     }
24 }

   这里我们模拟了1s钟的CPU空转,当执行future.get()的时候,主线程阻塞了大约一秒后,把结果打印出来:1。

当然我们也可以使用V get(long timeout, TimeUnit unit),这个方法提供了一个超时时间的设置,如果超过当前时间任务线程还未返回,那么就会停止阻塞状态,并且抛出一个timeout异常。如下

1         try {
2             Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS);
3             System.out.println(result);
4         } catch (Exception e) {
5             e.printStackTrace();
6         }

这里我们设置的超时时间是500毫秒,由于一开始我们模拟了1s的CPU计算时间,这里便会抛出超时异常,打印出堆栈信息

当然,如果我们把超时时间设置的长一些,还是可以得到预期的结果的。

FutureTask内部实现机制

  刚我们测试了最常用的两个方法,接下来我们来探一探FutureTask的内部实现机制。首先我们看一下FutureTask的继承结构:

          

FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable和Future,也就是说FutureTask既可以当做一个Runnable,也可以当做一个Future。

  FutureTask内部定义了7个状态,代表了FutureTask当前所处状态。如下

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

当一个任务刚提交的时候,状态为NEW,由FutureTask的构造器可知:

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

  任务执行正常结束前,state会被设置成COMPLETING,代表任务即将完成,接下来很快就会被设置为NARMAL或者EXCEPTIONAL,这取决于调用Runnable中的call()方法是否抛出了异常。如果没有异常,则state设为NARMAL,反之为EXCEPTIONAL。

  如果任务提交后,在任务执行结束之前调用cancel(boolean mayInterruptIfRunning) 取消任务,那么有可能进入到后3个状态。如果传入的参数是false,state会被置为CANCELLED,反之如果传入true,state先被置为INTERRUPTING,后被置为INTERRUPTED。

总结下,FutureTask的状态流转过程,可以出现以下三种状态:

1. 正常执行完毕。 NEW -> COMPLETING -> NORMAL

    2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL

3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED

   4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED  

  那么以上状态为什么会这么流转呢?接下来我们一起扒一扒FutureTask的源码。我们从futureTask的方法看起。

1 public void run()

 1 public void run() {
 2         if (state != NEW ||
 3             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 4                                          null, Thread.currentThread()))
 5             return;
 6         try {
 7             Callable<V> c = callable;
 8             if (c != null && state == NEW) {
 9                 V result;
10                 boolean ran;
11                 try {
12                     result = c.call();
13                     ran = true;
14                 } catch (Throwable ex) {
15                     result = null;
16                     ran = false;
17                     setException(ex);
18                 }
19                 if (ran)
20                     set(result);
21             }
22         } finally {
23             // runner must be non-null until state is settled to
24             // prevent concurrent calls to run()
25             runner = null;
26             // state must be re-read after nulling runner to prevent
27             // leaked interrupts
28             int s = state;
29             if (s >= INTERRUPTING)
30                 handlePossibleCancellationInterrupt(s);
31         }
32     }

  翻译一下,这个方法经历了以下几步

1. 校验Task状态和当前线程引用runner,如果state不为NEW或者runner引用为null,直接返回。

  2. 调用runner的call()方法执行主逻辑,并且尝试获得返回值result。

  3. 如果抛出异常,调用setException(Throwable t)方法

  4. 如果没有异常,调用set(V v)方法

  5. 一些扫尾工作

那么setException(Throwable t)和set(V v)做了什么呢?我们看一下源码

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

  set(V v) 方法首先做一个CAS操作,将state字段由NEW->COMPLETING,这里的CAS操作读者可以自行百度原理。如果成功,那么把执行结果v赋给成员变量outcome,再把state的值设置为NORMAL,最后做一些清理工作,唤醒所有等待线程并把callable对象置为null。

 protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

  同理,setException(Throwable t) 方法大同小异,只不过state字段流转为NEW->COMPLETING->EXCEPTION。同时把异常对象赋予v。

  这里我们就清楚了,当一个任务被提交后,状态流转中1、2是怎么来的了。同时我们可以确定,outcome变量,存着是执行结果或者抛出的异常对象。

2  public V get() throws InterruptionException,ExecutionException

get() 和 get(long timeout, TimeUnit unit)方法是获取执行结果的两个方法,我们这里就看get()方法即可。首先贴源码

  

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

  首先检查state值,如果小于COMPLETING,则阻塞,阻塞时可能会抛出异常,这里我们不纠结这个,往下看。如果没有抛出异常,获取执行后返回的state值,最后调用report(s)方法。接着我们看report方法,如果s为NORMAL,返回执行结果outcome,否则抛出异常。结合之前的run()方法,我们这里可以得出,如果主逻辑正常执行完毕,则返回执行结果,如果抛出异常,那么这里会封装该异常为ExecutionException并且抛出。如果任务执行过程中被取消了,则可能抛出CancellationException()。

3 public boolean cancel(boolean mayInterruptIfRunning)

  这个方法个人认为是最具争议的方法。这里我们先贴个demo

  

 1 public class FutureDemo {
 2     public static void main(String[] args) {
 3         ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
 4         // 预创建线程
 5         executorService.prestartCoreThread();
 6
 7         Future future = executorService.submit(new Callable<Object>() {
 8             @Override
 9             public Object call() {
10                 System.out.println("start to run callable");
11                 Long start = System.currentTimeMillis();
12                 while (true) {
13                     Long current = System.currentTimeMillis();
14                     if ((current - start) > 1000) {
15                         System.out.println("当前任务执行已经超过1s");
16                         return 1;
17                     }
18                 }
19             }
20         });
21
22         System.out.println(future.cancel(false));
23
24         try {
25             Thread.currentThread().sleep(3000);
26             executorService.shutdown();
27         } catch (Exception e) {
28             //NO OP
29         }
30     }
31 }

我们多次测试后发现,出现了2种打印结果,如图

                结果1

                结果2    

咦,两个结果和预期的都好像不太一样?第一种是任务压根没取消,第二种则是任务压根没提交成功,似乎和方法签名cancel不太一致?

我们先看一下方法签名上的作者注释

/** * Attempts to cancel execution of this task.  This attempt will * fail if the task has already completed, has already been cancelled, * or could not be cancelled for some other reason. If successful, * and this task has not started when <tt>cancel</tt> is called, * this task should never run.  If the task has already started, * then the <tt>mayInterruptIfRunning</tt> parameter determines * whether the thread executing this task should be interrupted in * an attempt to stop the task. * * <p>After this method returns, subsequent calls to {@link #isDone} will * always return <tt>true</tt>.  Subsequent calls to {@link #isCancelled} * will always return <tt>true</tt> if this method returned <tt>true</tt>. * * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete * @return <tt>false</tt> if the task could not be cancelled, * typically because it has already completed normally; * <tt>true</tt> otherwise */   这里我们可以看到,"尝试"取消任务的执行,如果当前任务已经结束或者已经取消,则当前取消操作会失败。如果任务还没开始就被取消,那么任务则不会被执行。这里我们就知道了,如果任务还没开始执行时cancel(false)就被调用,那么这个任务是不会被执行的,这就解释了出现上图结果2的情况。那如果任务已经开始执行,并且调用cancel(false),是不会终止任务的。我们还是从源码去分析cancel()究竟做了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) {
        if (state != NEW)
            return false;
        if (mayInterruptIfRunning) {
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
                return false;
            Thread t = runner;
            if (t != null)
                t.interrupt();
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
    }

  执行逻辑如下

1. 如果当前futureTask状态不为NEW,直接返回false,表示取消操作失败。

  2. 如果传入true,代表可能会引发线程中断。一个CAS操作,把状态由NEW->INTERRUPTING,如果执行失败则直接返回false。设置当前工作线程中断标识为true,然后把futureTask状态设置为INTERRUPTED。

  3. 如果传入false,把futureTask状态设置为CANCELLED。

  4. 做一些清理工作

可见,cancel()方法仅仅是改变了futureTask的状态位!如果传入的是false,当前任务是不会被终止的,而是会继续执行,直到异常或者执行完毕。如果传入的是true,会调用当前线程的interrupt()方法,把中断标志位设为true。所以cancel()方法其实个人理解是有歧义的,它并不能真正取消一个任务的执行。事实上,除非线程自己停止自己的任务,或者退出JVM,是没有其他方法完全终止一个线程的任务的。cancel(true)方法也只是希望当前线程可以响应中断而已,当线程被阻塞,抛出InterruptedException。同时,由之前的future.get()方法可知,如果一个futureTask被cancel()了,调用get()方法会抛出CancellationException。

总结

  理解FutureTask,我们使用Future类才能更加得心应手。这里也只是作者自己的理解,如有不对之处,还望读者批评指正。

作者:mayday芋头

出处:http://www.cnblogs.com/maypattis/

本博客中未标明转载的文章归作者mayday芋头和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利

  

 
时间: 2024-08-05 05:21:15

揭密FutureTask的相关文章

深入学习 FutureTask

原文出处: 天凉好个秋 第一部分:What 在Java中一般通过继承Thread类或者实现Runnable接口这两种方式来创建多线程,但是这两种方式都有个缺陷,就是不能在执行完成后获取执行的结果,因此Java 1.5之后提供了Callable和Future接口,通过它们就可以在任务执行完毕之后得到任务的执行结果.本文会简要的介绍使用方法,然后会从源代码角度分析下具体的实现原理.本文以Java 1.7的代码进行分析. 第二部分:How Callable接口 对于需要执行的任务需要实现Callabl

微信运动记步原理,及刷步数方法大揭密

随着"微信运动"功能的开启,朋友圈里燃起了每天刷步的热情,每天为了争夺运动封面第一名,不惜下班还在楼下多绕一圈儿再回家.那么你们都知道微信运动是如何记录我们每日行程步数的吗?这个数据到底准不准确呢?今天小编就为大家来一个大揭密. 原理: 手机内置了振动传感器或协助处理器,根据这些设备的震动频率来储存数据.简单地说,就是人在步行时重心都有一点上下移动,传感器和协作器感应到这种重心移动并进行记数. 设备: iPhone 5S/6/6 Plus三款苹果手机因为自带M7/M8协处理器,无需任何

Java并发编程:Callable、Future和FutureTask(转)

Java并发编程:Callable.Future和FutureTask 在前面的文章中我们讲述了创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果. 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦. 而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果. 今天我们就来讨论一下Callabl

V5天下-安德揭密CCIEv5认证考试 就业和前途

在笔者看来,RSCCIEv5,这都不是事儿!您看Yeslab(上海)学员--来自思科的妹子一举拿下CCIEv5.0认证!!又一位double 女IE诞生 那么什么是事儿? 如何学习是事儿!这是我们提到的第一件事 笔者从最初的沁园公寓701室(Yeslab最原始的老巢),到沁园公寓907室(Yeslab的第一步扩张),之后搬到苏州长远天地1005室,又扩展到长远天地601室(Yeslab安全和DC的基地),再之后到,安德作为小弟和几位同僚奔赴上海这个国际大都市,我们的小圈子从华鼎大厦16D开始,而

Callable、Future和FutureTask浅析

我们知道创建线程的方式有两种,一种是实现Runnable接口,另一种是继承Thread,但是这两种方式都有个缺点,那就是在任务执行完成之后无法获取返回结果,那如果我们想要获取返回结果该如何实现呢?还记上一篇Executor框架结构中提到的Callable接口和Future接口吗?,是的,从JAVA SE 5.0开始引入了Callable和Future,通过它们构建的线程,在任务执行完成后就可以获取执行结果,今天我们就来聊聊线程创建的第三种方式,那就是实现Callable接口. 1.Callabl

十亿级视频播放技术优化揭密

本文为转载文章,文章来自:王辉|十亿级视频播放技术优化揭密 QCon是由InfoQ主办的全球顶级技术盛会,每年在伦敦.北京.东京.纽约.圣保罗.上海.旧金山召开.自 2007年 3月份首次举办以来,已经有超万名高级技术人员参加过QCon大会.QCon内容源于实践并面向社区,演讲嘉宾依据热点话题,面向 5年以上工作经验的技术团队负责人.架构师.工程总监.高级开发人员分享技术创新和最佳实践. 4月18日性能优化面面观专题会议上,腾讯研发总监王辉以“十亿级视频播放技术优化揭秘”为主题,用QQ空间的日均

Callable, Runnable, Future, FutureTask

Java并发编程之Callable, Runnable, Future, FutureTask Java中存在Callable, Runnable, Future, FutureTask这几个与线程相关的类或接口, 下面来了解一下它们的作用和区别. 一.Callable和Runnable Callable和Runnable类似, 实现Callable和Runnable接口的类都是可以被其他线程运行的任务, Callable和Runnable主要有以下几点区别: (1). Callable中声明的

FutureTask源码解读

import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestFuture { public static void

Callable、Future和FutureTask

Runnable是执行工作的独立任务,但是它不返回任何值,如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦.自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果. 一.Runnable java.lang.Runnable是一个接口,在它里面只声明了一个run()方法: 由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果. 二.Callable Callable位