核心线程池的内部实现(读书笔记)

对于核心的几个线程池,无论是newFixedThreadPool()方法,newSingleThreadExecutor()还是newCachedThreadPool()方法,虽然看起来创建的线程有着完全不同的功能特点,但其内部实现均使用了ThreadPoolExecutor实现,下面给出了三个线程池的实现方式.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

由以上线程池的实现代码可以看到,他们都是ThreadPoolExecutor类的封装. 让我们看一下ThreadPoolExecutor最重要的构造器:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
 

函数的参数如下:

  • corePoolSize 指定线程池中的线程数量
  • maximumPoolSize 指定了线程池中的最大线程数量
  • keepAliveTime 当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,
  • uintkeepAliveTime的单位
  • workQueue 队伍队列,被提交但尚未被执行的任务.
  • threadFactory:线程工厂 用于创建线程,一般用默认即可
  • handler 拒绝策略 当任务太多来不及处理,如何拒绝任务

以上参数中,大多数都很简单,只有workQueue和handler需要进行详细说明.

参数workQueue指被提交但未执行的任务队列,他是一个BlockingQueue接口的对象,用于存放Runable对象,根据队列功能分类,子ThreadPoolExecutor的构造函数中使用一下几种BlockIngQueue.

  • 直接提交的队列,改功能由synchronousQueue对象提供,SynchronousQueue是一个特殊的BlockingQueue.这个队列没有容量,每一个插入操作都要等待一个响应的删除操作,反之,每一个删除操作都要等待对应的插入操作,如果使用SynchronousQueue,提交的任务不会真实的保存,而总是将新任务提交给线程执行, 如果没有空闲的进程,则尝试创建新的进程,如果进程数量已经达到最大值,则执行拒绝策略,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略.
  • 有界的任务队列,有界的任务队列可以使用ArrayBlockingQueue实现,ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如写所示:

public ArrayBlockingQueue(int capacity)

当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的新线程,若大于corePoolSize,则会将新任务加入等待队列,若等待队列已经满,无法加入,则在总线程不大于maximumPoolSize的前提下,创建新的线程执行任务,若大于maximumPoolSize,则执行拒绝策略,可见,有界队列金当在任务队列装满时,才可能将线程数量提升到corePoolSize以上,换言之,除非系统非常繁忙.否则确保核心线程维持在corePoolSize.

  • 无界的任务队列:无界的任务队列可以通过LinkedBlockingQueue类实现,与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况,当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就并不会继续增加,若后续仍有席你的任务加入,而又没有空闲的线程资源,责任务直接进入对列等待,若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存.
  • 优先任务队列:优先任务队列是带有执行优先级的队列,它通过PriorityBlockingQueue实现,可以控制任务的执行顺序,他是一个特殊的无界队列,无论是有界队列ArrayBlockingQueue,还是未指定大小的无界队列LinkedBlockingQueue都是按照先进先出的算法处理任务的,而PriorityBlockingQueue则可以根据自身的优先级顺序先后执行,确保系统性能的同时,也能有很好的质量保证.

回顾newFixedThreadPool()的方法实现,

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

我们发现它用了corePoolSize和maximumPoolSize大小一样,并且使用了LingkedBlockingQueue任务队列的线程池.因为固定大小的线程池而言,不存在线程数量的动态变化,同时它使用无界队列存放无法立即执行的任务,当任务提交非常频繁的时候,改队列可能迅速膨胀.从而耗尽系统性能.

newSingleThreadExecutor()返回的单线程线程池,是newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1

newCachedThreadPool()方法的实现:

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
 

这就意味着无任务时,线程池内无线程,而当任务提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入SynchronousQueue队列,而SynchronousQueue队列是一种直接提交的队列,他总会迫使线程池增加新的线程执行任务,.当任务执行完毕后,由于corePoolSize为0 因此空闲线程又会在指定的60s内回收.

对于这个线程池,如果同时有大量任务被提交,而任务的执行又不那么快,那么系统便会开启等量的线程处理,这样做法可能会很快耗尽系统的资源,

这里我们看一看ThreadPoolExecutor线程池的核心调度代码,这段代码也充分体现了上述线程池的工作逻辑:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn‘t, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
 

workerCountOf()方法取得了当前线程池的线程总数,当线程总数小于corePoolSize核心线程数时,会将任务通过addWorker()方法直接调度执行,否则则在workQueue.offer()进入等待队列,如果进入等待队列失败,则会执行将任务直接提交给线程池,如果当期已经达到maximumPoolSize,则提交失败,执行拒绝策略.

  • 超负载了怎么办:拒绝策略

ThreadPoolExecutor的最后一个参数制定了拒绝策略,也就是当任务数量超过系统实际承载能力时,该如何处理呢?这时候就要用到拒绝策略了,,

JDK内置提供了四种拒绝策略.

  • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作
  • CallerRunsPolicy策略,只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务.显然这样做不会真的丢弃任务,但是任务提交线程的性能极有可能会急剧下降.
  • DiscardOledestPolicy策略: 改策略将丢弃最古老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务.
  • DiscardPolicy策略,该策略默默的丢弃无法处理的任务,不与任何处理,如果允许人物丢弃,我觉得这可能是最好的一种方案了吧!

以上内置策略均实现了RejectedExecutionHandler接口 若以上策略仍无法满足实际应用需要,完全可以自己拓展RejectedExecutionHandler接口 定义如下:

/**
 * A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

其中r为请求执行的任务,executor为当前线程池.

我们简单的自定义线程池和拒绝策略的使用:

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        /**
         * When an object implementing interface <code>Runnable</code> is used
         * to create a thread, starting the thread causes the object‘s
         * <code>run</code> method to be called in that separately executing
         * thread.
         * <p>
         * The general contract of the method <code>run</code> is that it may
         * take any action whatsoever.
         *
         * @see Thread#run()
         */
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            MyTask task = new MyTask();
            ExecutorService es = new ThreadPoolExecutor(5, 5,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(10),
                    Executors.defaultThreadFactory(),
                    (r, executor) -> System.out.println(r.toString() + " is discard"));
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                es.submit(task);
                Thread.sleep(10);
            }
        }
    }
}

上诉代码我们自定义了一个线程池,该池子有5个常驻线程,并且最大的线程数也是5个,这和固定大小的线程池是一样的,但是他却拥有一个只有10个容量的等待队列,因为使用无界队列很可能不是最佳解决方案,如果任务量极大,很可能会吧内存呈爆,给一个合理的队列大小,也合乎常理的选择,同时,这里定义了拒绝策略,.我们不抛出异常,因为万一在任务提交端没有进行异常处理,则有可能使得整个系统都崩溃,这极有可能不是我们希望遇到的,但作为必要的信息记录,我们将任务丢弃的信息进行打印.当然这是比内置的DiscardPolicy策略高级那么一点点,

由于上述代码中,MyTask执行需要花费100毫秒,因此 必然导致大量的任务被直接丢弃,输入如下:

在实际的应用中,我们可以将更详细的信息记录到日志中,来分析系统的负载和任务丢失的情况

时间: 2025-01-04 09:33:57

核心线程池的内部实现(读书笔记)的相关文章

《大型网站技术架构 -核心原理与安全分析》读书笔记

大型网站架构演化的价值观 网站的价值在于它能为用户提供什么价值,在于网站能做什么,而不在于它是怎么做的,所以在网站还很小的时候去追求网站的架构是舍本逐末,得不偿失的.小型网站最需要做的就是为用户提供好的服务来创造价值,得到用户的认可,活下去,野蛮生长. 网站架构设计误区 一味追求大公司的解决方案 大公司的经验和成功模式固然重要,值得学习借鉴,但如果因此而变得盲从,就失去了坚持自我的勇气,在架构演化的道路上迟早会迷路. 为了技术而技术 网站技术是为业务而存在的,除此毫无意义.在技术选型和架构设计中

【大型网站技术架构 核心原理与案例分析】读书笔记

章节 笔记 1.概述 网站架构模式:分层.分割.分布式.集群.缓存.异步.冗余.自动化.安全. 核心架构要素:性能.可用性.伸缩性.扩展性.安全. 4.高性能 一般重复请求一万次计算总响应时间然后除以一万得到单词响应时间. 测试程序并不是启动多线程然后不停发送请求,而是在两次请求之间加入一个随机等待时间. 吞吐量:每天通过收费站的车辆数目:并发数:正在行驶的车辆数目:响应时间:车速.TPS:每秒事务数:HPS:每秒请求数:QPS:每秒查询数. 性能计数器:System Load(系统负载,最理想

线程阻塞工具类:LockSupport(读书笔记)

他可以在线程任意位置让线程阻塞, LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos() ParkUntil()等,他们实现了一个限时等待 public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThrea

《大型网站技术架构:核心原理与案例分析》读书笔记

由于网站的访问流量是缓慢增长的(PS除了垄断的12306),所以一般网站的架构也是不断的演化的,没有一开始就搞出个支持大并发的网站.无论从开发到发布的时间.消耗的资源上来看,或者是说从开发.维护的难度上看,或者从开发的防止"过度设计"的维度思考,绝大多数网站设计是一个演化的过程.这也是植根于需求的表现. 分析目前大型互联网可以从两个维度,用户需求.结构框架.当然是前者决定后者.从用户需求特点分析,大型网站要求高可用.高性能两个最"简单"的要求.从设计的角度讲还要满足

《大型网站技术架构:核心原理与案例分析》-- 读书笔记 (5) :网购秒杀系统

1. 秒杀活动的技术挑战及应对策略 1.1 对现有网站业务造成冲击 秒杀活动具有时间短,并发访问量大的特点,必然会对现有业务造成冲击.对策:秒杀系统独立部署 1.2 高并发下的应用.数据库负载 用户会在秒杀开始前不停的刷新网页,访问量会很大.对策:秒杀商品页面静态化.这样就不需要经过服务器的业务逻辑处理,也不需要访问数据库 1.3 突然增加的网络及服务器带宽 如果一个页面大小200K,如果有10000个请求,那么需要的网络和服务器带宽大约是2G.对策:租借秒杀活动网络带宽(网络运营商及CDN服务

《大型网站技术架构——核心原理与案例分析》读书笔记(7)

第八章 网站的安全性架构 一.网站攻击与预防 1. XSS攻击 (1)攻击方法: 反射型:攻击者诱导用户点击一个包含恶意脚本的链接 持久型:黑客提交含有恶意脚本的请求,并保存到了数据库,用户浏览网页时恶意脚本被包含到正常的页面中 (2)防止手段: 请求参数消毒:对请求中的特殊字符转义 HttpOnly:对于敏感信息Cookie,可以设置HttpOnly来禁止js脚本获取Cookie. 2. 注入攻击 (1)攻击方法: SQL注入: OS注入: (2)防止手段: 避免泄露表结构: 请求参数消毒:

Java线程池--ThreadPoolExecutor

一.线程池的处理流程 向线程池提交一个任务后,它的主要处理流程如下图所示: 一个线程从被提交(submit)到执行共经历以下流程: 线程池判断核心线程池里的线程是否都在执行任务,如果不是,则创建一个新的工作线程来执行任务.如果核心线程池里的线程都在执行任务,则进入下一个流程; 线程池判断工作队列是否已满.如果工作队列没有满,则将新提交的任务储存在这个工作队列里.如果工作队列满了,则进入下一个流程; 线程池判断其内部线程是否都处于工作状态.如果没有,则创建一个新的工作线程来执行任务.如果已满了,则

Java线程池ThreadPoolExector的源码分析

前言:线程是我们在学习java过程中非常重要的也是绕不开的一个知识点,它的重要程度可以说是java的核心之一,线程具有不可轻视的作用,对于我们提高程序的运行效率.压榨CPU处理能力.多条线路同时运行等都是强有力的杀手锏工具.线程是如此的重要,那么我们来思考这样一个问题.假设我们有一个高并发,多线程的项目,多条线程在运行的时候,来一个任务我们new一个线程,任务结束了,再把它销毁结束,这样看似没有问题,适合于低并发的场景,可是当我们的项目投入到生产环境,一下涌入千条任务的时候,线程不断的new执行

线程复用:线程池

一.核心线程池内部实现 为了能够更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效地进行线程控制,其本质就是一个线程池.它的核心成员如图 以上成员均在java.util.concurrent包中,是JDK并发包的核心类.其中ThreadPoolExecutor表示一个线程池.Executors类则扮演着线程池工厂的角色,通过Executors可以取得一个拥有特定功能的线程池. Executor框架提供了各种类型的线程池,主要有以下工厂方法: public static Ex