自定义 ThreadPoolExecutor 处理线程运行时异常

自定义 ThreadPoolExecutor 处理线程运行时异常

最近看完了ElasticSearch线程池模块的源码,感触颇深,然后也自不量力地借鉴ES的 EsThreadPoolExecutor 重新造了一把轮子,对线程池的理解又加深了一些。在继承 ThreadPoolExecutor实现自定义的线程池时,ES先重写了Runnable接口,提供了更灵活的任务运行过程中出现异常处理逻辑。简而言之,它采用回调机制实现了线程在运行过程中抛出未受检异常的统一处理逻辑,非常优美。实在忍不住把源码copy下来:

/**
 * An extension to runnable.
 */
public abstract class AbstractRunnable implements Runnable {

    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {
        return false;
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {
        // nothing by default
    }

    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);

    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {
        onFailure(e);
    }

    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;
}
  1. 统一的任务执行入口方法doRun(),由各个子类实现doRun()执行具体的业务逻辑
  2. try-catch中统一处理线程执行任务过程中抛出的异常,由onFailure()处理
  3. 任务执行完成(不管是正常结束还是运行过程中抛出了异常),统一由onAfter()处理
  4. 另外,还有一个isForceExecution方法,用来支持任务在提交给线程池被拒绝了,强制执行。当然了,这需要线程池的任务队列提供相关的支持。我也是受这种方式的启发,实现了一个线程在执行任务过程中抛出未受检异常时,先判断该任务是否允许强制执行isForceExecution,然后再重新提交任务运行的线程池

    ?

此外,ES内置了好几个默认实现的线程池,比如 EsThreadPoolExecutor 、QueueResizingEsThreadPoolExecutor 和 PrioritizedEsThreadPoolExecutor。

  1. QueueResizingEsThreadPoolExecutor

    在创建线程池时会指定一个任务队列(BlockingQueue),平常都是直接用 LinkedBlockingQueue,它是一个无界队列,当然也可以在构造方法中指定队列的长度。但是,ES中几乎不用 LinkedBlockingQueue 作为任务队列,而是使用 LinkedTransferQueue ,但是 LinkedTransferQueue 又是一个无界队列,于是ES又基于LinkedTransferQueue 封装了一个任务队列,类名称为 ResizableBlockingQueue,它能够限制任务队列的长度。

    那么问题来了,对于一个线程池,任务队列设置为多长合适呢?

    答案就是Little‘s Law。在QueueResizingEsThreadPoolExecutor 线程池中重写了afterExecute()方法,里面统计了每个任务的运行时间、等待时间(入队列到执行)。所以,你想知道如何统计一个任务的运行时间吗?你想统计线程池一共提交了多少个任务,所有任务的运行时间吗?看看QueueResizingEsThreadPoolExecutor 源码就明白了。

    另外再提一个问题,为什么ES用 LinkedTransferQueue 作为任务队列而不用 LinkedBlockingQueue 呢?

    我想:很重要的一个原因是LinkedBlockingQueue 是基于重量级的锁实现的入队操作,而LinkedTransferQueue 是基于CAS原子指令实现的入队操作。那么这就是synchronized内置锁和CAS原子指令之间的一些差异了,你懂得。

  2. PrioritizedEsThreadPoolExecutor

    优先级任务的线程池,任务提交给线程池后是在任务队列里面排队,FIFO模式。而这个线程池则允许任务定义一个优先级,优先级高的任务先执行。

  3. EsThreadPoolExecutor

    这个线程池最接近经常见到的ThreadPoolExecutor,不过,它实现了一些拒绝处理逻辑,提交任务若被拒绝(会抛出EsRejectedExecutionException异常),则进行相关处理

        @Override
        public void execute(final Runnable command) {
            doExecute(wrapRunnable(command));
        }
    
        protected void doExecute(final Runnable command) {
            try {
                super.execute(command);
            } catch (EsRejectedExecutionException ex) {
                if (command instanceof AbstractRunnable) {
                    // If we are an abstract runnable we can handle the rejection
                    // directly and don't need to rethrow it.
                    try {
                        ((AbstractRunnable) command).onRejection(ex);
                    } finally {
                        ((AbstractRunnable) command).onAfter();
    
                    }
                } else {
                    throw ex;
                }
            }
        }

    ?

讲完了ES中常用的三个线程池实现,还想结合JDK源码,记录一下线程在执行任务过程中抛出运行时异常,是如何处理的。我觉得有二种方式(或者说有2个地方)来处理运行时异常。一种方式是:java.util.concurrent.ThreadPoolExecutor#afterExecute方法,另一种方式是:java.lang.Thread.UncaughtExceptionHandler#uncaughtException

  1. afterExecute

    看ThreadPoolExecutor#afterExecute(Runnable r, Throwable t) 的源码注释:

    Method invoked upon completion of execution of the given Runnable.This method is invoked by the thread that executed the task. If non-null, the Throwable is the uncaught RuntimeException or Error that caused execution to terminate abruptly.

    提交给线程池的任务,执行完(不管是正常结束,还是执行过程中出现了异常)后都会自动调用afterExecute()方法。如果执行过程中出现了异常,那么Throwable t 就不为null,并且导致执行终止(terminate abruptly.)。

    This implementation does nothing, but may be customized in subclasses. Note: To properly nest multiple overridings, subclasses should generally invoke super.afterExecute at the beginning of this method.

    默认的afterExecute(Runnable r, Throwable t) 方法是一个空实现,什么也没有。因此,在继承ThreadPoolExecutor实现自己的线程池时,如果重写该方法,则要记住:先调用 super.afterExecute

    比如说这样干:

     @Override
     protected void afterExecute(Runnable r, Throwable t) {
         super.afterExecute(r, t);
         if (t != null) {
             //出现了异常
             if (r instanceof AbstractRunnable && ((AbstractRunnable)r).isForceExecution()) {
                 //AbstractRunnable 设置为强制执行时重新拉起任务
                 execute(r);
                 logger.error("AbstractRunnable task run time error:{}, restarted", t.getMessage());
             }
         }
     }

    看,重写afterExecute方法,当 Throwable 不为null时,表明线程执行任务过程中出现了异常,这时就重新提交任务。

    有个时候,在实现 Kafka 消费者线程的时候(while true循环),经常因为解析消息出错导致线程抛出异常,就会导致 Kafka消费者线程挂掉,这样就永久丢失了一个消费者了。而通过这种方式,当消费者线程挂了时,可重新拉起一个新任务。

  2. uncaughtException

    创建 ThreadPoolExecutor时,要传入ThreadFactory 作为参数,在而创建ThreadFactory 对象时,就可以设置线程的异常处理器java.lang.Thread.UncaughtExceptionHandler。

    在用Google Guava包的时候,一般这么干:

    //先 new  Thread.UncaughtExceptionHandler对象 exceptionHandler
    private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread_name-%d").setUncaughtExceptionHandler(exceptionHandler).build();
    

    在线程执行任务过程中,如何抛出了异常,就会由JVM调用 Thread.UncaughtExceptionHandler 中实现的异常处理逻辑。看Thread.UncaughtExceptionHandler的JDK源码注释:

    Interface for handlers invoked when a Thread abruptly. terminates due to an uncaught exception.

    When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using getUncaughtExceptionHandler and will invoke the handler‘s uncaughtException method, passing the thread and the exception as arguments.

    其大意就是:如果线程在执行Runnable任务过程因为 uncaught exception 而终止了,那么 JVM 就会调用getUncaughtExceptionHandler 方法查找是否设置了异常处理器,如果设置了,那就就会调用异常处理器的java.lang.Thread.UncaughtExceptionHandler#uncaughtException方法,这样我们就可以在这个方法里面定义异常处理逻辑了。

    ?

总结

ES的ThreadPool 模块是学习线程池的非常好的一个示例,实践出真知。它告诉你如何自定义线程池(用什么任务队列?cpu核数、任务队列长度等参数如何配置?)。在实现自定义任务队列过程中,也进一步理解了CAS操作的原理,如何巧妙地使用CAS?是失败重试呢?还是直接返回?

另外,线程在执行Runnable任务过程中抛出了异常如何处理?这里提到了Thread.UncaughtExceptionHandler#uncaughtException 和 ThreadPoolExecutor#afterExecute。前者是由JVM自动调用的,后者则是在每个任务执行结束后都会被调用。

另外,注意:Thread.UncaughtExceptionHandler#uncaughtException 和 RejectedExecutionHandler#rejectedExecution 是不同的。RejectedExecutionHandler 用来处理任务在提交的时候,被线程池拒绝了,该怎么办的问题,默认是AbortPolicy,即:直接丢弃。

另,等下次有时间,好好地写一篇分析ElasticSearch6.3.2的线程池模块。:)
原文:https://www.cnblogs.com/hapjin/p/10617702.html

原文地址:https://www.cnblogs.com/hapjin/p/10617702.html

时间: 2024-09-29 03:15:52

自定义 ThreadPoolExecutor 处理线程运行时异常的相关文章

编译时异常与运行时异常的区别

Java异常类层次结构图 Java异常类层次结构图 运行时异常:都是RuntimeException类及其子类异常,如NullPointerException(空指针异常).IndexOutOfBoundsException(下标越界异常)等,这些异常是不检查异常,程序中可以选择捕获处理,也可以不处理.这些异常一般是由程序逻辑错误引起的,程序应该从逻辑角度尽可能避免这类异常的发生. 运行时异常的特点是Java编译器不会检查它,也就是说,当程序中可能出现这类异常,即使没有用try-catch语句捕

java中的一般异常与运行时异常

Java提供了两类主要的异常:runtime exception和checked exception.checked 异常也就是我们经常遇到的IO异常,以及SQL异常都是这种异常.对于这种异常,JAVA编译器强制要求我们必需对出现的这些异常进行catch.所以,面对这种异常不管我们是否愿意,只能自己去写一大堆catch块去处理可能的异常. 但是另外一种异常:runtime exception,也称运行时异常,我们可以不处理.当出现这样的异常时,总是由虚拟机接管.比如:我们从来没有人去处理过Nul

Spring学习笔记——Spring事务只对运行时异常回滚

我们在使用Spring时候一般都知道事务在遇到异常的时候会回滚,岂不知Spring的事务默认只有在发生运行时异常即:RunTimeException时才会发生事务,如果一个方法抛出Exception或者Checked异常Spring的事务并不会回滚. 下面我们来看看异常的分类,异常一般分为Checked异常和RunTime异常. CheckedException: Java认为Checked异常都是可以被处理的异常,所以Java程序必须显式的处理Checked异常,如果程序没有处理checked

【搜狗问问】运行时异常与一般异常有何异同?

Java提供了两类主要的异常:runtime exception和checked exception.checked 异常也就是我们经常遇到的IO异常,以及SQL异常都是这种异常.对于这种异常,JAVA编译器强制要求我们必需对出现的这些异常进行catch.所以,面对这种异常不管我们是否愿意,只能自己去写一大堆catch块去处理可能的异常. 但是另外一种异常:runtime exception,也称运行时异常,我们可以不处理.当出现这样的异常时,总是由虚拟机接管.比如:我们从来没有人去处理过Nul

运行时异常和一般异常

Java提供了两类主要的异常:runtime exception和checked exception. checked 异常,一般异常,就是我们经常遇到的IO异常,SQL异常. 对于这种异常,JAVA编译器强制要求我们必需对出现的这些异常进行catch声明. 所以,面对这种异常不管我们是否愿意,只能自己去写一大堆catch块去处理可能的异常. runtime 异常,运行时异常,我们可以不处理. 当出现这样的异常时,总是由虚拟机接管.比如:我们从来没有人去处理过NullPointerExcepti

从0开始学Java:运行时异常与一般异常有何异同?

无限互联从0开始学Java系列之JAVA相关基础知识,Java基础培训,String 和StringBuffer的区别 1.Java基础学习,从0开始学Java:String 和StringBuffer的区别? JAVA平台提供了两个类:String和StringBuffer,它们可以储存和操作字符串,即包含多个字符的字符数据.这个String类提供了数值不可改变的字符串.而这个StringBuffer类提供的字符串进行修改.当你知道字符数据要改变的时候你就可以使用StringBuffer.典型

区分运行时异常和受检查异常【文摘+转】

正确运用异常处理机制,有助于提高程序的健壮性. 所谓程序的健壮性,就是指程序在多数情况下能够正常运行,返回预期的正确结果:如果偶尔遇到异常情况,程序也能采取周到的解决措施. 受检查异常表示程序可以处理的异常,如果抛出异常的方法本身不能处理它,那么方法调用者应该去处理它,从而使程序恢复运行,不至于终止程序.例如,喷墨打印机在打印文件时,如果纸用完或者墨水用完,就会暂停打印,等待用户添加打印纸或更换墨盒,如果用户添加了打印纸或更换了墨盒,就能继续打印. 可以用OutOfPaperException类

Java运行时异常和非运行时异常

1.Java异常机制 Java把异常当做对象来处理,并定义一个基类java.lang.Throwable作为所有异常的超类.Java中的异常分为两大类:错误Error和异常Exception,Java异常体系结构如下图所示: 图片来源:http://blog.csdn.net/wuwenxiang91322/article/details/10346337 2.Throwable Throwable类是所有异常或错误的超类,它有两个子类:Error和Exception,分别表示错误和异常.其中异

运行时异常和非运行是异常

Throwable是所有Java程序中错误处理的父类,有两种资类:Error和Exception. Error:表示由JVM所侦测到的无法预期的错误,由于这是属于JVM层次的严重错误,导致JVM无法继续执行,因此,这是不可捕捉到的,无法采取任何恢复的操作,顶多只能显示错误信息. Exception:表示可恢复的例外,这是可捕捉到的. Java提供了两类主要的异常:runtime exception和checked exception.checked 异常也就是我们经常遇到的IO异常,以及SQL异