Java多线程系列--“JUC线程池”06之 Callable和Future

概要

本章介绍线程池中的Callable和Future。
Callable 和 Future 简介
示例和源码分析(基于JDK1.7.0_40)

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3544116.html

Callable 和 Future 简介

  Callable 和 Future 是比较有趣的一对组合。当我们需要获取线程的执行结果时,就需要用到它们。Callable用于产生结果,Future用于获取结果。

1. Callable

Callable 是一个接口,它只包含一个call()方法。Callable是一个返回结果并且可能抛出异常的任务。

为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法。

Callable的源码如下:

public interface Callable<V> {
    V call() throws Exception;
}

说明:从中我们可以看出Callable支持泛型。

2. Future

Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。

Future的源码如下:

public interface Future<V> {
    // 试图取消对此任务的执行。
    boolean     cancel(boolean mayInterruptIfRunning)

    // 如果在任务正常完成前将其取消,则返回 true。
    boolean     isCancelled()

    // 如果任务已完成,则返回 true。
    boolean     isDone()

    // 如有必要,等待计算完成,然后获取其结果。
    V           get() throws InterruptedException, ExecutionException;

    // 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
    V             get(long timeout, TimeUnit unit)
          throws InterruptedException, ExecutionException, TimeoutException;
}

说明: Future用于表示异步计算的结果。它的实现类是FutureTask,在讲解FutureTask之前,我们先看看Callable, Future, FutureTask它们之间的关系图,如下:

说明
(01) RunnableFuture是一个接口,它继承了Runnable和Future这两个接口。RunnableFuture的源码如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

(02) FutureTask实现了RunnableFuture接口。所以,我们也说它实现了Future接口。

示例和源码分析(基于JDK1.7.0_40)

我们先通过一个示例看看Callable和Future的基本用法,然后再分析示例的实现原理。

 1 import java.util.concurrent.Callable;
 2 import java.util.concurrent.Future;
 3 import java.util.concurrent.Executors;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.ExecutionException;
 6
 7 class MyCallable implements Callable {
 8
 9     @Override
10     public Integer call() throws Exception {
11         int sum    = 0;
12         // 执行任务
13         for (int i=0; i<100; i++)
14             sum += i;
15         //return sum;
16         return Integer.valueOf(sum);
17     }
18 }
19
20 public class CallableTest1 {
21
22     public static void main(String[] args)
23         throws ExecutionException, InterruptedException{
24         //创建一个线程池
25         ExecutorService pool = Executors.newSingleThreadExecutor();
26         //创建有返回值的任务
27         Callable c1 = new MyCallable();
28         //执行任务并获取Future对象
29         Future f1 = pool.submit(c1);
30         // 输出结果
31         System.out.println(f1.get());
32         //关闭线程池
33         pool.shutdown();
34     }
35 }

运行结果

4950

结果说明
 
 在主线程main中,通过newSingleThreadExecutor()新建一个线程池。接着创建Callable对象c1,然后再通过
pool.submit(c1)将c1提交到线程池中进行处理,并且将返回的结果保存到Future对象f1中。然后,我们通过f1.get()获取
Callable中保存的结果;最后通过pool.shutdown()关闭线程池。

1. submit()

submit()在java/util/concurrent/AbstractExecutorService.java中实现,它的源码如下:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // 创建一个RunnableFuture对象
    RunnableFuture<T> ftask = newTaskFor(task);
    // 执行“任务ftask”
    execute(ftask);
    // 返回“ftask”
    return ftask;
}

说明:submit()通过newTaskFor(task)创建了RunnableFuture对象ftask。它的源码如下:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

2. FutureTask的构造函数

FutureTask的构造函数如下:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    // callable是一个Callable对象
    this.callable = callable;
    // state记录FutureTask的状态
    this.state = NEW;       // ensure visibility of callable
}

3. FutureTask的run()方法

我们继续回到submit()的源码中。

newTaskFor()新建一个ftask对象之后,会通过execute(ftask)执行该任务。此时ftask被当作一个Runnable对象进
行执行,最终会调用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中实
现,源码如下:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        // 将callable对象赋值给c。
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 执行Callable的call()方法,并保存结果到result中。
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            // 如果运行成功,则将result保存
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        // 设置“state状态标记”
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

说明:run()中会执行Callable对象的call()方法,并且最终将结果保存到result中,并通过set(result)将result保存。
      之后调用FutureTask的get()方法,返回的就是通过set(result)保存的值。

时间: 2024-10-09 22:19:25

Java多线程系列--“JUC线程池”06之 Callable和Future的相关文章

Java多线程系列--“JUC线程池”01之 线程池架构

概要 前面分别介绍了"Java多线程基础"."JUC原子类"和"JUC锁".本章介绍JUC的最后一部分的内容——线程池.内容包括:线程池架构图线程池示例 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509903.html 线程池架构图 线程池的架构图如下: 1. Executor 它是"执行者"接口,它是来执行任务的.准确的说,Executor提供了execute()接口来执行

Java多线程系列--“JUC线程池”03之 线程池原理(二)

线程池示例 在分析线程池之前,先看一个简单的线程池示例. import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; public class ThreadPoolDemo1 { public static void main(String[] args) { // 创建一个可重用固定线程数的线程池 ExecutorService pool = Executors.newFixedThre

Java多线程系列--“JUC线程池”05之 线程池原理(四)

拒绝策略介绍 线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施.当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭.第二,任务数量超过线程池的最大限制. 线程池共包括4种拒绝策略,它们分别是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy和DiscardPolicy. AbortPolicy -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常. Calle

Java多线程系列--“JUC线程池”02之 线程池原理(一)

ThreadPoolExecutor简介 ThreadPoolExecutor是线程池类.对于线程池,可以通俗的将它理解为"存放一定数量线程的一个线程集合.线程池允许同时运行的线程数量就是线程池的容量:当添加到线程池中的线程超过它的容量时,会有一部分线程阻塞等待.线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理." ThreadPoolExecutor数据结构 ThreadPoolExecutor的数据结构如下图所示: 各个数据在ThreadPoolExecutor

Java多线程系列--“JUC线程池”04之 线程池原理(三)

本章介绍线程池的生命周期. 线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态.线程池也有5种状态:然而,线程池不同于线程,线程池的5种状态是:Running, SHUTDOWN, STOP, TIDYING, TERMINATED. 线程池状态定义代码如下: /** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indi

Java多线程系列 JUC线程池07 线程池原理解析(六)

 关闭“线程池” shutdown()的源码如下: public void shutdown() { final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try { // 检查终止线程池的“线程”是否有权限. checkShutdownAccess(); // 设置线程池的状态为关闭状态. advanceRunState(SHUTDOWN); // 中断线程池中空闲的线程. interruptIdleWork

Java多线程系列---“JUC原子类”06之 AtomicLongFieldUpdater原子类

转自:http://www.cnblogs.com/skywang12345/p/3514635.html (含部分修改) 概要 AtomicIntegerFieldUpdater, AtomicLongFieldUpdater和AtomicReferenceFieldUpdater这3个修改类的成员的原子类型的原理和用法相似.本章以对基本类型的原子类进行介绍.内容包括: AtomicLongFieldUpdater介绍和函数列表 AtomicLongFieldUpdater示例 AtomicL

Java多线程系列--“JUC锁”11之 Semaphore信号量的原理和示例

概要 本章,我们对JUC包中的信号量Semaphore进行学习.内容包括:Semaphore简介Semaphore数据结构Semaphore源码分析(基于JDK1.7.0_40)Semaphore示例 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3534050.html Semaphore简介 Semaphore是一个计数信号量,它的本质是一个"共享锁". 信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可

Java多线程系列--“JUC锁”05之 非公平锁

获取非公平锁(基于JDK1.7.0_40) 非公平锁和公平锁在获取锁的方法上,流程是一样的:它们的区别主要表现在"尝试获取锁的机制不同".简单点说,"公平锁"在每次尝试获取锁时,都是采用公平策略(根据等待队列依次排序等待):而"非公平锁"在每次尝试获取锁时,都是采用的非公平策略(无视等待队列,直接尝试获取锁,如果锁是空闲的,即可获取状态,则获取锁).在前面的"Java多线程系列--"JUC锁"03之 公平锁(一)&q