线程池没你想的那么简单(续)

前言

前段时间写过一篇《线程池没你想的那么简单》,和大家一起撸了一个基本的线程池,具备:

  • 线程池基本调度功能。
  • 线程池自动扩容缩容。
  • 队列缓存线程。
  • 关闭线程池。

这些功能,最后也留下了三个待实现的 features

  • 执行带有返回值的线程。
  • 异常处理怎么办?
  • 所有任务执行完怎么通知我?

这次就实现这三个特性来看看 j.u.c 中的线程池是如何实现这些需求的。

再看本文之前,强烈建议先查看上文《线程池没你想的那么简单》

任务完成后的通知

大家在用线程池的时候或多或少都会有这样的需求:

线程池中的任务执行完毕后再通知主线程做其他事情,比如一批任务都执行完毕后再执行下一波任务等等。

以我们之前的代码为例:

总共往线程池中提交了 13 个任务,直到他们都执行完毕后再打印 “任务执行完毕” 这个日志。

执行结果如下:

为了简单的达到这个效果,我们可以在初始化线程池的时候传入一个接口的实现,这个接口就是用于任务完成之后的回调。

public interface Notify {

    /**
     * 回调
     */
    void notifyListen() ;
}

以上就是线程池的构造函数以及接口的定义。

所以想要实现这个功能的关键是在何时回调这个接口?

仔细想想其实也简单:只要我们记录提交到线程池中的任务及完成的数量,他们两者的差为 0 时就认为线程池中的任务已执行完毕;这时便可回调这个接口。

所以在往线程池中写入任务时我们需要记录任务数量:

为了并发安全的考虑,这里的计数器采用了原子的 AtomicInteger



而在任务执行完毕后就将计数器 -1 ,一旦为 0 时则任务任务全部执行完毕;这时便可回调我们自定义的接口完成通知。


JDK 的实现

这样的需求在 jdk 中的 ThreadPoolExecutor 中也有相关的 API ,只是用法不太一样,但本质原理都大同小异。

我们使用 ThreadPoolExecutor 的常规关闭流程如下:

    executorService.shutdown();
    while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
        logger.info("thread running");
    }

线程提交完毕后执行 shutdown() 关闭线程池,接着循环调用 awaitTermination() 方法,一旦任务全部执行完毕后则会返回 true 从而退出循环。

这两个方法的目的和原理如下:

  • 执行 shutdown() 后会将线程池的状态置为关闭状态,这时将会停止接收新的任务同时会等待队列中的任务全部执行完毕后才真正关闭线程池。
  • awaitTermination 会阻塞直到线程池所有任务执行完毕或者超时时间已到。

为什么要两个 api 结合一起使用呢?

主要还在最终的目的是:所有线程执行完毕后再做某件事情,也就是在线程执行完毕之前其实主线程是需要被阻塞的。

shutdown() 执行后并不会阻塞,会立即返回,所有才需要后续用循环不停的调用 awaitTermination(),因为这个 api 才会阻塞线程。

其实我们查看源码会发现,ThreadPoolExecutor 中的阻塞依然也是等待通知机制的运用,只不过用的是 LockSupportAPI 而已。

带有返回值的线程

接下来是带有返回值的线程,这个需求也非常常见;比如需要线程异步计算某些数据然后得到结果最终汇总使用。

先来看看如何使用(和 jdk 的类似):

首先任务是不能实现 Runnable 接口了,毕竟他的 run() 函数是没有返回值的;所以我们改实现一个 Callable 的接口:

这个接口有一个返回值。

同时在提交任务时也稍作改动:

首先是执行任务的函数由 execute() 换为了 submit(),同时他会返回一个返回值 Future,通过它便可拿到线程执行的结果。

最后通过第二步将所有执行结果打印出来:

实现原理

再看具体实现之前先来思考下这样的功能如何实现?

  • 首先受限于 jdk 的线程 api 的规范,要执行一个线程不管是实现接口还是继承类,最终都是执行的 run() 函数。
  • 所以我们想要一个线程有返回值无非只能是在执行 run() 函数时去调用一个有返回值的方法,再将这个返回值存放起来用于后续使用。

比如我们这里新建了一个 Callable<T> 的接口:

public interface Callable<T> {

    /**
     * 执行任务
     * @return 执行结果
     */
    T call() ;
}

它的 call 函数就是刚才提到的有返回值的方法,所以我们应当在线程的 run() 函数中去调用它。

接着还会有一个 Future 的接口,他的主要作用是获取线程的返回值,也就是 再将这个返回值存放起来用于后续使用 这里提到的后续使用

既然有了接口那自然就得有它的实现 FutureTask,它实现了 Future 接口用于后续获取返回值。

同时实现了 Runnable 接口会把自己变为一个线程。

所以在它的 run() 函数中会调用刚才提到的具有返回值的 call() 函数。



再次结合 submit() 提交任务和 get() 获取返回值的源码来看会更加理解这其中的门道。

    /**
     * 有返回值
     *
     * @param callable
     * @param <T>
     * @return
     */
    public <T> Future<T> submit(Callable<T> callable) {
        FutureTask<T> future = new FutureTask(callable);
        execute(future);
        return future;
    }

submit() 非常简单,将我们丢进来的 Callable 对象转换为一个 FutureTask 对象,然后再调用之前的 execute() 来丢进线程池(后续的流程就和一个普通的线程进入线程池的流程一样)。

FutureTask 本身也是线程,所以可以直接使用 execute() 函数。



future.get() 函数中 future 对象由于在 submit() 中返回的真正对象是 FutureTask,所以我们直接看其中的源码就好。

由于 get() 在线程没有返回之前是一个阻塞函数,最终也是通过 notify.wait() 使线程进入阻塞状态来实现的。

而使其从 wait() 中返回的条件必然是在线程执行完毕拿到返回值的时候才进行唤醒。

也就是图中的第二部分;一旦线程执行完毕(callable.call())就会唤醒 notify 对象,这样 get 方法也就能返回了。



同样的道理,ThreadPoolExecutor 中的原理也是类似,只不过它考虑的细节更多所以看起来很复杂,但精简代码后核心也就是这些。

甚至最终使用的 api 看起来都是类似的:

异常处理

最后一个是一些新手使用线程池很容易踩坑的一个地方:那就是异常处理。

比如类似于这样的场景:

创建了只有一个线程的线程池,这个线程只做一件事,就是一直不停的 while 循环。

但是循环的过程中不小心抛出了一个异常,巧的是这个异常又没有被捕获。你觉得后续会发生什么事情呢?

是线程继续运行?还是线程池会退出?

通过现象来看其实哪种都不是,线程既没有继续运行同时线程池也没有退出,会一直卡在这里。

当我们 dump 线程快照会发现:

这时线程池中还有一个线程在运行,通过线程名称会发现这是新创建的一个线程(之前是Thread-0,现在是 Thread-1)。

它的线程状态为 WAITING ,通过堆栈发现是卡在了 CustomThreadPool.java:272 处。

就是卡在了从队列里获取任务的地方,由于此时的任务队列是空的,所以他会一直阻塞在这里。

看到这里,之前关注的朋友有没有似曾相识的感觉。

没错,我之前写过两篇:

线程池相关的问题,当时的讨论也非常“激烈”,其实最终的原因和这里是一模一样的。

所以就这次简版的代码来看看其中的问题:

现在又简化了一版代码我觉得之前还有疑问的朋友这次应该会更加明白。

其实在线程池内部会对线程的运行捕获异常,但它并不会处理,只是用于标记是否执行成功;

一旦执行失败则会回收掉当前异常的线程,然后重新创建一个新的 Worker 线程继续从队列里取任务然后执行

所以最终才会卡在从队列中取任务处。

其实 ThreadPoolExecutor 的异常处理也是类似的,具体的源码就不多分析了,在上面两篇文章中已经说过几次。

所以我们在使用线程池时,其中的任务一定要做好异常处理。

总结

这一波下来我觉得线程池搞清楚没啥问题了,总的来看它内部运用了非常多的多线程解决方案,比如:

  • ReentrantLock 重入锁来保证线程写入的并发安全。
  • 利用等待通知机制来实现线程间通信(线程执行结果、等待线程池执行完毕等)。

最后也学会了:

  • 标准的线程池关闭流程。
  • 如何使用有返回值的线程。
  • 线程异常捕获的重要性。

最后本文所有源码(结合其中的测试代码使用):

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/CustomThreadPool.java

你的点赞与分享是对我最大的支持

原文地址:https://www.cnblogs.com/crossoverJie/p/10982880.html

时间: 2024-11-05 19:36:42

线程池没你想的那么简单(续)的相关文章

这么说吧,java线程池的实现原理其实很简单

好处 : 线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配.调优和监控,有以下好处: 1.降低资源消耗: 2.提高响应速度: 3.提高线程的可管理性. Java1.5中引入的Executor框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行.被哪个线程执行,以及什么时候执行. demo 1.Executors.newFixedThreadPool(10)初始化一个包含10个线程的线程池ex

从列表到详情,没你想的那么简单

前言 本文先假设我们使用的是 vue + vuex + vue-router 的情况来展开讨论,React 全家桶的情况应该类似. 在日常的前端研发中,我们经常会遇到如题的场景:比如从商品列表进入商品详情,从订单列表进入订单详情.先看一个 demo~ 看起来是不是还算丝滑流畅,跟客户端效果较为接近~ 正文开始 很多同学应该会说,这不是很容易么,用 vue-router + transition 就好啦. <template> <transition name="custom-c

Linux多线程实践(9) --简单线程池的设计与实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

JAVA中线程池的简单使用

比如现在有10个线程,但每次只想运行3个线程,当这3个线程中的任何一个运行完后,第4个线程接着补上.这种情况可以使用线程池来解决,线程池用起来也相当的简单,不信,你看: package com.demo; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public

深入理解Java之线程池

原作者:海子 出处:http://www.cnblogs.com/dolphin0520/ 本文归作者海子和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利. 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可

Java并发编程:线程池的使用

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

线程池和异步线程

目录: 1 什么是CLR线程池? 2 简单介绍下线程池各个优点的实现细节 3 线程池ThreadPool的常用方法介绍 4 简单理解下异步线程 5 异步线程的工作过程和几个重要的元素 6 有必要简单介绍下Classic Async Pattern 和Event-based Async Pattern 7 异步线程的发展趋势以及.net4.5异步的简化 8 本章示例 自定义一个简单的线程池 Asp.net异步IHttpAsyncHandler示例 9 本章总结 1 什么是CLR线程池? 在上一章中

[转]深入理解Java之线程池

出处:http://www.cnblogs.com/dolphin0520/ 本文归作者海子和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利. 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool