J.U.C FutureTask之源码解析

通过直接继承Thread, 实现Runnable接口来创建线程。但这两种方式都有一种缺陷:在执行完任务之后无法获得执行结果。

如果需要获得执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来比较麻烦,而jdk中Callable和Future,通过他们可以在任务执行完毕之后得到任务执行结果。先看看他们之间的组织关系:

Callable:

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

源码可知,它也是个一个接口,在他里面也只是申明一个方法,只不过这个方法为call(),call方法返回的就是该泛型传递进来的V类型,他怎么使用呢?就是结合之前的ExecuteService:

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

第一个submit方法里面的参数类型就是Callable。

Future:

Future就是对于具体的Runnable或者Callable任务的执行进度的查看,取消,查询是否完成,获取结果(正确完成时的结果,或异常)。必要时可以通过get方法获取执行的结果,该方法会阻塞直到任务返回结果,或通过指定阻塞时间的版本。

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

其中cancel()方法用来取消任务,如果取消任务成功则返回true, 如果取消任务失败则返回false。 参数mayInterruptIfRunning表示是否允许取消真在执行去没有执行完毕的任务,如果设置true, 则表示可以取消正在执行过程的任务。 当任务已经完成,或者已经被取消过了,或者因为别的原因不能取消, 则返回false。 当取消时,该任务还没有开始执行,则该任务不会执行,并且总是返回true。

FutureTask:

public class FutureTask<V> implements RunnableFuture<V>

FutureTask类实现了RunnableFuture接口,看一下RunnableFuture接口的定义:

public interface RunnableFuture<V> extends Runnable, Future<V>

RunnableFuture接口接触了Runnable接口和Future接口, 而FutureTask实现了RunnableFuture接口,所以它既可作为Runnable被线程执行,也可以作为Future得到Callable的返回值。

构造器定义:

 public FutureTask(Callable<V> callable)
 public FutureTask(Runnable runnable, V result) {

再来看看第二个构造器中的参数怎么变身Callable的:

this.callable = Executors.callable(runnable, result);

调用Executors.callable方法:

 public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

简单实现Callable:

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

流程:

下面结合完整具体流程走一下FutureTask过程,并解析源码,草图如下:

实例代码如下:

 1 public class Test {
 2     public static void main(String[] args) {
 3         //第一种方式
 4         ExecutorService executor = Executors.newCachedThreadPool();
 5         Task task = new Task();
 6         FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
 7         executor.submit(futureTask);
 8         executor.shutdown();
15
16         try {
17             Thread.sleep(1000);
18         } catch (InterruptedException e1) {
19             e1.printStackTrace();
20         }
21
22         System.out.println("主线程在执行任务");
23
24         try {
25             System.out.println("task运行结果"+futureTask.get());
26         } catch (InterruptedException e) {
27             e.printStackTrace();
28         } catch (ExecutionException e) {
29             e.printStackTrace();
30         }
31
32         System.out.println("所有任务执行完毕");
33     }
34 }
35 class Task implements Callable<Integer>{
36     @Override
37     public Integer call() throws Exception {
38         System.out.println("子线程在进行计算");
39         Thread.sleep(3000);
40         int sum = 0;
41         for(int i=0;i<100;i++)
42             sum += i;
43         return sum;
44     }
45 }

分析过程之前,先准备前准备知识,首先看一下FutureTask内部状态,以及之间的转变:

    private volatile int state; // volatile 内存可见性
    private static final int NEW          = 0; //该状态为new FutureTask()时设定,同时也表示内部成员callable已经成功赋值,一直到worker thread完成FutureTask中run().
    private static final int COMPLETING   = 1; //该状态位worker thread完成task时设定的中间状态,处于该状态下,说明worker thread 真正准备设置result.
    private static final int NORMAL       = 2;  //当设置result结果完成后,FutureTask处于该状态,代表过程结果,该状态为最终状态final state,(正确完成的最终状态)
    private static final int EXCEPTIONAL  = 3;  // 同上,只不过task执行过程出现异常,此时结果设值为exception,也是final state
    private static final int CANCELLED    = 4;  //final state, 表明task被cancel(task还没有执行就被cancel的状态).
    private static final int INTERRUPTING = 5;  // 中间状态,task运行过程中被interrupt时,设置的中间状态;
    private static final int INTERRUPTED  = 6;   // final state, 中断完毕的最终状态,几种情况,下面具体分析。

下面是状态之间的转变,贯穿主线:

   * Possible state transitions:
     1* NEW -> COMPLETING -> NORMAL
     2* NEW -> COMPLETING -> EXCEPTIONAL
     3* NEW -> CANCELLED
     4* NEW -> INTERRUPTING -> INTERRUPTED
     */

其他重要的变量:

 /** The underlying callable; nulled out after running */
    private Callable<V> callable;   // 具体run运行时会调用其方法call(),并获得结果,结果时置为null.
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes   没必要为votaile,因为其是伴随state 进行读写,而state是FutureTask的主导因素。
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;   //具体的worker thread.
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;     //Treiber stack 并发stack数据结构,用于存放阻塞在该futuretask#get方法的线程。

OK,构造new FutureTask开始:

  public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable; //底层callable赋值
        this.state = NEW;       // 初始状态NEW,同时也标志了callable的赋值,可见性
    }

ThreadPoolExecutor.submit(Runnable),ThreadPoolExecutor里面具体细节请见这里,这里就假设它直接new a thread来处理该任务了,因为FutureTask为Runnable的子类,所以worker thread调用该类的run()方法:

        public void run() {        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))   //状态检测,和当前worker Thread的cas原子赋值,有一个不成立,就直接返回。什么情况下还没run()呢?就不是NEW状态了呢?
            return;                                                       //caller调用cancel了,此时状态为Interrupting,也说明了上面的cancel方法说明,task没运行时,就interrupt,task得不到运行。总是返回
        try {                                                               //true。                                                                           //再来看看这里worker thread赋值为什么要用cas操作,有竞争racing? 竞争哪里来?难道threadPoolExecutor线程池多个线程可能抢同一个
            Callable<V> c = callable;                                      //任务?不可能 1:线程数 < coreThreadPool 时, 直接new thread, 2 : 大于 coreThreadpool时,放在blockingqueue里,取的话只能一
            if (c != null && state == NEW) {                               //线程。能想到就是caller那边了,即多callers(多线程)提交同一FutureTask.
                V result;                                                  //多线程同时提交同一FutureTask,确保该FutureTask的run()只被调用一次,
                boolean ran;
                try {
                    result = c.call();                                      //此处的if,1:当state == NEW(task没完成,中断) 并且 worker Thread为null时,才会得到运行
                    ran = true;                                             // 2: task已经完成了 或者 该任务已经有worker thread来执行时,直接返回不会运行。
                } catch (Throwable ex) {                                //调用callable的call方法
                    result = null;                                     //执行task时有异常
                    ran = false;                                        //附异常
                    setException(ex);
                }
                if (ran)                                               //正常完成,则赋值
                    set(result);
            }
        } finally {
                                                                       //注意!!什么这里吧runner置为null,此时run()方法还没运行完呢啊!现在置为null,不怕并发调用run()吗?注意此时state已经变化了(Comple
            runner = null;                                             //teing或者interrupting了,run()一开始state != NEW 直接return,不会运行。可以说通过state和 worker thread来一起控制并发调用run
            int s = state;                                            //必须再读一次,防止worker thread == null后,遗漏的interrup信号,底下具体分析中断的情况。
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);              //如果caller中断信号有的话,则处理该interrupt.
        }                                                            //另外该任务是一致性任务,即state只要不为NEW,该任务就不会在运行,运行结束或cancel后,就不能在运行了,因为state状态在那不变哦!
    }

请看下例子,三个提交线程(提交同一个FutureTask):

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        CountDownLatch latch = new CountDownLatch(1);
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        for (int i = 0 ; i < 3; i++) {
            new Thread(new Submit(executor,  futureTask, latch)).start();
        }
        try {
            Thread.sleep(3000);
            latch.countDown();
            Thread.sleep(20000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println("所有任务执行完毕");
        executor.shutdown();
    }

}

class Submit implements Runnable {
    private CountDownLatch latch ;
    private ExecutorService es ;
    private FutureTask<Integer> task;
    public Submit(ExecutorService es, FutureTask<Integer> task, CountDownLatch latch) {
         this.latch = latch;
         this.es = es;
         this.task = task;
    }
    public void run() {

        try {
            latch.await();
            Future<?> future = (Future<?>) es.submit(task);
            System.out.println("Thread name : " + Thread.currentThread().getName() + "go!");
            future.get(3000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e1) {
            e1.printStackTrace();
        } catch (TimeoutException e2) {
            System.err.println("Thread name : " + Thread.currentThread().getName()  + " " + e2);
        }
    }
}

class Task implements Callable<Integer>{

    public Integer call() throws Exception {
        System.out.println("thread name : " + Thread.currentThread().getName() + "do the work!");
        Thread.sleep(6000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

显示如下:

Thread name : Thread-1go!
Thread name : Thread-0go!
Thread name : Thread-2go!
thread name : pool-1-thread-1do the work!
Thread name : Thread-1 java.util.concurrent.TimeoutException
所有任务执行完毕

结果很显然,同一个任务多次提交(并发提交),FutureTask保证只是启一个线程来运行。

想运行多次(只要不cancel,和throw exception,因为他set(result),正常运行结束,state还是new),用这个:

protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don‘t set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {

            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

再来看看setException()和set(result):

 protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // cas原子操作,失败直接返回,成功的前提之前的状态必须为NEW.
            outcome = v;                                                    //可能和什么冲突呢? caller已经cancel该task,状态位Interrupting或者Interrpted(这次Interrupted代表interrupt完成,这set()
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state  // 不是在worker thread中调用的嘛,怎么intterupt都完成了,怎么worker thread还在运行呢?worker thread运行的代码中没有响
            finishCompletion();                                              //应interrupt的代码。所以客户端cancel操作,对运行中的worker thread,并不一定让它停下来,不过此时即使运行完毕,也不能赋值。
        }
    }                                                                        //new -> Completing-> NORMAL 或者NEW ->Interrupting->Intterrpted.
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;    //同上,不过附异常。
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }                                                                   //new ->completing ->exception 或者 同上
    }

finishCompletion()等会细聊,主要是没说到get()阻塞呢!看看caller端线程调用cancel()和workerThread的handlePossibleCancellationInterrupt(int s)协调:

   public boolean cancel(boolean mayInterruptIfRunning) {
        if (state != NEW)
            return false; //1:已经cancel(cancelled,Interruping, Interrupted)过了 2:正常完成 Completing(Completed) 3:异常完成completing(exception) 直接返回false;
        if (mayInterruptIfRunning) { // flag : worker thread 已经启动运行了,是否可以中断
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) //再次检查state状态,完成的话(上面的三种),直接返回false;
                return false;
            Thread t = runner;
            if (t != null)       // t == null对应Future task还没启动, 跳过thread.interrupt(),直接由interrpting -> interrupted,成功的话
                t.interrupt();   //调用worker thread的 interrupt() //mayInterrptIfRunning 为true ,interrupt 状态转变 new -> interrupting -> interrupted.
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) //mayInterruptIfRunning 为false,interrupt成功的 状态转变 new -> Cancelled
            return false;
        finishCompletion();
        return true;
    }

由上面可知,客户端cancel()中不少cas操作,主要来自两方面的racing, 1:线程池worker Thread的完成(异常,正常)状态设置; 2:同一futuretask,不同客户端线程callers的cancel操作。

 private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let‘s spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

当state处于Interrupting, 即caller即将调用worker thread.interrupt(), 所以worker thread自旋会,等会interrupt方法的调用,保留interrupt标志。

再来看看get()和带参数的get(timeout):

 public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)  //结果未设定的情况下
            s = awaitDone(false, 0L); //无条件等待
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //等到timeout时间内,没完成,throws TimeoutException
            throw new TimeoutException();
        return report(s);
    }

awaitDone():

   private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L; //计算出等待时间。
        WaitNode q = null;
        boolean queued = false;  //是否加入阻塞队列的标志
        for (;;) {
            if (Thread.interrupted()) { //阻塞该caller线程之前,caller线程被中断,直接throw 异常
                removeWaiter(q);    //在阻塞队列中移除该线程的封装node.此处无意义
                throw new InterruptedException();
            }

            int s = state; //读取state,阻塞前 recheck一下 是否完成?
            if (s > COMPLETING) { //完成了,直接返回,不要阻塞了
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield(); //等会,快完成了。
            else if (q == null)
                q = new WaitNode(); //当前阻塞线程链表的简单封装
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q); //设为当前FutureTask阻塞链表(stack结构)的栈顶。
            else if (timed) {
                nanos = deadline - System.nanoTime(); //计算当前要阻塞的等待时间
                if (nanos <= 0L) {
                    removeWaiter(q); //小于0 直接返回,当前REMOVEWaiter无意义,并没有加入stack中。
                    return state;
                }
                LockSupport.parkNanos(this, nanos);本地native方法,阻塞当前线程。
            }
            else
                LockSupport.park(this); //无时间条件阻塞
        }
    }

无时间限制阻塞,有时间阻塞(阻塞时间大于task完成时间)会等到任务完成而给通知,唤醒该线程,即finishCompletion();而有时间阻塞(阻塞时间在task完成之间就已经结束的)会通过for()退出(退出前,删除等待队列中的节点)。

WaiterNode定义:

  static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); } //当前阻塞线程的引用
    }

结合awaitDone()中的新阻塞节点加入顺序,其定位stack结构(Treiber stack);

removeWaiter():

 private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // 检测竞争
                            continue retry; //发生重试
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

finishCompletion():

 private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) { 得到栈顶阻塞节点的引用
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread; //取得阻塞的线程引用,
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//解阻塞
                    }
                    WaitNode next = q.next; //遍历解阻塞线程
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

其实,前面的分析可知,多个caller线程并发提交同一个FutureTask, 并且所谓调用get()阻塞的话(阻塞在该FutureTask上),实际上也就一个caller线程阻塞,其他线程在调用该FutureTask的run()开始条件检查时,就直接return了,实际情况:三个并发线程提交同一个future task,对应生成三份FutureTask(不同于之前),三份FutureTask中对应三分Callable,而这三份Callable含有相同的FutureTask(所谓的相同任务) ,向ThreadPoolExecutor.submit(Runnable)实际上提交了三份Runnable(即生成的三分FutureTask), FutureTask实现了Runnable接口, 然后ThreadPoolExecutor生成三个线程来执行这所谓的三个任务,这三个任务run()中都是调用对应内部的callable的call(), 而callable的call方法调用的是他们共同引用的FutureTask(同一个对像)的run()方法,而run方法, 我们上面解析过了,通过cas和状态检测,只运行一个worker thread 调用run()(见上),另外两个线程直接从共同底层FutureTask的run方法开始直接返回。

晕了?从头再来看看提交的过程:

1:submit(FutureTask(Runnable)):AbstractExecutorService

  public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

2:生成三个FutureTask(其中runnable就是同一个底层FutureTask任务):

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

3:调用Executors.callable():

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
 static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() { //直接调用底层同一个FutureTask的run();
            task.run();
            return result;
        }
    }

即三次提交,生成三份FutureTask,每份FutureTask调用Executors.callable()为自己底层的callable赋值,而Executors.callable方法生成简单的Callable实现,其中call(),调用底层共同FutureTask的run(), 也就受共同futureTask内部状态(state, runThread)限制。所以,阻塞在底层共同FutureTask阻塞队列中的只有一个线程,看下例:

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        int waitTime = 4000;
        CountDownLatch latch = new CountDownLatch(1);
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        for (int i = 0 ; i < 3; i++) {
            new Thread(new Submit(executor,  futureTask, latch, waitTime)).start();
        }
        try {
            Thread.sleep(3000);
            latch.countDown();
            Thread.sleep(8000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println("所有任务执行完毕");
        executor.shutdown();
    }

}

class Submit implements Runnable {
    private CountDownLatch latch ;
    private ExecutorService es ;
    private FutureTask<Integer> task;
    private int waitTime ;
    public Submit(ExecutorService es, FutureTask<Integer> task, CountDownLatch latch, int waitTime) {
         this.latch = latch;
         this.es = es;
         this.task = task;
         this.waitTime = waitTime;
    }
    public void run() {
        try {
            latch.await();
            Future<?> future =  es.submit(task);
            System.out.println("Thread name : " + Thread.currentThread().getName() + " go!");
            waitTime = new Random().nextInt(waitTime);
            System.out.println("Thread name : " + Thread.currentThread().getName() + " , The wait time : =  " + waitTime );
            future.get(waitTime, TimeUnit.MILLISECONDS);
            System.out.println("Thread name : " + Thread.currentThread().getName() + " run over!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e1) {
            e1.printStackTrace();
        } catch (TimeoutException e2) {
            System.err.println("Thread name : " + Thread.currentThread().getName()  + " " + e2);
        }
    }
}

class Task implements Callable<Integer>{

    public Integer call() throws Exception {
        System.out.println("thread name : " + Thread.currentThread().getName() + " do the work!");
        Thread.sleep(4000);
        int sum = 0;
        for(int i=0;i<20;i++)
            sum += i;
        return sum;
    }
}

class Task1 implements Runnable{
    int sum = 0;
    @Override
    public void run() {
        System.out.println("Thread Name : " + Thread.currentThread().getName() + "do the work!");
        try {
            Thread.sleep(6000);

            for(int i=0;i<100;i++)
                sum += i;
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

显示结果:

Thread name : Thread-2 go!
Thread name : Thread-0 go!
Thread name : Thread-0 , The wait time : =  2738
thread name : pool-1-thread-1 do the work!
Thread name : Thread-1 go!
Thread name : Thread-2 , The wait time : =  284
Thread name : Thread-1 , The wait time : =  678
Thread name : Thread-2 run over!
Thread name : Thread-0 run over!
Thread name : Thread-1 java.util.concurrent.TimeoutException
所有任务执行完毕

三个线程都是阻塞一段时间,但是只有一个超时,另外两个运行完毕,(他两实际工作那部分没运行,处理各自FutureTask那部分代码,所以只能看到线程池只有一个线程处理底层FutureTask);

但,如果直接并发提交Callable,或者Runnable,线程池会启动三个线程来分别处理这三个不同任务,朋友可以自行试验demo下。而FutureTask是自己的自身的限制。

后话,一般调用ThreadPoolExecutor.submit()提交的是Callable<T>和Runnable, 返回的Future<T>, Future<?>(返回Null,或者不要求返回值),提交FutureTask用不着,所以实际中不会遇见这种情况。

另外,本文源码基于jdk1.7,与网上1.7之前源码不同(1.6通过AQS实现)。

时间: 2024-10-13 21:55:49

J.U.C FutureTask之源码解析的相关文章

FutureTask 源码解析

FutureTask 源码解析 版权声明:本文为本作者原创文章,转载请注明出处.感谢 码梦为生| 刘锟洋 的投稿 站在使用者的角度,future是一个经常在多线程环境下使用的Runnable,使用它的好处有两个:1. 线程执行结果带有返回值2. 提供了一个线程超时的功能,超过超时时间抛出异常后返回. 那,怎么实现future这种超时控制呢?来看看代码: FutureTask的实现只是依赖了一个内部类Sync实现的,Sync是AQS (AbstractQueuedSynchronizer)的子类,

String源码解析(一)

本篇文章内的方法介绍,在方法的上面的注释讲解的很清楚,这里只阐述一些要点. Java中的String类的定义如下: 1 public final class String 2 implements java.io.Serializable, Comparable<String>, CharSequence { ...} 可以看到,String是final的,而且继承了Serializable.Comparable和CharSequence接口. 正是因为这个特性,字符串对象可以被共享,例如下面

【特征匹配】RANSAC算法原理与源码解析

转载请注明出处:http://blog.csdn.net/luoshixian099/article/details/50217655 随机抽样一致性(RANSAC)算法,可以在一组包含"外点"的数据集中,采用不断迭代的方法,寻找最优参数模型,不符合最优模型的点,被定义为"外点".在图像配准以及拼接上得到广泛的应用,本文将对RANSAC算法在OpenCV中角点误匹配对的检测中进行解析. 1.RANSAC原理 OpenCV中滤除误匹配对采用RANSAC算法寻找一个最佳

给jdk写注释系列之jdk1.6容器(10)-Stack&amp;Vector源码解析

前面我们已经接触过几种数据结构了,有数组.链表.Hash表.红黑树(二叉查询树),今天再来看另外一种数据结构:栈. 什么是栈呢,我就不找它具体的定义了,直接举个例子,栈就相当于一个很窄的木桶,我们往木桶里放东西,往外拿东西时会发现,我们最开始放的东西在最底部,最先拿出来的是刚刚放进去的.所以,栈就是这么一种先进后出( First In Last Out,或者叫后进先出) 的容器,它只有一个口,在这个口放入元素,也在这个口取出元素. 栈最主要了两个动作就是入栈和出栈操作,其实还是很容易的明白的对不

spring mvc源码解析

1.从DispatcherServlet开始     与很多使用广泛的MVC框架一样,SpringMVC使用的是FrontController模式,所有的设计都围绕DispatcherServlet 为中心来展开的.见下图,所有请求从DispatcherServlet进入,DispatcherServlet根据配置好的映射策略确定处理的 Controller,Controller处理完成返回ModelAndView,DispatcherServlet根据配置好的视图策略确定处理的 View,由V

GlusterFS源码解析—— GlusterFS 命令行常见错误

问题1 [[email protected] ~]# gluster peer status Connection failed. Please check if gluster daemon is operational. 原因:未开启glusterd服务 解决方法:开启glusterd服务 /etc/init.d/glusterd start 问题2 [[email protected] ~]# gluster peer probe server-130 peer probe: failed

Android AsyncTask 源码解析

1. 官方介绍 public abstract class AsyncTask extends Object  java.lang.Object    ? android.os.AsyncTask<Params, Progress, Result> AsyncTask enables proper and easy use of the UI thread. This class allows to perform background operations and publish resul

第十四章 Executors源码解析

前边两章介绍了基础线程池ThreadPoolExecutor的使用方式.工作机理.参数详细介绍以及核心源码解析. 具体的介绍请参照: 第十二章 ThreadPoolExecutor使用与工作机理 第十三章 ThreadPoolExecutor源码解析 1.Executors与ThreadPoolExecutor ThreadPoolExecutor 可以灵活的自定义的创建线程池,可定制性很高 想创建好一个合适的线程池比较难 使用稍微麻烦一些 实际中很少使用 Executors 可以创建4种线程池

jquery源码解析:jQuery对元素属性的操作2

这一课,我们将继续讲解jQuery对元素属性操作的方法. 首先,我们先看一下这几个方法是如何使用的: $("#div1").addClass("box1 box2");     //给元素div的class属性添加box1和box2 $("#div1").removeClass("box1");     //删除元素div的class属性值box1 $("#div1").toggleClass("