线程池异常处理之重启线程处理任务

线程池异常处理之重启线程处理任务

本文记录一下在使用线程池过程中,如何处理 while(true)循环长期运行的任务,在业务处理逻辑中,如果抛出了运行时异常时怎样重新提交任务。

这种情形在Kafka消费者中遇到,当为每个Consumer开启一个线程时, 在线程的run方法中会有while(true)循环中消费Topic数据。

本文会借助Google Guava包中的com.google.common.util.concurrent.ThreadFactoryBuilder类创建线程工厂,因为它能不仅很方便地为线程池设置一个易读的名称,而且很方便地设置线程执行过程中出现异常时 用来处理异常的 异常处理器,示例如下:

 MyExceptionHandler exceptionHandler = new MyExceptionHandler();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
                .setUncaughtExceptionHandler(exceptionHandler).build();

当线程执行过程中出现了异常,MyExceptionHandler#uncaughtException(...)方法就会由JVM调用。在java.lang.ThreadGroup#uncaughtException方法注释提到:由于每个线程都隶属于某个线程组,如果该线程所属的线程组有父线程组,则调用父线程组中指定的异常处理器;若没有父线程组,则判断 有没有 为线程自定义 异常处理器,而在本文中,定义了自己的异常处理器:MyExceptionHandler,因此线程执行异常时就会调用MyExceptionHandler#uncaughtException(...)

创建好了线程工厂,接下来就是创建线程池了。

CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);

CustomThreadPoolExecutor 继承ThreadPoolExecutor扩展线程池的功能:若线程执行某任务失败时 需要重新提交该任务,可以重写CustomThreadPoolExecutor#afterExecute方法,在该方法中实现提交任务。

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        //若线程执行某任务失败了,重新提交该任务
        if (t != null) {
            Runnable task =  r;
            System.out.println("restart task...");
            execute(task);
        }
    }
}

如果在new ThreadPoolExecutor时未传入 ThreadFactory参数,如下:

BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue);

其实是调用Executors.defaultThreadFactory()创建默认的ThreadFactory:

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

它为每个创建的线程设置了名字:"pool-xxx-thread-xxx"。而采用默认的ThreadFactory时相应的默认的异常处理器执行逻辑是由java.lang.ThreadGroup#uncaughtException方法来处理的,其中处理异常的相关源码如下:

else if (!(e instanceof ThreadDeath)) {
                System.err.print("Exception in thread \""
                                 + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }

如果线程执行过程中抛出的错误 不是 ThreadDeath对象,那么只是简单地:打印线程名称,并将堆栈信息记录到控制台中,任务结束。如果是一个ThreadDeath对象,看ThreadDeath类的源码注释可知:异常处理器不会被调用,程序不会输出任何日志信息。(有木有碰到这种情况,线程池中的线程不知不觉地消失了……)

The ThreadGroup#uncaughtException top-level error handler does not print out a message if ThreadDeath is never caught.

在本文的示例程序CustomThreadPoolExecutorTest.java中,为了模拟在while(true)循环中抛出异常,定义一个 Boolean 变量 stop 使得线程执行一段时间抛出一个异常:也即先让test线程运行一段时间,然后主线程设置 stop 变量的值,使得test线程抛出运行时异常。(完整代码可参考文末)

if (stop) {
    throw new RuntimeException("running encounter exception");
 }

线程池提交 while(true)循环任务:

        threadPoolExecutor.execute(()->{
            //提交的是一个while(true)任务,正常运行时这类任务不会结束
            while (true) {
                System.out.println("start processing");
                try {
                    //模拟任务每次执行耗时1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //ignore
                }
                System.out.println("finish processing");

                if (stop) {
                    throw new RuntimeException("running encounter exception");
                }
            }
        });

threadPoolExecutor.execute提交了一个任务,这会耗费一个线程来执行该任务,由于任务是个while(true)循环,正常情况下该任务不会终止。换句话说,这个任务会"永久"占用线程池中的一个线程。因此,对于while(true)循环的任务需要注意:

创建线程池new ThreadPoolExecutor(...)时,指定的 corePoolSize 不能小于 需要提交的任务个数,否则有些任务不能立即启动,线程池需要增加线程(最大增加到maximumPoolSize 个线程)来处理。如果 maximumPoolSize 小于需要提交的任务个数,由于每个任务永久地占用一个线程执行,那么有些任务就只能一直堆积在taskQueue 任务队列中了

而在本示例中,main 线程通过设置 stop 变量让 test 线程抛出异常,自定义的异常处理器MyExceptionHandler就会处理该异常,并且在该任务执行“完成”后,JVM会调用线程池的afterExecute(...)方法,又重新提交该任务。

总结

这篇文章总结了本人在使用JAVA线程池中的一些理解,写代码以线程池方式提交任务,程序跑一段时间,没有数据输出了,好像暂停了,看堆栈信息线程莫名其妙地消失了,或者阻塞在任务队列上拿不到Task了……因此需要明白线程池底层执行的机制。

  1. 在实现Kafka消费者过程中,每个消费者一个线程,使用线程池来管理线程、提交任务。但总过一段时间后Kafka Broker Rebalance,看后台日志是Kafka Consumer在解析一些消息时抛出了运行时异常。这样线程池就结束了这个任务,由于没有重写afterExecute()方法 当任务出现异常时重新提交任务。因此,这意味着永久丢失了一个消费者线程。而少了一个消费者,Kafka就发生了Rebalance。
  2. 尽量使用线程池来管理线程,而不是自己 new Thread(),一方面是采用线程池可方便地为每个线程设置合理的名称,这样便于debug。另一方面,通过 implements Thread.UncaughtExceptionHandler自定义线程运行时异常处理器,可方便地打印出线程异常日志。
  3. 可继承ThreadPoolExecutor扩展线程池功能,比如在任务执行完成后,执行一些额外的操作。关于如何扩展线程池,ElasticSearch源码中线程池模块很值得借鉴。
  4. 上文中提到的异常处理器 和 向线程池提交任务的拒绝策略RejectedExecutionHandler是两回事。另外,为了图方便,直接在main方法中创建线程池了,实际应用中肯定不能这样。这里给出的代码只是Examples。

最后给出一个思考问题:针对需要长期运行的任务,比如每隔一段时间从Redis中读取若干条数据。是提交一个Runnable任务,这个Runnable任务里是个while(true)循环读取数据:

        executor.execute(()->{
            while (true) {
                //读若干条数据
                read();
                sleep(1000);
            }
        });

还是:在一个外部while循环中,不断地向 taskQueue 任务队列中提交任务呢?

        while (true) {
            executor.execute(()->{
                read();
            });
            sleep(1000);
        }

CustomThreadPoolExecutorTest.java 完整代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutorTest {
    private static volatile boolean stop = false;
    public static void main(String[] args)throws InterruptedException {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
        //定义 线程执行过程中出现异常时的 异常处理器
        MyExceptionHandler exceptionHandler = new MyExceptionHandler();
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
                .setUncaughtExceptionHandler(exceptionHandler).build();
        CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);

        threadPoolExecutor.execute(()->{
            //提交的是一个while(true)任务,正常运行时这类任务不会结束
            while (true) {
                System.out.println("start processing");
                try {
                    //模拟任务每次执行耗时1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //ignore
                }
                System.out.println("finish processing");

                if (stop) {
                    throw new RuntimeException("running encounter exception");
                }
            }
        });

        Thread.sleep(2000);
        //模拟 test- 线程 在执行任务过程中抛出异常
        stop = true;
        Thread.sleep(1000);
        stop = false;
    }

    private static class MyExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            System.out.println(String.format("thread name %s, msg %s", t.getName(), e.getMessage()));
        }
    }
}

ThreadPoolExecutorTest.java 测试线程在执行过程中抛出ThreadDeath对象:

import java.util.concurrent.*;
public class ThreadPoolExecutorTest {
    private static volatile boolean stop = false;
    public static void main(String[] args) throws InterruptedException{
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, taskQueue);
        executor.execute(()->{
            while (true) {
                System.out.println("start processing");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //ignore
                }
                System.out.println("finish processing");
                if (stop) {
                    throw new ThreadDeath();
//                    throw new RuntimeException("runtime exception");
                }
            }
        });
        Thread.sleep(3000);
        stop = true;
        Thread.sleep(2000);

        executor.execute(()->{
            //能够继续提交任务执行
            System.out.println("continue submit runnable task,is All thread in thread pool dead?");
        });
    }
}

参考资料:

原文:https://www.cnblogs.com/hapjin/p/10240863.html

原文地址:https://www.cnblogs.com/hapjin/p/10240863.html

时间: 2024-07-31 22:55:38

线程池异常处理之重启线程处理任务的相关文章

C#如何判断线程池中所有的线程是否已经完成之Demo

1 start: 2 3 System.Threading.RegisteredWaitHandle rhw = null; 4 new Action(() => 5 { 6 for (var i = 0; i < 30; i++) { 7 new Action<int>((index) => 8 { 9 System.Threading.Thread.Sleep(1000); 10 Console.WriteLine(System.Threading.Thread.Curr

java多线程系类:JUC线程池:05之线程池原理(四)(转)

概要 本章介绍线程池的拒绝策略.内容包括:拒绝策略介绍拒绝策略对比和示例 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3512947.html 拒绝策略介绍 线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施.当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭.第二,任务数量超过线程池的最大限制. 线程池共包括4种拒绝策略,它们分别是:AbortPolicy, CallerRunsPolicy, DiscardOld

C#如何判断线程池中所有的线程是否已经完成(转)

其 实很简单用ThreadPool.RegisterWaitForSingleObject方法注册一个定时检查线程池的方法,在检查线程的方法内调用 ThreadPool.GetAvailableThreads与ThreadPool.GetMaxThreads并比较两个方法返回的值是不是相等, 相等表示线池内所有的线程已经完成. //每秒检次一次线程池的状态 RegisteredWaitHandle rhw = ThreadPool.RegisterWaitForSingleObject(Auto

java多线程系类:JUC线程池:03之线程池原理(二)(转)

概要 在前面一章"Java多线程系列--"JUC线程池"02之 线程池原理(一)"中介绍了线程池的数据结构,本章会通过分析线程池的源码,对线程池进行说明.内容包括:线程池示例参考代码(基于JDK1.7.0_40)线程池源码分析(一) 创建"线程池"(二) 添加任务到"线程池"(三) 关闭"线程池" 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509954.h

使用线程池而不是创建线程

在我们开发程序时,若存在耗性能.高并发处理的任务时,我们会想到用多线程来处理.在多线程处理中,有手工创建线程与线程池2种处理方式,手工创建线程存在管理与维护的繁琐..Net线程池能够帮我们完成线程资源的管理工作,使用我们专注业务处理,而不是代码的细微实现.在你创建了过多的任务,线程池也能用列队把无法即使处理的请求保存起来,直至有线程释放出来. 当应用程序开始执行重复的后台任务,且并不需要经常与这些任务交互时,使用.Net线程池管理这些资源将会让性能更佳.我们可以使用ThreadPool.Queu

Android性能优化之线程池策略和对线程池的了解

线程的运行机制 1. 开启线程过多,会消耗cpu 2. 单核cpu,同一时刻只能处理一个线程,多核cpu同一时刻可以处理多个线程 3. 操作系统为每个运行线程安排一定的CPU时间----`时间片`,系统通过一种循环的方式为线程提供时间片,线程在自己的时间内运行,因为时间相当短,多个线程频繁地发生切换,因此给用户的感觉就是好像多个线程同时运行一样,但是如果计算机有多个CPU,线程就能真正意义上的同时运行了. 线程池的作用 1. 线程池是预先创建线程的一种技术.线程池在还没有任务到来之前,创建一定数

线程池2_定长线程池

package com.chauvet.utils.threadPool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /*** * * 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待. * * 定长线程池的大小最好根据系统资源进行设置. * 如:Runtime.getRuntime().availableProcessors() (java虚拟机可用的

Thread 线程池中可用的线程数量

GetAvaliableThread(out workerThreadCount,out iocompletedThreadCount)   函数居然一次返回了两个变量.线程池里的线程按照公用被分成了两大类:工作线程和IO线程,或者IO完成线程,前者用于执行普通的操作,后者专用于异步IO,比如文件和网络请求,注意,分类并不说明两种线程本身有差别,线程就是线程,是一种执行单元,从本质上来讲都是一样的,线程池这样分类 Thread 线程池中可用的线程数量

线程池;java实现线程池原理

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙.如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程完成后才启动. 组成部分 1.线程池