Java并发(四)线程池监控

目录

  一、线程池监控参数

  二、线程池监控类

  三、注意事项

在上一篇博文中,我们介绍了线程池的基本原理和使用方法。了解了基本概念之后,我们可以使用 Executors 类创建线程池来执行大量的任务,使用线程池的并发特性提高系统的吞吐量。但是,线程池使用不当也会使服务器资源枯竭,导致异常情况的发生,比如固定线程池的阻塞队列任务数量过多、缓存线程池创建的线程过多导致内存溢出、系统假死等问题。因此,我们需要一种简单的监控方案来监控线程池的使用情况,比如完成任务数量、未完成任务数量、线程大小等信息。

一、线程池监控参数

上一篇博文提到,线程池提供了以下几个方法可以监控线程池的使用情况:

方法 含义
getActiveCount() 线程池中正在执行任务的线程数量
getCompletedTaskCount() 线程池已完成的任务数量,该值小于等于taskCount
getCorePoolSize() 线程池的核心线程数量
getLargestPoolSize() 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize
getMaximumPoolSize() 线程池的最大线程数量
getPoolSize() 线程池当前的线程数量
getTaskCount() 线程池已经执行的和未执行的任务总数

通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法, afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。

二、线程池监控类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 继承ThreadPoolExecutor类,覆盖了shutdown(), shutdownNow(), beforeExecute() 和 afterExecute()
 * 方法来统计线程池的执行情况
 * <p>
 * Created by on 2019/4/19.
 */
public class ThreadPoolMonitor extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    /**
     * 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
     */
    private ConcurrentHashMap<String, Date> startTimes;

    /**
     * 线程池名称,一般以业务名称命名,方便区分
     */
    private String poolName;

    /**
     * 调用父类的构造方法,并初始化HashMap和线程池名称
     *
     * @param corePoolSize    线程池核心线程数
     * @param maximumPoolSize 线程池最大线程数
     * @param keepAliveTime   线程的最大空闲时间
     * @param unit            空闲时间的单位
     * @param workQueue       保存被提交任务的队列
     * @param poolName        线程池名称
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.startTimes = new ConcurrentHashMap<>();
        this.poolName = poolName;
    }

    /**
     * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
     */
    @Override
    public void shutdown() {
        // 统计已执行任务、正在执行任务、未执行任务数量
        LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }

    /**
     * 线程池立即关闭时,统计线程池情况
     */
    @Override
    public List<Runnable> shutdownNow() {
        // 统计已执行任务、正在执行任务、未执行任务数量
        LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }

    /**
     * 任务执行之前,记录任务开始时间
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
    }

    /**
     * 任务执行之后,计算任务结束时间
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、
        // 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、
        // 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
        LOGGER.info("{}-pool-monitor: " +
                        "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " +
                        "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                        "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                this.poolName,
                diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
                this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
    }

    /**
     * 创建固定线程池,代码源于Executors.newFixedThreadPool方法,这里增加了poolName
     *
     * @param nThreads 线程数量
     * @param poolName 线程池名称
     * @return ExecutorService对象
     */
    public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
        return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
    }

    /**
     * 创建缓存型线程池,代码源于Executors.newCachedThreadPool方法,这里增加了poolName
     *
     * @param poolName 线程池名称
     * @return ExecutorService对象
     */
    public static ExecutorService newCachedThreadPool(String poolName) {
        return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
    }

    /**
     * 生成线程池所用的线程,只是改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪
     */
    static class EventThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * 初始化线程工厂
         *
         * @param poolName 线程池名称
         */
        EventThreadFactory(String poolName) {
            SecurityManager s = System.getSecurityManager();
            group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
}

ThreadPoolMonitor 类继承了 ThreadPoolExecutor 类,重写了shutdown() 、shutdownNow() 、beforeExecute() 和 afterExecute()方法来统计线程池的执行情况,这四个方法是 ThreadPoolExecutor 类预留给开发者进行扩展的方法,具体如下:

方法 含义
shutdown() 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计已执行任务、正在执行任务、未执行任务数量
shutdownNow() 线程池立即关闭时,统计已执行任务、正在执行任务、未执行任务数量
beforeExecute(Thread t, Runnable r) 任务执行之前,记录任务开始时间,startTimes这个HashMap以任务的hashCode为key,开始时间为值
afterExecute(Runnable r, Throwable t) 任务执行之后,计算任务结束时间。统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止信息

一般我们会依赖 beforeExecute 和 afterExecute 这两个方法统计的信息,具体原因请参考需要注意部分的最后一项。有了这些信息之后,我们可以根据业务情况和统计的线程池信息合理调整线程池大小,根据任务耗时长短对自身服务和依赖的其他服务进行调优,提高服务的可用性。

三、注意事项

1. 在 afterExecute 方法中需要注意,需要调用 ConcurrentHashMap 的 remove 方法移除并返回任务的开始时间信息,而不是调用 get 方法,因为在高并发情况下,线程池里要执行的任务很多,如果只获取值不移除的话,会使 ConcurrentHashMap 越来越大,引发内存泄漏或溢出问题。该行代码如下:

Date startDate = startTimes.remove(String.valueOf(r.hashCode()));

2. 有了ThreadPoolMonitor类之后,我们可以通过 newFixedThreadPool(int nThreads, String poolName) 和 newCachedThreadPool(String poolName) 方法创建两个日常我们使用最多的线程池,跟默认的 Executors 里的方法不同的是,这里需要传入 poolName 参数,该参数主要是用来给线程池定义一个与业务相关并有具体意义的线程池名字,方便我们排查线上问题。

3. 在生产环境中,谨慎调用 shutdown() 和 shutdownNow() 方法,因为调用这两个方法之后,线程池会被关闭,不再接收新的任务,如果有新任务提交到一个被关闭的线程池,会抛出 java.util.concurrent.RejectedExecutionException 异常。其实在使用Spring等框架来管理类的生命周期的条件下,也没有必要调用这两个方法来关闭线程池,线程池的生命周期完全由该线程池所属的Spring管理的类决定。

原文地址:https://www.cnblogs.com/warehouse/p/10732965.html

时间: 2024-08-10 19:17:22

Java并发(四)线程池监控的相关文章

Java并发编程——线程池的使用

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

java并发学习--线程池(一)

关于java中的线程池,我一开始觉得就是为了避免频繁的创建和销毁线程吧,先创建一定量的线程,然后再进行复用.但是要具体说一下如何做到的,自己又说不出一个一二三来了,这大概就是自己的学习习惯流于表面,不经常深入的结果吧.所以这里决定系统的学习一下线程池的相关知识. 自己稍微总结了一下,学习一些新的知识或者技术的时候,大概都可以分为这么几个点: 1.为什么会有这项技术,用原来的方法有什么问题. 2.这项新技术具体是怎么解决这个问题的(这时可能就要涉及到一些具体的知识点和编码了) 3.是不是使用这项技

Java并发编程——线程池

一.Java中的ThreadPoolExecutor类 二.深入剖析线程池实现原理 三.使用示例 四.如何合理配置线程池的大小 一.Java中的ThreadPoolExecutor类 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类.下面我们来看一下ThreadPoolExecutor类的具体实现源码. 在ThreadPoolExecutor类中提供了四个构造方法: public c

java并发:线程池、饱和策略、定制、扩展

一.序言 当我们需要使用线程的时候,我们可以随时新建一个线程,这样实现起来非常简便,但在某些场景下存在缺陷:如果需要同时执行多个任务(即并发的线程数量很多),频繁地创建线程会降低系统的效率,因为创建和销毁线程均需要一定的时间.线程池可以使线程得到复用,所谓线程复用就是线程在执行完一个任务后并不被销毁,该线程可以继续执行其他的任务. 二.Executors提供的线程池 Executors是线程的工厂类,也可以说是一个线程池工具类,Executors提供的线程都是通过参数设置来实现不同的线程池机制.

Java多线程(四) 线程池

一个优秀的软件不会随意的创建很销毁线程,因为创建和销毁线程需要耗费大量的CPU时间以及需要和内存做出大量的交互.因此JDK5提出了使用线程池,让程序员把更多的精力放在业务逻辑上面,弱化对线程的开闭管理. JDK提供了四种不同的线程池给程序员使用 首先使用线程池,需要用到ExecutorService接口,该接口有个抽象类AbstractExecutorService对其进行了实现,ThreadPoolExecutor进一步对抽象类进行了实现.最后JDK封装了一个Executor类对ThreadP

java并发与线程池应用

一.创建线程池 Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制.Executor基于生产者---消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者.如果要使用Executor,必须将任务表述为一个Runnable. (1) public static ExecutorService newFixedThreadPoo

JAVA 并发编程-线程池(七)

线程池的作用: 线程池作用就是限制系统中运行线程的数量. 依据系统的环境情况.能够自己主动或手动设置线程数量,达到运行的最佳效果:少了浪费了系统资源,多了造成系统拥挤效率不高.用线程池控制线程数量,其它线程排队等候.一个任务运行完毕,再从队列的中取最前面的任务開始运行. 为什么要用线程池: 1.降低了创建和销毁线程的次数,每一个工作线程都能够被反复利用.可运行多个任务. 2.能够依据系统的承受能力,调整线程池中工作线线程的数目,防止由于消耗过多的内存,而把server累趴下 Java里面线程池的

Java并发编程——线程池初步

概述: 线程池机制是事先创建一些线程等待服务端程序的调用,这些线程保存在一个数组结构中,称为"线程池".当服务器有任务执行时,就从线程池中取出一个线程并给其分配任务,当线程任务执行完成后,再被放回线程池中. 优点: 1. 由于在任务到达之前,线程已经存在,所以这里为系统消除了线程创建的资源和时间的开销.可以立即为请求服务,使程序响应更快. 2. 通过适当地调节线程池中的线程数目,就强制使一些新到的任务处于等待状态,可以防止资源不足. 参考: http://www.cnblogs.com

java并发编程-线程池的使用

参考文章:http://www.cnblogs.com/dolphin0520/p/3932921.html 深入剖析线程池实现原理 将从下面几个方面讲解: 1.线程池状态 2.任务的执行 3.线程池中的线程初始化 4.任务缓存队列及排队策略 5.任务拒绝策略 6.线程池的关闭 7.线程池容量的动态调整 1.线程池状态 在ThreadPoolExcutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态: volatile int runState