Executor实现----AbstractExecutorService实现分析

----------------------------学习笔记,不保证正确性!-------------------------

1、首先来看一段使用示例

ExecutorService recmdService = Executors. newFixedThreadPool(1);

Future<List<Long>> recmdFuture = recmdService.submit( new Callable<List<Long>>()
{

@Override

public List<Long>
call() throws Exception {

/*

                 * do something here

                 */

                 return result;

}

});

/*

* do something here.

*/

List<Long> recmdPoiIds = null;

try {

recmdPoiIds = recmdFuture.get(10, TimeUnit. SECONDS);

catch (Exception
e) {

logger.error("error
information " , e);

recmdPoiIds = new ArrayList<Long>();

}

上面的示例代码来自于工作中出现的一段使用Executor框架的示例,当然也只能算是对Executor框架的一种非常简单的应用。大体的意思是在执行主体任务的同时重新开了一个线程去同步执行另一个任务。然后再主体任务执行完后,同时去获取在这个新开的线程中执行任务的结果。

示例虽然简单,但其中也包括了Executor的一些基本组成元素,也是了解Executor所需要的最基本的东西:任务在一个单独的线程中执行、任务提交时返回一个Future对象、通过Future对象去获取任务的执行结果、获取任务执行结果时可能会造成当前线程的阻塞。

2、任务的提交

在执行recmdService.submit时,任务被提交到Executor框架中,进入执行,并且返回一个Future对象。可以猜想,这里肯定是生成了一个新的线程去执行任务,那么这个任务和返回的Future对象之间有什么关系,线程又是怎么生成的。下面将通过相关代码来进行分析。

ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口,实现了submit方法,仍把execute方法留待子类实现。下面来看submit方法的实现

public <T>
Future<T> submit(Callable<T> task) {

if (task
== nullthrow new NullPointerException();

RunnableFuture<T> ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

protected <T> RunnableFuture<T> newTaskFor (Callable<T>
callable) {

return new FutureTask<T>(callable);

}

submit方法所做的事情其实很简单,生成了一个FutureTask对象,调用execute方法,然后返回。execute方法的执行涉及到ThreadPoolExecutor的很多细节,这里可以理解为开启一个新线程,在新线程中执行,由于这里是开启新线程后执行任务,所以,submit方法不会阻塞调用线程。

由于在调用recmdFuture.get方法时会造成当前线程的阻塞,所以这里需要来关注下FutureTask的实现,是如何实现这种效果的。

首先需要明确下线程的关系:

a、执行任务的线程,也就是通过ThreadPoolExecutor创建的线程,任务在这个线程中执行,但我们无法获得这个线程的Thread对象

b、拥有recmdFuture的线程,也就是调用Executor框架的线程,可以理解成我们的“主线程”

FutureTask实现了RunnableFuture接口,只有一个Sync的属性,Sync类和属性的定义如下

private final class Sync extends AbstractQueuedSynchronizer
{

private static final long serialVersionUID
= -7828117401763700385L;

/** State value representing that task is
running */

private static final int RUNNING  
= 1;

/** State value representing that task ran
*/

private static final int RAN      
= 2;

/** State value representing that task was
cancelled */

private static final int CANCELLED
= 4;

/** The underlying callable */

private final Callable<V>
callable;

/** The result to return from get() */

private V
result;

/** The exception to throw from get() */

private Throwable
exception;

/**

* The thread running task. When nulled after set/cancel, this

* indicates that the results are accessible.  Must be

* volatile, to ensure visibility upon completion.

*/

private volatile Thread runner ;

注意三点:Sync继承了AbstractQueuedSynchronizer ,使用了jdk的AQS线程同步框架;有一个V result属性,是用来存储任务执行完之后的结果对象;有一个Thread
runner属性,用来表示执行任务的那个线程。

3、任务结果的获取

由上面的分析可知,在通过Executor提交任务时,返回的其实是一个FutureTask对象。在实际中,如果任务执行的耗时较长,在调用get方法获取结果时,可能会造成线程的阻塞,如上面示例中的recmdFuture.get(10,
TimeUnit. SECONDS),指定了一个最长等待时间。那么,结果是如何传递的,阻塞又是如何实现的呢?

还是来看FutureTask的get方法,这是获取任务执行结果的入口,

public V
get( long timeout, TimeUnit unit)

throws InterruptedException,
ExecutionException, TimeoutException {

return sync.innerGet(unit.toNanos(timeout));

}

通过调用Sync的innerGet来执行,下面来看实现

V innerGet(long nanosTimeout) throws InterruptedException,
ExecutionException, TimeoutException {

if (!tryAcquireSharedNanos(0,
nanosTimeout))

throw new TimeoutException();

if (getState()
== CANCELLED)

throw new CancellationException();

if (exception
!= null)

throw new ExecutionException(exception);

return result;

}

调用了AQS的tryAcquireSharedNanos,在这里实现了调用Future的get方法的阻塞,也就是上面说的“主线程的阻塞”。但,在AQS的解析中,我们了解,这个方法并不一定会导致调用线程的阻塞(也就是进入阻塞队列中)。需要有一个线程以排他的方式占据当前的同步对象,这样其它线程在试图获取共享对象时才会被阻塞。

结合对Executor框架的使用,正常情况下,只有当任务执行完成后,获取结果的线程才不会阻塞,所以我们可以猜测,这个以排他方式占据共享对象的线程就是执行任务的线程,也就是通过ThreadPoolExecutor创建的那个线程。在任务执行之前,这个线程先以排他的方式获取了共享对象,然后再任务执行完成(Callable的call方法)后,释放共享对象。

FutureTask实现了RunnableFuture接口,而RunnableFuture又继承Runnable接口,也就是说FutureTask其实本身就是一个Runnable对象,也就实现了run方法。这个方法正式一个线程被启动时要执行的任务。来看FutureTask的run方法的实现

public void run()
{

sync.innerRun();

}

run方法的执行已经是在被启动线程中,也就是和我们“主线程”不同的那个执行任务的线程,由ThreadPoolExecutor创建的线程。

void innerRun ()
{

if (!compareAndSetState(0,
RUNNING))

return;

try {

runner = Thread.currentThread();

if (getState()
== RUNNING) // recheck after setting thread

innerSet(callable.call());

else

releaseShared(0); // cancel

catch (Throwable
ex) {

innerSetException(ex);

}

}

首先通过CAS框架把共享对象的状态设置为RUNNING状态,实现了以排他方式获取共享对象。然后设置runner=Thread.currentThread();把runner设置为当前线程,由于线程是通过ThreadPoolExecutor创建和启动的,所以这里就是把runner对象设置为在执行任务的那个线程。调用callable.call方法执行任务,然后innerSet设置返回结果。

void innerSet(V
v) {

for (;;)
{

int s
= getState();

if (s
== RAN)

return;

if (s
== CANCELLED) {

// aggressively release to set runner
to null,

// in case we are racing with a
cancel request

// that will try to interrupt runner

releaseShared(0);

return;

}

if (compareAndSetState(s,
RAN)) {

result = v;

releaseShared(0);

done();

return;

}

}

}

innerSet主要做三件事情:设置result字段,也就是保存任务执行的结果;设置共享对象的状态,表明任务已经执行完毕;释放共享对象,唤醒那些等待获取结果的线程。

使用releaseShared的方式唤醒,是因为那些获取结果的线程都是以共享的方式阻塞在这个共享对象上(具体可以参考"共享锁和排它锁"一章),所以释放共享对象的时候,可以一次唤醒所有的等待获取结果的线程。

下面是整个流程示意图

时间: 2024-10-29 15:12:48

Executor实现----AbstractExecutorService实现分析的相关文章

Executor-ThreadPoolExecutor实现

1.ThreadPoolExecutor的主要作用 在Oracle中对ThreadPoolExecutor的作用进行了说明:1.在大量的异步任务到达的情况下,使用线程池能够提升性能:2.提供一种资源管理和调度的方法. 一般通过Executors的工厂方法来生成一个线程池对象,Executors提供了多种方法来构造不同的线程池:1.带有缓存性质的线程池 Executors.newCachedThreadPool(),线程池的大小不固定,并且会随着使用情况自动调整线程池的大小:2.固定大小的线程池E

spark core源码分析7 Executor的运行

实际任务的运行,都是通过Executor类来执行的.这一节,我们只介绍Standalone模式. 源码位置:org.apache.spark.executor.CoarseGrainedExecutorBackend private def run( driverUrl: String, executorId: String, hostname: String, cores: Int, appId: String, workerUrl: Option[String], userClassPath

获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用

当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取: 方式一: 通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成.如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 Java代码 public class CompletionServiceTest { static class Task impleme

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的. Standalone部署的节点组成 介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多. 在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外

spark core源码分析13 异常情况下的容错保证

博客地址: http://blog.csdn.net/yueqian_zhu/ standalone模式下的框架图如下: 异常分析1: worker异常退出 worker异常退出,比如说有意识的通过kill指令将worker杀死 worker在退出之前,会将自己所管控的所有小弟executor全干掉 worker需要定期向master改善心跳消息的,现在worker进程都已经玩完了,哪有心跳消息,所以Master会在超时处理中意识到有一个"分舵"离开了 Master非常伤心,伤心的Ma

JDK线程池框架Executor源码阅读

Executor框架 Executor ExecutorService AbstractExecutorService ThreadPoolExecutor ThreadPoolExecutor继承AbstractExecutorService,是一个线程池的具体的实现 主要成员 1. ctl private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT

ExecutorCompletionService分析及使用

当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取: 方式一: 通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成.如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 1 public class ExecutorCompletionServiceTest { 2 3 static class Task

MyBatis 源码分析 - 插件机制

1.简介 一般情况下,开源框架都会提供插件或其他形式的拓展点,供开发者自行拓展.这样的好处是显而易见的,一是增加了框架的灵活性.二是开发者可以结合实际需求,对框架进行拓展,使其能够更好的工作.以 MyBatis 为例,我们可基于 MyBatis 插件机制实现分页.分表,监控等功能.由于插件和业务无关,业务也无法感知插件的存在.因此可以无感植入插件,在无形中增强功能. 开发 MyBatis 插件需要对 MyBatis 比较深了解才行,一般来说最好能够掌握 MyBatis 的源码,门槛相对较高.本篇

Executor 线程池

1.什么是线程池:  java.util.concurrent.Executors提供了一个 java.util.concurrent.Executor接口的实现用于创建线程池 多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力. 假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间. 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能. 一个线程池包括以下四个基