高并发之——从源码角度分析创建线程池究竟有哪些方式

前言

在Java的高并发领域,线程池一直是一个绕不开的话题。有些童鞋一直在使用线程池,但是,对于如何创建线程池仅仅停留在使用Executors工具类的方式,那么,创建线程池究竟存在哪几种方式呢?就让我们一起从创建线程池的源码来深入分析究竟有哪些方式可以创建线程池。

使用Executors工具类创建线程池

在创建线程池时,初学者用的最多的就是Executors 这个工具类,而使用这个工具类创建线程池时非常简单的,不需要关注太多的线程池细节,只需要传入必要的参数即可。Executors 工具类提供了几种创建线程池的方法,如下所示。

  • Executors.newCachedThreadPool:创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程
  • Executors.newFixedThreadPool:创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待
  • Executors.newScheduledThreadPool:创建一个定长的线程池,支持定时、周期性的任务执行
  • Executors.newSingleThreadExecutor: 创建一个单线程化的线程池,使用一个唯一的工作线程执行任务,保证所有任务按照指定顺序(先入先出或者优先级)执行
  • Executors.newSingleThreadScheduledExecutor:创建一个单线程化的线程池,支持定时、周期性的任务执行
  • Executors.newWorkStealingPool:创建一个具有并行级别的work-stealing线程池

其中,Executors.newWorkStealingPool方法是Java 8中新增的创建线程池的方法,它能够为线程池设置并行级别,具有更高的并发度和性能。除了此方法外,其他创建线程池的方法本质上调用的是ThreadPoolExecutor类的构造方法。

例如,我们可以使用如下代码创建线程池。

Executors.newWorkStealingPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool(3);

使用ThreadPoolExecutor类创建线程池

从代码结构上看ThreadPoolExecutor类继承自AbstractExecutorService,也就是说,ThreadPoolExecutor类具有AbstractExecutorService类的全部功能。

既然Executors工具类中创建线程池大部分调用的都是ThreadPoolExecutor类的构造方法,所以,我们也可以直接调用ThreadPoolExecutor类的构造方法来创建线程池,而不再使用Executors工具类。接下来,我们一起看下ThreadPoolExecutor类的构造方法。

ThreadPoolExecutor类中的所有构造方法如下所示。

public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                 BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                    ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                    TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                    BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory,
                RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

由ThreadPoolExecutor类的构造方法的源代码可知,创建线程池最终调用的构造方法如下。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
              long keepAliveTime, TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
                  RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

关于此构造方法中各参数的含义和作用,如下所示。
注意:为了更加深入的分析ThreadPoolExecutor类的构造方法,会适当调整参数的顺序进行解析,以便于大家更能深入的理解ThreadPoolExecutor构造方法中每个参数的作用。

上述构造方法接收如下参数进行初始化:

(1)corePoolSize:核心线程数量。

(2)maximumPoolSize:最大线程数。

(3)workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响。

其中,上述三个参数的关系如下所示:

  • 如果运行的线程数小于corePoolSize,直接创建新线程处理任务,即使线程池中的其他线程是空闲的。
  • 如果运行的线程数大于等于corePoolSize,并且小于maximumPoolSize,此时,只有当workQueue满时,才会创建新的线程处理任务。
  • 如果设置的corePoolSize与maximumPoolSize相同,那么创建的线程池大小是固定的,此时,如果有新任务提交,并且workQueue没有满时,就把请求放入到workQueue中,等待空闲的线程,从workQueue中取出任务进行处理。
  • 如果运行的线程数量大于maximumPoolSize,同时,workQueue已经满了,会通过拒绝策略参数rejectHandler来指定处理策略。

根据上述三个参数的配置,线程池会对任务进行如下处理方式:

当提交一个新的任务到线程池时,线程池会根据当前线程池中正在运行的线程数量来决定该任务的处理方式。处理方式总共有三种:直接切换、使用无限队列、使用有界队列。

  • 直接切换常用的队列就是SynchronousQueue。
  • 使用无限队列就是使用基于链表的队列,比如:LinkedBlockingQueue,如果使用这种方式,线程池中创建的最大线程数就是corePoolSize,此时maximumPoolSize不会起作用。当线程池中所有的核心线程都是运行状态时,提交新任务,就会放入等待队列中。
  • 使用有界队列使用的是ArrayBlockingQueue,使用这种方式可以将线程池的最大线程数量限制为maximumPoolSize,可以降低资源的消耗。但是,这种方式使得线程池对线程的调度更困难,因为线程池和队列的容量都是有限的了。

根据上面三个参数,我们可以简单得出如何降低系统资源消耗的一些措施:

  • 如果想降低系统资源的消耗,包括CPU使用率,操作系统资源的消耗,上下文环境切换的开销等,可以设置一个较大的队列容量和较小的线程池容量。这样,会降低线程处理任务的吞吐量。
  • 如果提交的任务经常发生阻塞,可以考虑调用设置最大线程数的方法,重新设置线程池最大线程数。如果队列的容量设置的较小,通常需要将线程池的容量设置的大一些,这样,CPU的使用率会高些。如果线程池的容量设置的过大,并发量就会增加,则需要考虑线程调度的问题,反而可能会降低处理任务的吞吐量。

接下来,我们继续看ThreadPoolExecutor的构造方法的参数。

(4)keepAliveTime:线程没有任务执行时最多保持多久时间终止
当线程池中的线程数量大于corePoolSize时,如果此时没有新的任务提交,核心线程外的线程不会立即销毁,需要等待,直到等待的时间超过了keepAliveTime就会终止。

(5)unit:keepAliveTime的时间单位

(6)threadFactory:线程工厂,用来创建线程
默认会提供一个默认的工厂来创建线程,当使用默认的工厂来创建线程时,会使新创建的线程具有相同的优先级,并且是非守护的线程,同时也设置了线程的名称

(7)rejectHandler:拒绝处理任务时的策略

如果workQueue阻塞队列满了,并且没有空闲的线程池,此时,继续提交任务,需要采取一种策略来处理这个任务。
线程池总共提供了四种策略:

  • 直接抛出异常,这也是默认的策略。实现类为AbortPolicy。
  • 用调用者所在的线程来执行任务。实现类为CallerRunsPolicy。
  • 丢弃队列中最靠前的任务并执行当前任务。实现类为DiscardOldestPolicy。
  • 直接丢弃当前任务。实现类为DiscardPolicy。

大家可以自行调用ThreadPoolExecutor类的构造方法来创建线程池。例如,我们可以使用如下形式创建线程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

使用ForkJoinPool类创建线程池

在Java8的Executors工具类中,新增了如下创建线程池的方式。

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

从源代码可以可以,本质上调用的是ForkJoinPool类的构造方法类创建线程池,而从代码结构上来看ForkJoinPool类继承自AbstractExecutorService抽象类。接下来,我们看下ForkJoinPool类的构造方法。

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
 public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism,
                ForkJoinWorkerThreadFactory factory,
                UncaughtExceptionHandler handler,
                boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

通过查看源代码得知,ForkJoinPool的构造方法,最终调用的是如下私有构造方法。

private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

其中,各参数的含义如下所示。

  • parallelism:并发级别。
  • factory:创建线程的工厂类对象。
  • handler:当线程池中的线程抛出未捕获的异常时,统一使用UncaughtExceptionHandler对象处理。
  • mode:取值为FIFO_QUEUE或者LIFO_QUEUE。
  • workerNamePrefix:执行任务的线程名称的前缀。

当然,私有构造方法虽然是参数最多的一个方法,但是其不会直接对外方法,我们可以使用如下方式创建线程池。

new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

使用ScheduledThreadPoolExecutor类创建线程池

在Executors工具类中存在如下方法类创建线程池。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

从源码来看,这几个方法本质上调用的都是ScheduledThreadPoolExecutor类的构造方法,ScheduledThreadPoolExecutor中存在的构造方法如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

而从代码结构上看,ScheduledThreadPoolExecutor类继承自ThreadPoolExecutor类,本质上还是调用ThreadPoolExecutor类的构造方法,只不过此时传递的队列为DelayedWorkQueue。我们可以直接调用ScheduledThreadPoolExecutor类的构造方法来创建线程池,例如以如下形式创建线程池。

new ScheduledThreadPoolExecutor(3)

最后,需要注意的是:ScheduledThreadPoolExecutor主要用来创建执行定时任务的线程池。

原文地址:https://www.cnblogs.com/binghe001/p/12357409.html

时间: 2024-10-13 01:17:59

高并发之——从源码角度分析创建线程池究竟有哪些方式的相关文章

从源码角度分析linearLayout测量过程以及weight机制

???上文从源码角度分析了view和viewGroup的measure机制,如果还没有阅读的同志们,可以前往从源码角度分析Android View的绘制机制(一)阅读.下面我再结合linearLayout的measure过程解释以下两个问题的缘由. 问题一: <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android" android:layout_width="match_parent

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行.

《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现.这些方法包括submit,invokeAny和InvokeAll. 注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略.这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来. 首先来看su

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

《java.util.concurrent 包源码阅读》11 线程池系列之ThreadPoolExecutor 第一部分

先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */ int c = ctl.get(); if (worker

《java.util.concurrent 包源码阅读》09 线程池系列之介绍篇

concurrent包中Executor接口的主要类的关系图如下: Executor接口非常单一,就是执行一个Runnable的命令. public interface Executor { void execute(Runnable command); } ExecutorService接口扩展了Executor接口,增加状态控制,执行多个任务返回Future. 关于状态控制的方法: // 发出关闭信号,不会等到现有任务执行完成再返回,但是现有任务还是会继续执行, // 可以调用awaitTe

从源码角度分析Android View的绘制机制(一)

在Android的学习道路上,每一个人员都免不了去翻阅Android的源码,因为只有从源码的角度分析问题,我们才能真正的玩转Android开发.最近由于工作比较闲,总想着想写点什么东西,正好自己也可以整理一下.考虑到view的显示机制是自定义view的基础,也是面试中经常被问到的问题,所以记录此文,和大家共享,因水平有限,望大家踊跃拍砖,不胜感激. 有过自定义view的同行们都应该知道,view的显示依托于activity的setContentView方法依附到PhoneWindow窗体上的,在

从源码角度分析Android中的Binder机制的前因后果

前面我也讲述过一篇文章<带你从零学习linux下的socket编程>,主要是从进程通信的角度开篇然后延伸到linux中的socket的开发.本篇文章依然是从进程通信的角度去分析下Android中的进程通信机制. 为什么在Android中使用binder通信机制? 众所周知linux中的进程通信有很多种方式,比如说管道.消息队列.socket机制等.socket我们再熟悉不过了,然而其作为一款通用的接口,通信开销大,数据传输效率低,主要用在跨网络间的进程间通信以及在本地的低速通信.消息队列和管道

从源码角度分析ViewStub 疑问与原理

一.提出疑问 ViewStub比较简单,之前文章都提及到<Android 性能优化 三 布局优化ViewStub标签的使用>,但是在使用过程中有一个疑惑,到底是ViewStub上设置的参数有效还是在其包括的layout中设置参数有效?如果不明白描述的问题,可以看下以下布局伪代码. res/layout/main.xml <LinearLayout > <ViewStub android:id="@+id/viewstub" android:layout_w