Executor框架完整解读

1 前言

Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供.

在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。

2 Executor框架组成

Executor框架主要由3大部分组成如下:

  • ①任务: RunnableCallable接口及其实现类
  • ②任务执行器: 主要是Executor及扩展Executor的ExecutorService接口的一些实现类。Executor框架有两个重要的实现类,一个是线程池执行器ThreadPoolExecutor、另一个是定时任务执行器ScheduledThreadPoolExecutor .
  • ③任务的结果: Future接口及其默认实现FutureTask

说明:

Runnable接口(无返回值)和Callable接口(有返回值)的实现类,都可以被ThreadPoolExecutorScheduledThreadPoolExecutor执行。

Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令 。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。 Future接口和实现Future接口的FutureTask类,代表异步任务的结果。

3 Runnable和Callable

Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。它们之间的区别是Runnable没有返回值,无法判断任务是否完成,而Callable有一个返回结果。

除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors将一个Runnable包装成一个Callable.

callable(Runnable )方法包装不需要结果的Runnable任务,任务完成后Future.get()会获取一个null值;而callable(Runnable,T)可以包装一个需要结果的Runnable任务,结果可以通过参数result指定,任务完成后Future.get()可以获得这个结果result.

    public static Callable<Object> callable(Runnable task)
    public static <T> Callable<T> callable(Runnable task, T result)

4 ThreadPoolExecutor

ThreadPoolExecutorExecutorService的最重要的实现类,ThreadPoolExecutor不直接实现ExecutorService接口,它直接继承于AbstractExecutorService抽象类, AbstractExecutorServiceExecutorSerivice接口中的一些方法做过的默认实现 。

之前的文章线程池ThreadPoolExecutor简介对使用构造方法创建线程池已做详细说明,这里介绍使用工厂类Executors来创建线程池。ThreadPoolExecutor可以使用工厂类Executors提供了一些静态工厂方法,可以以此方便地创建一些常用配置的线程池。Executors可以创建3种类型的ThreadPoolExecutor .

1) 固定线程池

newFixedThreadPool系列方法创建固定线程数的线程池。它适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

    //创建固定线程数的线程池
 public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
  }
 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), threadFactory);
  }

这种线程池的corePoolSize和maximumPoolSize都被设置为参数nThreads。当线程池中的线程数大于corePoolSize时,keepAliveTime为非核心线程等待新任务的最长时间,超过这个时间后的这些线程将被终止。corePoolSize和maximumPoolSize参数设置为相同值,表示线程池中所有线程均是核心线程, 那么keepAliveTime参数就是无意义的.

处理任务流程说明:

①当线程池中的线程数小于corePoolSize时,线程池会创建新线程去执行任务。

②在经过一段时间预热后,线程数达到了corePoolSize(因为maximumPoolSize与corePoolSize相同,此时也达到了最大线程数,以后不会再创建线程),开始将任务放入工作队列中。

③此后有新任务到达就向工作队列中放入(LinkedBlockingQueue无参构造方法,创建的队列容量是Integer.MAX_VALUE ,这种队列几乎不可能容量爆满,不会拒绝任务,拒绝策略不起作用),若有线程处于空闲状态则从工作队列中获取任务并执行。

2) 单线程池

newSingleThreadExecutor系列方法创建单个线程的线程池 它适用于保证任务按顺序执行,并且在任何时候最多只有一个活动(正在执行)的任务。

  //创建单个线程的线程池
  public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,  new LinkedBlockingQueue<Runnable>(),threadFactory));
  }
  public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,  0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
  }

newSingleThreadExecutor方法不是直接返回ThreadPoolExecutor对象,它将ThreadPoolExecutor对象进行包装成FinalizableDelegatedExecutorService对象,但实际的业务处理还是委托给ThreadPoolExecutor去实现。

这类线程池的corePoolSize和maximumPoolSize都被设为1,线程池中最多有一个线程,同样地这里的keepAliveTime参数是无意义的,它使用一个无限大容量的阻塞队列作为存放任务的容器。

处理任务流程说明:

①当线程池中的无任何线程时,线程池会创建一个线程去执行任务。

②当线程池中有一个线程时,开始将任务放入工作队列中。

③此后有新任务到达就向工作队列中放入(LinkedBlockingQueue无参构造方法,创建的队列容量是Integer.MAX_VALUE ,这种队列几乎不可能容量爆满,不会拒绝任务,拒绝策略不起作用),若线程池中的唯一线程处于空闲状态则从工作队列中取出任务并执行。

3) 缓存线程池

newCachedThreadPool系列方法会根据需要创建新线程的线程池(但若之前创建的线程可用,则将复用这些线程) . 它对线程数没有限制,会按需创建新线程,适用于执行很多的短期异步任务的小任务,或者是负载较轻的服务器。

    //根据需要创建新线程的线程池
  public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
  }
  public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
  }

这种线程池的corePoolSize为零、maximumPoolSize为Integer.MAX_VALUE,表明线程池中没有核心线程、所有线程均是非核心线程,且可允许的最大线程数是近乎无限大。keepAliveTime参数设为60,表明空闲线程最多等待60秒就被终止。

这里使用SynchronousQueue作为工作队列,这种队列是没有容量的(当尝试排队时,只有正好有空闲线程正在等待接受任务时才会入队成功),但可允许创建的最大线程数是无限大的。这意味着主线程提交任务的速度大于任务处理的速度,线程池就会创不断建新线程,这样可能导致创建的线程过多,系统资源被耗尽、程序崩溃。

处理任务流程说明:

①因为核心线程数为0,所以线程池在启动时核心线程池就已经满了。在主线程提交第一个任务时,线程池就要将尝试此任务入队,由于SynchronousQueue的特殊性,只有当此时空闲线程也正在出队,入队与出队两者恰好匹配时,主线程会把任务交给空闲线程去执行。否则将进入下一步。

②当线程池中无任何线程或无空闲线程时,将没有线程执行出队操作。此时线程池会创建建一个新线程执行任务

③上一步中创建的新线程在执行完的任务后,会调用SynchronousQueue.poll等待任务出队。这个空闲线程最多等待60秒时间,若主线程在60秒内提交了一个新任务,此空闲线程将获取到这个任务并执行。若等待60秒后,还没等到新任务到达,这个线程将被终止。

5 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是定时任务执行器,它可以通过构造方法创建,也可通过工厂类Executors的静态方法创建。

ScheduledThreadPoolExecutor的构造方法逻辑十分简单,它们直接调用父类实现。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,  RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
} 

Executors可创建两种类型的ScheduledThreadPoolExecutor

①newSingleThreadScheduledExecutor系列方法用于创建单个线程的定时任务执行器。它适用于按照固定顺序执行周期性的定时任务,且最多同时执行一个任务的情况。

②newScheduledThreadPool系列方法用于创建给定个数线程的定时任务执行器。它适用于需要多个线程执行周期任务,同时又要限制线程数、防止创建线程过多耗费资源的情况。

   //单线程的定时任务执行器
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    //多线程的定时任务执行器
     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,并实现了表示定时执行器的接口ScheduledExecutorService。它主要用来在给定的延迟之后运行任务,或者定期执行任务.

DelayedWorkQueue是ScheduledThreadPoolExecutor的一个静态内部类,它是一个无界队列,所以父类ThreadPoolExecutor中的maximumPoolSize、keepAliveTime这两个参数无意义、没有任何效果。

执行任务基本流程:

①当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向的DelayedWorkQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask类型任务。

②线程池中的线程从DelayedWorkQueue中尝试获取到期任务,若没有任务到期此线程将阻塞等待,直到真正获取到一个任务,然后执行此任务。

ScheduledThreadPoolExecutor会把待调度的任务ScheduledFutureTask放到一个DelayedWorkQueue中,我们来了解一下ScheduledFutureTask。

ScheduledFutureTask是ScheduledThreadPoolExecutor的一个成员内部类,它继承了FutureTask类,另外还实现了表示周期性任务的ScheduledFutureTask接口,ScheduledFutureTask有3 个重要的成员变量。

  • long型成员变量time,表示这个任务将要被执行的具体时间。
  • long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。
  • long型成员变量period,表示任务执行的间隔周期

DelayedWorkQueue封装了一个RunnableScheduledFuture数组,它利用这个数组实现一个基于堆排序的优先级队列,其原理与PriorityQueue类似。ScheduledFutureTask任务会放入这个数组中,DelayedWorkQueue会对数组中的任务按照优先级排序。排序的基本原则是: time小的排在前面,如果两个任务的time相同,就比较sequenceNumber,sequenceNumber小的排在前面。换句话说,时间早的任务先执行,若几个任务同样早,就看谁先提交,先提交的任务先执行。

执行一个任务的完整步骤:

①某空闲线程从DelayedWorkQueue中获取已到期的ScheduledFutureTask任务(到期任务是指ScheduledFutureTask的time大于等于当前时间)。

②此线程执行这个ScheduledFutureTask任务。

③此线程修改ScheduledFutureTask的time变量为下次将要被执行的时间。

④此线程把这个time被修改后的ScheduledFutureTask重新放回DelayedWorkQueue中。

6 Future

Future接口和FutureTask类用来表示异步任务的结果。当我们把Runnable接口或Callable接口的实现类提交(使用ExecutorServicesubmit系列方法提交任务)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,它们将返回一个Future类型的对象(实际返回FutureTask类型对象) , 在调用Future.get()方法时会阻塞等待任务完成后再返回任务的结果。

submit方法提交任务

    public Future<?> submit(Runnable task)
    public <T> Future<T> submit(Runnable task, T result)
    public <T> Future<T> submit(Callable<T> task)

get方法等待任务执行完成并获取结果

 public V get() throws InterruptedException, ExecutionException
 public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException

Future、FutureTask的详细分析在之前的文章FutureTask源码完整解读已有说明,这里不再赘述。

参考:《Java并发编程的艺术》、《Java的逻辑》

原文地址:https://www.cnblogs.com/gocode/p/fully-interpretate-executor-framework.html

时间: 2024-10-10 01:32:04

Executor框架完整解读的相关文章

Java并发编程系列之十五:Executor框架

Java使用线程完成异步任务是很普遍的事,而线程的创建与销毁需要一定的开销,如果每个任务都需要创建一个线程将会消耗大量的计算资源,JDK 5之后把工作单元和执行机制区分开了,工作单元包括Runnable和Callable,而执行机制则由Executor框架提供.Executor框架为线程的启动.执行和关闭提供了便利,底层使用线程池实现.使用Executor框架管理线程的好处在于简化管理.提高效率,还能避免this逃逸问题--是指不完整的对象被线程调用. Executor框架使用了两级调度模型进行

戏(细)说Executor框架线程池任务执行全过程(上)

一.前言 1.5后引入的Executor框架的最大优点是把任务的提交和执行解耦.要执行任务的人只需把Task描述清楚,然后提交即可.这个Task是怎么被执行的,被谁执行的,什么时候执行的,提交的人就不用关心了.具体点讲,提交一个Callable对象给ExecutorService(如最常用的线程池ThreadPoolExecutor),将得到一个Future对象,调用Future对象的get方法等待执行结果就好了. 经过这样的封装,对于使用者来说,提交任务获取结果的过程大大简化,调用者直接从提交

戏(细)说Executor框架线程池任务执行全过程(下)

上一篇文章中通过引入的一个例子介绍了在Executor框架下,提交一个任务的过程,这个过程就像我们老大的老大要找个老大来执行一个任务那样简单.并通过剖析ExecutorService的一种经典实现ThreadPoolExecutor来分析接收任务的主要逻辑,发现ThreadPoolExecutor的工作思路和我们带项目的老大的工作思路完全一致.在本文中我们将继续后面的步骤,着重描述下任务执行的过程和任务执行结果获取的过程.会很容易发现,这个过程我们更加熟悉,因为正是每天我们工作的过程.除了Thr

Java 并发之 Executor 框架

前言 Executor 框架概览 Executor ExecutorService ScheduledExecutorService ThreadPoolExecutor ScheduledThreadPoolExecutor Executors 结语 前言 在学习 JUC 的过程中我发现,JUC 这个包下面的文档写的十分的好,清楚又易于理解,这篇博客便是参考 JUC 中和 Executor 框架相关的一些类文档汇总出来的. 当然了,Executor 框架涉及到的类还是不少的,全部汇总的话时间成

Java Executor 框架

Java Executor 框架 Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor.Executors. ExecutorService.CompletionService.Future.Callable等.(图片引用自 http://www.javaclubcn.com/a/jichuzhishi/2012/1116/170.html) 本篇博文分析Executor中几个比较重要的接口和类. Executor 1 public int

JAVA 1.5 并发之 Executor框架 (二)execute VS submit

http://www.cnblogs.com/rockman12352/p/3788688.html 上一篇对于整体框架讲了很多东西,但是具体在使用时有一些细节并没有说出来 首先是执行任务 execute(); 执行任务,返回空,相当于 new Thread(task).start(); submit();   执行任务,但是会返回一个future<T>,就是计算好的结果,如果没有计算好则会阻塞,还有一个好处是可以管理exception public static void main(Stri

Java并发编程-Executor框架集

Executor框架集对线程调度进行了封装,将任务提交和任务执行解耦. 它提供了线程生命周期调度的所有方法,大大简化了线程调度和同步的门槛. Executor框架集的核心类图如下: 从上往下,可以很清晰的看出框架集的各个类,以及它们之间的关系:Executor,是一个可以提交可执行(Runnable)任务的Object,这个接口解耦了任务提交和执行细节(线程使用.调度等),Executor主要用来替代显示的创建和运行线程:ExecutorService提供了异步的管理一个或多个线程终止.执行过程

Java并发和多线程(二)Executor框架

Executor框架 1.Task?Thread? 很多人在学习多线程这部分知识的时候,容易搞混两个概念:任务(task)和线程(thread). 并发编程可以使我们的程序可以划分为多个分离的.独立运行的任务.而这些任务具体得由线程来驱动.Java中,Thread类自身不执行任何操作,它只是驱动赋予它的任务,任务由Runnable接口提供. 2.executor Executor是个简单的接口,但它却提供了一种标准的方法将任务的提交过程与任务的执行过程解耦开来,从而无须太大困难就可以为某种类型的

Java并发(基础知识)—— Executor框架及线程池

在Java并发(基础知识)—— 创建.运行以及停止一个线程中讲解了两种创建线程的方式:直接继承Thread类以及实现Runnable接口并赋给Thread,这两种创建线程的方式在线程比较少的时候是没有问题的,但是当需要创建大量线程时就会出现问题,因为这种使用方法把线程创建语句随意地散落在代码中,无法统一管理线程,我们将无法管理创建线程的数量,而过量的线程创建将直接使系统崩溃. 从高内聚角度讲,我们应该创建一个统一的创建以及运行接口,为我们管理这些线程,这个统一的创建与运行接口就是JDK 5的Ex