深度解析Java线程池的异常处理机制

引言
在Java中,使用线程池来异步执行一些耗时任务是非常常见的操作。最初我们一般都是直接使用new Thread().start的方式,但我们知道,线程的创建和销毁都会耗费大量的资源
前言
今天小伙伴遇到个小问题,线程池提交的任务如果没有catch异常,那么会抛到哪里去,之前倒是没研究过,本着实事求是的原则,看了一下代码。

正文
小问题
考虑下面这段代码,有什么区别呢?你可以猜猜会不会有异常打出呢?如果打出来的话是在哪里?:

    ExecutorService threadPool = Executors.newFixedThreadPool(1);
    threadPool.submit(() -> {
        Object obj = null;
        System.out.println(obj.toString());
    });
    threadPool.execute(() -> {
        Object obj = null;
        System.out.println(obj.toString());
    });

源码解析
我们下面就来看下代码, 其实就是将我们提交过去的Runnable包装成一个Future

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // volatile修饰,保证多线程下的可见性,可以看看Java内存模型
}
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

接下来就会实际提交到队列中交给线程池调度处理:

/**
* 代码还是很清爽的,一个很典型的生产者/消费者模型,
* 这里暂不纠结这些细节,那么如果提交到workQueue成功的话,消费者是谁呢?
* 明显在这个newWorker里搞的鬼,同样细节有兴趣可以自己再去研究,这里我们会发现
* 核心就是Worker这个内部类
*/
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

那么接下来看看线程池核心的流程:

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/* Delegates main run loop to outer runWorker /
public void run() {
runWorker(this);
}
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask()方法会尝试从队列中抓取数据
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//可覆写此方法打日志埋点之类的
beforeExecute(wt, task);
Throwable thrown = null;
try {
//简单明了,直接调用run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
submit的方式
那么我们可以这里是直接调用的run方法,先看submit的方式,我们知道最终传递过去的是一个FutureTask,也就是说会调用这里的run方法,我们看看实现:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //。。。
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
      //省略
}

  protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t; //赋给了这个变量
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

可以看到其实类似于直接吞掉了,这样的话我们调用get()方法的时候会拿到, 比如我们可以重写afterExecute方法,从而可以得到实际的异常:

protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
//get这里会首先检查任务的状态,然后将上面的异常包装成ExecutionException
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null){
//异常处理
t.printStackTrace();
}
}
execute的方式
那么如果是直接exeture的方式有啥不同呢?这样的话传递过去的就直接是Runnable,因此就会直接抛出:

try {
    task.run();
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    afterExecute(task, thrown);
}

那么这里的异常到底会抛出到哪里呢, 我们看看JVM具体是怎么处理的:

if (!destroy_vm || JDK_Version::is_jdk12x_version()) {
// JSR-166: change call from from ThreadGroup.uncaughtException to
// java.lang.Thread.dispatchUncaughtException
if (uncaught_exception.not_null()) {
//如果有未捕获的异常
Handle group(this, java_lang_Thread::threadGroup(threadObj()));
{
KlassHandle recvrKlass(THREAD, threadObj->klass());
CallInfo callinfo;
KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass());
/
这里类似一个方法表,实际就会去调用Thread#dispatchUncaughtException方法
template(dispatchUncaughtException_name, "dispatchUncaughtException")
/
LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass,
vmSymbols::dispatchUncaughtException_name(),
vmSymbols::throwable_void_signature(),
KlassHandle(), false, false, THREAD);
CLEAR_PENDING_EXCEPTION;
methodHandle method = callinfo.selected_method();
if (method.not_null()) {
JavaValue result(T_VOID);
JavaCalls::call_virtual(&result,
threadObj, thread_klass,
vmSymbols::dispatchUncaughtException_name(),
vmSymbols::throwable_void_signature(),
uncaught_exception,
THREAD);
} else {
KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass());
JavaValue result(T_VOID);
JavaCalls::call_virtual(&result,
group, thread_group,
vmSymbols::uncaughtException_name(),
vmSymbols::thread_throwable_void_signature(),
threadObj, // Arg 1
uncaught_exception, // Arg 2
THREAD);
}
if (HAS_PENDING_EXCEPTION) {
ResourceMark rm(this);
jio_fprintf(defaultStream::error_stream(),
"\nException: %s thrown from the UncaughtExceptionHandler"
" in thread \"%s\"\n",
pending_exception()->klass()->external_name(),
get_thread_name());
CLEAR_PENDING_EXCEPTION;
}
}
}
可以看到这里最终会去调用Thread#dispatchUncaughtException方法:

private void dispatchUncaughtException(Throwable e) {
    //默认会调用ThreadGroup的实现
    getUncaughtExceptionHandler().uncaughtException(this, e);
}

?

public void uncaughtException(Thread t, Throwable e) {
    if (parent != null) {
        parent.uncaughtException(t, e);
    } else {
        Thread.UncaughtExceptionHandler ueh =
            Thread.getDefaultUncaughtExceptionHandler();
        if (ueh != null) {
            ueh.uncaughtException(t, e);
        } else if (!(e instanceof ThreadDeath)) {
            //可以看到会打到System.err里面
            System.err.print("Exception in thread \""
                             + t.getName() + "\" ");
            e.printStackTrace(System.err);
        }
    }
}

这里如果环境是tomcat的话最终会打到catalina.out:

总结
对于线程池、包括线程的异常处理推荐一下方式:

1 直接try/catch,个人 基本都是用这种方式

2 线程直接重写整个方法:

   Thread t = new Thread();
   t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

       public void uncaughtException(Thread t, Throwable e) {
          LOGGER.error(t + " throws exception: " + e);
       }
    });
    //如果是线程池的模式:
    ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
        Thread t = new Thread(r);
        t.setUncaughtExceptionHandler(
            (t1, e) -> LOGGER.error(t1 + " throws exception: " + e));
        return t;
    });

3 也可以直接重写protected void afterExecute(Runnable r, Throwable t) { }方法

原文地址:http://blog.51cto.com/13952955/2286196

时间: 2024-08-30 14:15:06

深度解析Java线程池的异常处理机制的相关文章

深度解析Java线程池的异常处理机制 #3

前言 今天小伙伴遇到个小问题,线程池提交的任务如果没有catch异常,那么会抛到哪里去,之前倒是没研究过,本着实事求是的原则,看了一下代码. 正文 小问题 考虑下面这段代码,有什么区别呢?你可以猜猜会不会有异常打出呢?如果打出来的话是在哪里?: ExecutorService threadPool = Executors.newFixedThreadPool(1); threadPool.submit(() -> { Object obj = null; System.out.println(o

Java源码解析 - ThreadPoolExecutor 线程池

1 线程池的好处 线程使应用能够更加充分合理地协调利用CPU.内存.网络.I/O等系统资源.线程的创建需要开辟虚拟机栈.本地方法栈.程序计数器等线程私有的内存空间;在线程销毁时需要回收这些系统资源.频繁地创建和销毁线程会浪费大量的系统资源,增加并发编程风险. 在服务器负载过大的时候,如何让新的线程等待或者友好地拒绝服务? 这些都是线程自身无法解决的;所以需要通过线程池协调多个线程,并实现类似主次线程隔离.定时执行.周期执行等任务. 线程池的作用包括:●利用线程池管理并复用线程.控制最大并发数等●

java线程池与五种常用线程池策略使用与解析

背景:面试中会要求对5中线程池作分析.所以要熟知线程池的运行细节,如CachedThreadPool会引发oom吗? java线程池与五种常用线程池策略使用与解析 可选择的阻塞队列BlockingQueue详解 首先看一下新任务进入时线程池的执行策略: 如果运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队.(如果当前运行的线程小于corePoolSize,则任务根本不会存入queue中,而是直接运行) 如果运行的线程大于等于 corePoolSize

java线程池和五种常用线程池的策略使用与解析

java线程池和五种常用线程池策略使用与解析 一.线程池 关于为什么要使用线程池久不赘述了,首先看一下java中作为线程池Executor底层实现类的ThredPoolExecutor的构造函数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory th

含源码解析,深入Java 线程池原理

从池化技术到底层实现,一篇文章带你贯通线程池技术. 1.池化技术简介 在系统开发过程中,我们经常会用到池化技术来减少系统消耗,提升系统性能. 在编程领域,比较典型的池化技术有: 线程池.连接池.内存池.对象池等. 对象池通过复用对象来减少创建对象.垃圾回收的开销:连接池(数据库连接池.Redis连接池和HTTP连接池等)通过复用TCP连接来减少创建和释放连接的时间.线程池通过复用线程提升性能.简单来说,池化技术就是通过复用来提升性能. 线程.内存.数据库的连接对象都是资源,在程序中,当你创建一个

Java线程池Executors.newFixedThreadPool原理解析

从事Java多线程开发的程序员来说,了解Java的线程池实现原理是必不可少的,以下将会结合Java线程池代码来说明它的实现原理,首先,我们要思考: 线程池的表现形式 线程池里面的线程什么时候创建 线程池里面的线程什么时候结束或者该不该结束 线程池的实现原理 说道Java线程池就不得不说ExecutorService接口和Executors类了,从源码上来看Executors类里面封装了线程池的创建,并且定义了各自不同的线程池类型,本文着重讲Executors这个类的newFixedThreadP

面试题-关于Java线程池一篇文章就够了

在Java面试中,线程池相关知识,虽不能说是必问提,但出现的频次也是非常高的.同时又鉴于公众号"程序新视界"的读者后台留言让写一篇关于Java线程池的文章,于是就有本篇内容,本篇将基于Java线程池的原理.实现以及相关源码进行讲解等. 什么是线程池 线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理. 为了充分利用CPU多核资源,应用都会采用多线程并行/并发计算,最大限度的利用多核提升应用程序性能. 试想一下,如果每个请求都执行一遍创建线程.执行任务.

Java线程池的底层实现与使用

前言 在我们进行开发的时候,为了充分利用系统资源,我们通常会进行多线程开发,实现起来非常简单,需要使用线程的时候就去创建一个线程(继承Thread类.实现Runnable接口.使用Callable和Future),但是这样也有一点问题,就是如果并发的线程数量很多,创建线程.销毁线程都是需要消耗时间.资源,这个时候线程池就派上用场了 一.四种线程池的介绍 Java通过Executors提供了四种线程池,分别是 1.newSingleThreadExecutor() 创建一个单线程化的线程池,它只会

Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析

目录 引出线程池 Executor框架 ThreadPoolExecutor详解 构造函数 重要的变量 线程池执行流程 任务队列workQueue 任务拒绝策略 线程池的关闭 ThreadPoolExecutor创建线程池实例 参考: 引出线程池 线程是并发编程的基础,前面的文章里,我们的实例基本都是基于线程开发作为实例,并且都是使用的时候就创建一个线程.这种方式比较简单,但是存在一个问题,那就是线程的数量问题. 假设有一个系统比较复杂,需要的线程数很多,如果都是采用这种方式来创建线程的话,那么