Java 并发之 Executor 框架

  • 前言
  • Executor 框架概览
  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor
  • Executors
  • 结语

前言

在学习 JUC 的过程中我发现,JUC 这个包下面的文档写的十分的好,清楚又易于理解,这篇博客便是参考 JUC 中和 Executor 框架相关的一些类文档汇总出来的。

当然了,Executor 框架涉及到的类还是不少的,全部汇总的话时间成本太高,有点亏,所以这里主要就集中在了 Executor 接口及其子接口和具体实现上。

Executor 框架概览

Executor 框架的起点自然就是 Executor 接口,可以说整个 Executor 框架便是建立在 Executor 接口和其子接口上的,大致结构为:

其中,接口 Executor 表示定制的 thread-like 子系统,包括线程池、异步 I/O 和轻量级的任务框架。同时,依赖于具体的 Executor 实现, 新的任务可能在新线程、已存在的任务执行线程或调用 Executor.execute 方法的线程中执行,不同任务的执行可能是依序执行的或并行执行的。

Executor 的子接口 ExecutorService 提供了更完整的异步任务执行支持,每个 ExecutorService 会对任务的调度进行管理,并且允许受控的 取消 操作。 它的子接口 ScheduledExecutorService 还添加了对延迟和定期任务执行的支持。

由于 Runnable 接口定义的 run 方法是没有返回值的,因此,ExecutorService 还支持通过 Callable 来定义一些异步任务,这些异步任务可以有一个执行结果, 我们可以通过相应的 Future 来获取这个结果。

提交到 ExecutorService 的异步任务通常都会返回一个对应的 Future 对象,我们可以通过这个对象来获取异步任务的执行状态和执行结果,或者取消该任务的执行。

对于上述的接口来说,Executor 框架提供了一些默认的实现,很多时候,这些实现已经足够我们的使用:

  • Executor 框架提供了两个灵活的 可配置 的线程池实现 ThreadPoolExecutor(ExecutorService)ScheduledThreadPoolExecutor(ScheduledExecutorService)
  • 可以通过 Executors 的工厂方法来创建指定配置的线程池,同时通过一些其他实用的方法来使用它们

另外,类 ForkJoinPool 也提供了一个 Executor 来处理 ForkJoinTask 及其子类的实例。

参考:

Executor

一般来说,我们可以手动创建 Thread 对象来执行 Runnable 任务,但是,在有了 Executor 框架后,更好的选择是将这些异步任务转交给 Executor 的具体实现来执行。

比如说将 Thread(new(RunnableTask())).start() 替换为:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

当然,我们需要明白的是,不同的 Executor 实现是不一样的,我们提交的异步任务不一定就在别的线程执行,比如下面这样的实现:

class DirectExecutor implements Executor {
  public void execute(Runnable r) {
    r.run();
  }
}

但是,Executor 这个接口定义的功能很有限,同时也只支持 Runnale 形式的异步任务:

void execute(Runnable command);

参考:

ExecutorService

ExecutorService 为异步任务的执行提供了更多的支持,包括用于 终止 的方法以及可以产生用于跟踪一个或多个异步任务进度的 Future 的方法。

首先,和 Executor 不一样的是,ExecutorService 是可以终止的,当 ExecutorService 终止后,便不会接受新提交的任务。可以通过两个方法来终止 ExecutorService:

  • 通过 shutdown 方法来终止 ExecutorService,允许在执行完先前提交的任务后在终止 ExecutorService
  • 通过 shutdownNow 方法来终止 ExecutorService,会阻止等待的任务启动并尝试停止当前正在执行的任务

ExecutorService 终止后,就表示 ExecutorService 不存在正在或等待执行的任务,同时,会拒绝新任务的提交,通常应该关闭未使用的 ExecutorService 以便回收资源。

然后,和 Executor.execute 方法不一样,在 ExecutorService 中可以通过 ExecutorService.submit 方法来提交任务,这个方法会返回与提交的任务相关联的 Future 对象, 我们可以通过这个 Future 对象来等待/取消任务的执行,并获取执行结果。还可以通过 invokeAnyinvokeAll 来提交一组任务,等待其中一个或所有任务的执行。

同时,相较于只支持 Runnable 的 Executor,ExecutorService 还支持 Callable 形式的异步任务:

submit(Callable<T> task);
submit(Runnable task);
submit(Runnable task, T result);

参考:

ScheduledExecutorService

接口 ScheduledExecutorService 相较于 ExecutorService 来说添加了对延迟和定期任务执行的支持,还是比较好理解的:

// 单次延迟任务
schedule(Callable<V> callable, long delay, TimeUnit unit)
schedule(Runnable command, long delay, TimeUnit unit)
// 循环延迟任务
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

参考:

ThreadPoolExecutor

ThreadPoolExecutorExecutorService 的一种具体实现,一般情况下我们可以通过 Executors 来创建新的线程池,但是,了解 ThreadPoolExecutor 提供的各配置项还是很有用的, 而 ThreadPoolExecutor 文档中对这些配置项给出了很详细的描述。

Core and maximum pool sizes - 线程池核心线程数和最大线程数,线程池根据这两个参数来自动调整线程池的大小:

  1. 在提交新任务且运行的线程数少于 corePoolSize 时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理新任务
  2. 在运行的线程数大于 corePoolSize 但小于 maximumPoolSize 时,就仅在 队列已满 时才创建新线程
  3. 这两个参数可以在创建线程池时设置,也可以在创建后动态修改

On-demand construction - ThreadPoolExecutor 默认情况下是在新任务提交后在创建启动线程,但是可以通过覆盖 prestartCoreThread()prestartAllCoreThreads() 改变这一行为, 这在初始队列不为空时会很有用。

Creating new threads - ThreadPoolExecutor 通过 ThreadFactory 来创建新的线程,默认情况下会使用 Executors.defaultThreadFactory(), 这个 ThreadFactory 创建的所有线程拥有相同的 ThreadGroupNORM_PRIORITY 级别的优先级, 同时也是非守护线程。

Keep-alive times - 当当前线程池中的线程数超过 corePoolSize 时,多余的线程将在闲置时间超过 keepAliveTime 时终止。默认情况下参数 keepAliveTime 仅在线程数超过 corePoolSize 时起作用, 但是也可以通过 allowCoreThreadTimeOut(boolean) 方法让核心线程在闲置一段时间后也被终止。

Queuing - 任意的 BlockingQueue 都可以用于传输和保留提交的任务,队列的使用和当前线程池的大小相关:

  • 在当前线程池中的线程数小于 corePoolSize 时,优先创建新的线程
  • 在当前线程池中的线程数超过 corePoolSize 时,优先选择将任务放入队列
  • 在新的任务 无法放入队列 且线程数小于 maximumPoolSize 时,会创建新的线程,否则会拒绝新的任务

线程池中一般可以选择下面三种队列使用策略:

  • 直接交付,比如说使用 SynchronousQueue 队列,直接将任务传递给工作线程,如果没有合适的工作线程来处理任务,那么就会选择创建新的线程获拒绝任务,这时一般会将 maximumPoolSize 设的大一点
  • 无界队列,比如说使用 LinkedBlockingQueue 队列,这种情况下因为新的任务必然可以放入队列,因此,参数 maximumPoolSize 便失去了意义,此时最多只会有 corePoolSize 个线程在运行
  • 有界队列,比如说使用 ArrayBlockingQueue 队列,这时我们可以通过灵活调整 corePoolSize, maximumPoolSize 和队列大小来更加充分的利用线程池

Rejected tasks - 当 ExecutorService 被关闭或者任务无法放入队列且线程数量超过 maximumPoolSize 时,新任务的提交会被拒绝,这时便会调用 RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) 来处理被拒绝的任务, 可选的处理策略有:

  1. ThreadPoolExecutor.AbortPolicy(default) - 抛出运行时异常 RejectedExecutionException
  2. ThreadPoolExecutor.CallerRunsPolicy - 在调用 executor 的 线程执行该任务
  3. ThreadPoolExecutor.DiscardPolicy - 直接删除忽略新的任务
  4. ThreadPoolExecutor.DiscardOldestPolicy - 如果 ExecutorService 没有被关闭,那么就丢弃队列头的任务重新提交这个任务

Hook methods - 方法 beforeExecute(Thread, Runnable)afterExecute(Runnable, Throwable) 会在每个任务执行的前后调用,也可以覆盖 terminated() 方法在 Executor 终止 执行一些额外的操作。

Queue maintenance - 可以通过 getQueue() 方法访问工作队列,以进行监视和调试。强烈建议不要将此方法用于任何其他目的,可以通过 remove(Runnable)purge() 清理队列中的任务。

Finalization - 在线程池不在被引用 没有剩余工作线程时,线程池将会被关闭。可以考虑将 corePoolSize 设小并通过 allowCoreThreadTimeOut(boolean) 保证核心线程闲置久了也会被回收, 那么,忘记调用 shutdown 也不要担心资源的浪费了。

这么多的配置项,如此强大的功能,我只想说,Doug Lea NB(破音)!!!

附,文档上的一个例子:

// 可以暂停的线程池
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
  private boolean isPaused;
  private ReentrantLock pauseLock = new ReentrantLock();
  private Condition unpaused = pauseLock.newCondition();

  public PausableThreadPoolExecutor(...) { super(...); }

  protected void beforeExecute(Thread t, Runnable r) {
    super.beforeExecute(t, r);
    pauseLock.lock();
    try {
      while (isPaused) unpaused.await();
    } catch (InterruptedException ie) {
      t.interrupt();
    } finally {
      pauseLock.unlock();
    }
  }

  public void pause() {
    pauseLock.lock();
    try {
      isPaused = true;
    } finally {
      pauseLock.unlock();
    }
  }

  public void resume() {
    pauseLock.lock();
    try {
      isPaused = false;
      unpaused.signalAll();
    } finally {
      pauseLock.unlock();
    }
  }
}

参考:

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor 并实现了 ScheduledExecutorService 接口,是比 Timer 更好的选择。它使用指定大小的 corePoolSize 和无界队列, 因此,参数 maximumPoolSize 没有意义。

ScheduledThreadPoolExecutor 能够保证任务的执行时间 不早于 指定的时间,但是不能保证一定在那个时间执行。对于指定在同一时间执行的任务,将会按照 FIFO 的规则执行。

另外,假如在开始执行前任务就已经被取消了,那么 ScheduledThreadPoolExecutor 就不会在执行那个任务,但是默认不会从队列中取出该任务。 但是可以通过 setRemoveOnCancelPolicy(boolean) 要求 ScheduledThreadPoolExecutor 在任务取消时就立即从队列中取出该任务。

参考:

Executors

Executors 提供了一些针对 Executor, ExecutorService, ScheduledExecutorServiceThreadFactory 的实用方法,文档上的话就没有什么好说的了,其实就是一个熟悉接口的问题。

参考:

结语

通过文档的阅读对 Executors 有了大致的了解,但是,如果你打开源码就会发现,除了类上面的 Javadoc 以外,在类内部,通常还会有很长一段注释告诉你这个东西它是怎么实现的!!!

这对于阅读源码的人来说简直太友好了,多的不说,至少思路告诉你了,在看源码就容易多了 ?(`?ω?′)

原文地址:https://www.cnblogs.com/rgbit/p/12350112.html

时间: 2024-10-02 14:21:31

Java 并发之 Executor 框架的相关文章

JAVA 1.5 并发之 Executor框架 (二)execute VS submit

http://www.cnblogs.com/rockman12352/p/3788688.html 上一篇对于整体框架讲了很多东西,但是具体在使用时有一些细节并没有说出来 首先是执行任务 execute(); 执行任务,返回空,相当于 new Thread(task).start(); submit();   执行任务,但是会返回一个future<T>,就是计算好的结果,如果没有计算好则会阻塞,还有一个好处是可以管理exception public static void main(Stri

【Java 并发】Executor框架机制与线程池配置使用

[Java 并发]Executor框架机制与线程池配置使用 一,Executor框架Executor框架便是Java 5中引入的,其内部使用了线程池机制,在java.util.cocurrent 包下,通过该框架来控制线程的启动.执行和关闭,可以简化并发编程的操作.因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,更易管理,效率更好(用线程池实现,节约开销). Executor框架主要包括:Executor,Executors,ExecutorSer

Java开发之Mybatis框架

mybasits配置文件书写1.configer文件配置<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE configurationPUBLIC "-//mybatis.org//DTD Config 3.0//EN""http://mybatis.org/dtd/mybatis-3-config.dtd"><configuration><

Java并发编程-Executor框架之Callable和Future接口

在上一篇文章中我们已经了解了Executor框架进行线程管理,这篇文章将学习Executor框架的另一个特性,我们知道执行Runnable任务是没有返回值得,但Executor可以运行并发任务并获得返回值,Concurrent包提供下面两个接口实现这个功能: Callable接口:这个接口声明call(),类似于Runnable的run(),可以在这个方法里实现任务的具体逻辑操作.Callable是一个泛型接口,必须声明call()的返回类型. Future接口:这个接口声明了一下方法来获取Ca

Java并发编程-Executor框架(转)

本文转自http://blog.csdn.net/chenchaofuck1/article/details/51606224 感谢作者 我们在传统多线程编程创建线程时,常常是创建一些Runnable对象,然后创建对应的Thread对象执行它们,但是如果程序需要并发执行大量的任务时,需要为每个任务都创建一个Thread,进行管理,这将会影响程序的执行效率,并且创建线程过多将会使系统负载过重. 在JDK 1.5之后通过了一套Executor框架能够解决这些问题,能够分解任务的创建和执行过程.该框架

【Java并发】Executor框架

Executor框架简介 Java的线程既是工作单元,也是执行机制.从JDK5开始,把工作单元和执行机制分离开来. Executor框架由3大部分组成 任务. 被执行任务需要实现的接口:Runnable接口或Callable接口 异步计算的结果.Future接口和FutureTask类. 任务的执行.两个关键类ThreadPoolExecutor和SeheduledThreadPoolExecutor. 任务 Runnable接口 不含有运行结果 public interface Runnabl

Java线程池Executor框架详解

Java的线程既是工作单元,也是执行机制.从JDK 5开始,把工作单元与执行机制分离开来.工作单元包括Runnable和Callable,而执行机制由Executor框架提供. Executor框架简介在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程.Java线程启动时会创建一个本地操作系统线程:当该Java线程终止时,这个操作系统线程也会被回收.操作系统会调度所有线程并将它们分配给可用的CPU.在上层,Java多线程程序通常把应

Java线程池 Executor框架概述

线程池的意义 循环利用线程资源,避免重复创建和销毁线程 线程池的任务是异步执行的,只要提交完成就能快速返回,可以提高应用响应性 Java线程池还有一个很重要的意义:Java线程池就是JDK 5 推出的Executor框架,在此之前Java线程既是工作任务又是执行机制,而Executor框架把工作任务与执行机制分离开来:工作任务包括Runnable接口和Callable接口,而执行机制由Executor接口提供. Executor 类继承体系 Executor框架由三个部分组成 工作任务:Runn

Java并发编程-Executor框架集

Executor框架集对线程调度进行了封装,将任务提交和任务执行解耦. 它提供了线程生命周期调度的所有方法,大大简化了线程调度和同步的门槛. Executor框架集的核心类图如下: 从上往下,可以很清晰的看出框架集的各个类,以及它们之间的关系:Executor,是一个可以提交可执行(Runnable)任务的Object,这个接口解耦了任务提交和执行细节(线程使用.调度等),Executor主要用来替代显示的创建和运行线程:ExecutorService提供了异步的管理一个或多个线程终止.执行过程