在Java中比较常见的两种创建线程的方法:继承Thread类和实现Runnable接口。但是这两种方法有个缺点就是无法获取线程执行后的结果。所以Java之后提供了Future和Runnable接口,用于实现获取线程执行结果。下面开始源码分析:
1、Callable接口
public interface Callable<V> {
//返回接口,或者抛出异常
V call() throws Exception;
}
2、Future接口
public interface Future<V> {
/***尝试取消任务,如果任务已经完成、已取消或其他原因无法取消,则失败。
** 1、如果任务还没开始执行,则该任务不应该运行
** 2、如果任务已经开始执行,由参数mayInterruptIfRunning来决定执行该任务的线程是否应该被中断,这只是终止任务的一种尝试。若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。
** 3、调用这个方法后,以后对isDone方法调用都返回true。
** 4、如果这个方法返回true,以后对isCancelled返回true。
***/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断任务是否被取消了,如果调用了cance()则返回true
*/
boolean isCancelled();
/**
*如果任务完成,则返回ture
*任务完成包含正常终止、异常、取消任务。在这些情况下都返回true
*/
boolean isDone();
/**
* 线程阻塞,直到任务完成,返回结果
* 如果任务被取消,则引发CancellationException
* 如果当前线程被中断,则引发InterruptedException
* 当任务在执行的过程中出现异常,则抛出ExecutionException
*/
V get() throws InterruptedException, ExecutionException;
/**
* 线程阻塞一定时间等待任务完成,并返回任务执行结果,如果则超时则抛出TimeoutException
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
3、FutureTask
????Future只是一个接口,不能直接用来创建对象,其实现类是FutureTask,JDK1.8修改了FutureTask的实现,JKD1.8不再依赖AQS来实现,而是通过一个volatile变量state以及CAS操作来实现。FutureTask结构如下所示:
public class FutureTask<V> implements RunnableFuture<V> {
/*
* 当前任务运行状态
* NEW -> COMPLETING -> NORMAL(正常结束,返回结果)
* NEW -> COMPLETING -> EXCEPTIONAL(返回异常结果)
* NEW -> CANCELLED(任务被取消,无结果)
* NEW -> INTERRUPTING -> INTERRUPTED(任务被打断,无结果)
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 将要被执行的任务 */
private Callable<V> callable;
/** 存放执行结果,用于get()方法获取结果,也可能用于get()方法抛出异常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 执行任务Callable的线程; */
private volatile Thread runner;
/** 栈结构的等待队列,该节点是栈中最顶层的节点 */
private volatile WaitNode waiters;
为了后面更好的分析FutureTask的实现,这里有必要解释下各个状态。
NEW:表示是个新的任务或者还没被执行完的任务。这是初始状态。
COMPLETING:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。
NORMAL:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。
EXCEPTIONAL:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。
CANCELLED:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。
INTERRUPTING: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。
INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。
有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。
3.1、FutureTask构造方法
// Callable 构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* runnable 构造函数
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
3.2、get()方法阻塞队列
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
3.3、run方法解析
//Executor调用执行任务
//
public void run() {
//状态如果不是NEW,说明任务或者已经执行过,或者已经被取消,直接返返回,当然如果执行任务的线程runner不为null,说明任务正在执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//执行任务
try {
Callable<V> c = callable;
//判断任务是否为null,状态是否为NEW
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //暂时存放任务执行结果
ran = true;
} catch (Throwable ex) {
result = null;
ran = false; //执行失败
//通过CAS算法设置返回值(COMPLETING)和状态值(EXCEPTIONAL)
setException(ex);
}
//执行成功通过CAS(UNSAFE)设置返回值(COMPLETING)和状态值(NORMAL)
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
//将任务runner设置为null,避免发生并发调用run()方法
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
//须重新读取任务状态,避免不可达(泄漏)的中断
int s = state;
//确保cancle(ture)操作时,运行中的任务能接收到中断指令
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
3.4、setException(Throwable t)解析
//发生异常时,将返回值设置到outcome(=COMPLETING)中,并更新任务状态(EXCEPTIONAL)
protected void setException(Throwable t) {
//调用UNSAFE类封装的CAS算法,设置值
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//唤醒因等待返回值而阻塞的线程
finishCompletion();
}
}
3.5、set(V v)方法解析
//任务正常完成,将返回值设置到outcome(=COMPLETING)中,并更新任务状态(=NORMAL)
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
3.6、 finishCompletion解析
//移除所有等待线程并发出信号,调用done(),以及将任务callable清空
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//循环唤醒阻塞线程,直到阻塞队列为空
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
//一直到阻塞队列为空,跳出循环
if (next == null)
break;
q.next = null; // unlink to help gc 方便gc在适当的时候回收
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
3.7、handlePossibleCancellationInterrupt 方法解析
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let‘s spin-wait patiently.
//自旋等待cancle(true)结束(中断结束)
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
3.8、cancle方法解析
//取消任务执行
public boolean cancel(boolean mayInterruptIfRunning) {
//对NEW状态的任务进行中断,并根据参数设置state
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
//任务已完成(已发出中断或已取消)
return false;
//中断线程
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {//cancel(true)
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
//通过CAS算法,更新状态
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//唤醒阻塞线程
finishCompletion();
}
return true;
}
3.9 get方法解析
/**
* 获取执行结果
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//假如任务还没有执行完,则则塞线程,直至任务执行完成(结果已经存放到对应变量中)
s = awaitDone(false, 0L);
//返回结果
return report(s);
}
/**
* 获取任务执行结果,指定时间结束,则超时返回,不再阻塞
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
3.10 awaitDone解析
/**
* Awaits completion or aborts on interrupt or timeout.
* 如英文注释:等待任务执行完毕或任务中断或任务超时
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//循环等待
for (;;) {
//线程已经中断,则移除等待任务
if (Thread.interrupted()) {
removeWaiter(q);
//移除当前任务后,抛出中断异常
throw new InterruptedException();
}
//任务已经完成,则返回任务状态,并对当前任务清场处理
int s = state;
if (s > COMPLETING) {
if (q != null) //任务不为空,则将执行线程设为null,避免并发下重复执行
q.thread = null;
return s;
}
//设置结果,很快就能完成,自旋等待
else if (s == COMPLETING) // cannot time out yet
Thread.yield(); //任务提前退出
//正在执行或者还没开始,则构建新的节点
else if (q == null)
q = new WaitNode();
//判断是否入队,新节点一般在下一次循环入队列阻塞
else if (!queued)
//没有入队列,设置q.next=waiters,并将waiters设为q
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//假如有超时限制,则判断是否超时
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//超时则将任务节点从阻塞队列中移除,并返回状态
removeWaiter(q);
return state;
}
//阻塞调用get方法的线程,有超时限制
LockSupport.parkNanos(this, nanos);
}
else
//阻塞调用get方法的线程,无超时限制
LockSupport.park(this);
}
}
3.11 removeWaiter方法解析
//移除任务节点
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
原文地址:https://blog.51cto.com/3265857/2360855
时间: 2024-10-12 10:22:06