Java高并发之线程池详解

线程池优势

在业务场景中, 如果一个对象创建销毁开销比较大, 那么此时建议池化对象进行管理.

例如线程, jdbc连接等等, 在高并发场景中, 如果可以复用之前销毁的对象, 那么系统效率将大大提升.

另外一个好处是可以设定池化对象的上限, 例如预防创建线程数量过多导致系统崩溃的场景.

jdk中的线程池

下文主要从以下几个角度讲解:

  • 创建线程池
  • 提交任务
  • 潜在宕机风险
  • 线程池大小配置
  • 自定义阻塞队列BlockingQueue
  • 回调接口
  • 自定义拒绝策略
  • 自定义ThreadFactory
  • 关闭线程池

创建线程池

我们可以通过自定义ThreadPoolExecutor或者jdk内置的Executors来创建一系列的线程池

  • newFixedThreadPool: 创建固定线程数量的线程池
  • newSingleThreadExecutor: 创建单一线程的池
  • newCachedThreadPool: 创建线程数量自动扩容, 自动销毁的线程池
  • newScheduledThreadPool: 创建支持计划任务的线程池

上述几种都是通过new ThreadPoolExecutor()来实现的, 构造函数源码如下:

 1     /**
 2      * @param corePoolSize 池内核心线程数量, 超出数量的线程会进入阻塞队列
 3      * @param maximumPoolSize 最大可创建线程数量
 4      * @param keepAliveTime 线程存活时间
 5      * @param unit 存活时间的单位
 6      * @param workQueue 线程溢出后的阻塞队列
 7      */
 8     public ThreadPoolExecutor(int corePoolSize,
 9                               int maximumPoolSize,
10                               long keepAliveTime,
11                               TimeUnit unit,
12                               BlockingQueue<Runnable> workQueue) {
13         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
14     }
15
16     public static ExecutorService newFixedThreadPool(int nThreads) {
17         return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
18     }
19
20     public static ExecutorService newSingleThreadExecutor() {
21         return new Executors.FinalizableDelegatedExecutorService
22                 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
23     }
24
25     public static ExecutorService newCachedThreadPool() {
26         return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
27     }
28
29     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
30         return new ScheduledThreadPoolExecutor(corePoolSize);
31     }
32
33     public ScheduledThreadPoolExecutor(int corePoolSize) {
34         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
35     }

提交任务

直接调用executorService.execute(runnable)或者submit(runnable)即可,

execute和submit的区别在于submit会返回Future来获取任何执行的结果.

我们看下newScheduledThreadPool的使用示例.

 1 public class SchedulePoolDemo {
 2
 3     public static void main(String[] args){
 4         ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
 5         // 如果前面的任务没有完成, 调度也不会启动
 6         service.scheduleAtFixedRate(new Runnable() {
 7             @Override
 8             public void run() {
 9                 try {
10                     Thread.sleep(2000);
11                     // 每两秒打印一次.
12                     System.out.println(System.currentTimeMillis()/1000);
13                 } catch (InterruptedException e) {
14                     e.printStackTrace();
15                 }
16             }
17         }, 0, 2, TimeUnit.SECONDS);
18     }
19 }

潜在宕机风险

使用Executors来创建要注意潜在宕机风险.其返回的线程池对象的弊端如下:

  • FixedThreadPool和SingleThreadPoolPool : 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM.
  • CachedThreadPool和ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM.

综上所述, 在可能有大量请求的线程池场景中, 更推荐自定义ThreadPoolExecutor来创建线程池, 具体构造函数配置见下文.

线程池大小配置

一般根据任务类型进行区分, 假设CPU为N核

  • CPU密集型任务需要减少线程数量, 降低线程之间切换造成的开销, 可配置线程池大小为N + 1.
  • IO密集型任务则可以加大线程数量, 可配置线程池大小为 N * 2.
  • 混合型任务则可以拆分为CPU密集型与IO密集型, 独立配置.

自定义阻塞队列BlockingQueue

主要存放等待执行的线程, ThreadPoolExecutor中支持自定义该队列来实现不同的排队队列.

  • ArrayBlockingQueue:先进先出队列,创建时指定大小, 有界;
  • LinkedBlockingQueue:使用链表实现的先进先出队列,默认大小为Integer.MAX_VALUE;
  • SynchronousQueue:不保存提交的任务, 数据也不会缓存到队列中, 用于生产者和消费者互等对方, 一起离开.
  • PriorityBlockingQueue: 支持优先级的队列

回调接口

线程池提供了一些回调方法, 具体使用如下所示.

 1         ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
 2
 3             @Override
 4             protected void beforeExecute(Thread t, Runnable r) {
 5                 System.out.println("准备执行任务: " + r.toString());
 6             }
 7
 8             @Override
 9             protected void afterExecute(Runnable r, Throwable t) {
10                 System.out.println("结束任务: " + r.toString());
11             }
12
13             @Override
14             protected void terminated() {
15                 System.out.println("线程池退出");
16             }
17         };

可以在回调接口中, 对线程池的状态进行监控, 例如任务执行的最长时间, 平均时间, 最短时间等等, 还有一些其他的属性如下:

  • taskCount:线程池需要执行的任务数量.
  • completedTaskCount:线程池在运行过程中已完成的任务数量.小于或等于taskCount.
  • largestPoolSize:线程池曾经创建过的最大线程数量.通过这个数据可以知道线程池是否满过.如等于线程池的最大大小,则表示线程池曾经满了.
  • getPoolSize:线程池的线程数量.如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减.
  • getActiveCount:获取活动的线程数.

自定义拒绝策略

线程池满负荷运转后, 因为时间空间的问题, 可能需要拒绝掉部分任务的执行.

jdk提供了RejectedExecutionHandler接口, 并内置了几种线程拒绝策略

  • AbortPolicy: 直接拒绝策略, 抛出异常.
  • CallerRunsPolicy: 调用者自己执行任务策略.
  • DiscardOldestPolicy: 舍弃最老的未执行任务策略.

使用方式也很简单, 直接传参给ThreadPool

1         ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
2                 new SynchronousQueue<Runnable>(),
3                 Executors.defaultThreadFactory(),
4                 new RejectedExecutionHandler() {
5                     @Override
6                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
7                         System.out.println("reject task: " + r.toString());
8                     }
9                 });

自定义ThreadFactory

线程工厂用于创建池里的线程. 例如在工厂中都给线程setDaemon(true), 这样程序退出的时候, 线程自动退出.

或者统一指定线程优先级, 设置名称等等.

 1 class NamedThreadFactory implements ThreadFactory {
 2     private static final AtomicInteger threadIndex = new AtomicInteger(0);
 3     private final String baseName;
 4     private final boolean daemon;
 5
 6     public NamedThreadFactory(String baseName) {
 7         this(baseName, true);
 8     }
 9
10     public NamedThreadFactory(String baseName, boolean daemon) {
11         this.baseName = baseName;
12         this.daemon = daemon;
13     }
14
15     public Thread newThread(Runnable runnable) {
16         Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement());
17         thread.setDaemon(this.daemon);
18         return thread;
19     }
20 }

关闭线程池

跟直接new Thread不一样, 局部变量的线程池, 需要手动关闭, 不然会导致线程泄漏问题.

默认提供两种方式关闭线程池.

  • shutdown: 等所有任务, 包括阻塞队列中的执行完, 才会终止, 但是不会接受新任务.
  • shutdownNow: 立即终止线程池, 打断正在执行的任务, 清空队列.

原文地址:https://www.cnblogs.com/xdecode/p/9119794.html

时间: 2024-11-08 17:15:05

Java高并发之线程池详解的相关文章

Java线程池详解(二)

一.前言 在总结了线程池的一些原理及实现细节之后,产出了一篇文章:Java线程池详解(一),后面的(一)是在本文出现之后加上的,而本文就成了(二).因为在写完第一篇关于java线程池的文章之后,越发觉得还有太多内容需要补充,每次都是修修补补,总觉得还缺点什么.在第一篇中,我着重描述了java线程池的原理以及它的实现,主要的点在于它是如何工作的.而本文的内容将更为上层,重点在于如何应用java线程池,算是对第一篇文章的一点补充,这样对于java线程池的学习和总结稍微完整一些. 使用过java线程池

Java性能分析之线程栈详解(下)

Java性能分析之线程栈详解(下) 转载自:微信公众号"测试那点事儿" 结合jstack结果对线程状态详解 上篇文章详细介绍了线程栈的作用.状态.任何查看理解,本篇文章结合jstack工具来查看线程状态,并列出重点关注目标.Jstack是常用的排查工具,它能输出在某一个时间,Java进程中所有线程的状态,很多时候这些状态信息能给我们的排查工作带来有用的线索. Jstack的输出中,Java线程状态主要是以下几种: 1.BLOCKED 线程在等待monitor锁(synchronized

Android线程池详解

直接使用线程 在Android开发的时候,当我们需要完成一个耗时操作的时候,通常会新建一个子线程出来,例如如下代码 new Thread(new Runnable() { @Override public void run() { //耗时代码 } }).start(); 这种方式的线程随处可见,但是这种方式的写法是存在一定问题的,我们知道,在操作系统中,线程是操作系统调度的最小单元,同时线程又不能无限制的产生,并且线程的创建和销毁都会有资源的开销,同时当线程频繁的创建或者销毁的时候,还会让GC

Android(线程二) 线程池详解

我们在ListView中需要下载资源时,赞不考虑缓存机制,那么每一个Item可能都需要开启一个线程去下载资源(如果没有线程池),如果Item很多,那么我们可能就会无限制的一直创建新的线程去执行下载任务,最终结果可能导致,应用卡顿.手机反应迟钝!最坏的结果是,用户直接卸载掉该App.所以,我们在实际开发中需要考虑多线程,多线程就离不开线程池.如果你对线程还不了解,可以看看这篇文章,Android(线程一) 线程. 使用线程池的优点: (1).重用线程,避免线程的创建和销毁带来的性能开销: (2).

四大线程池详解(转载)

new Thread 的弊端 首先看一段代码: public class ThreadTest { public static void main(String[] args) { while (true) { new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread()); } }).start(); } } } Thread[Thread-0,5,main

java线程池详解一

1.为什么要用线程池技术 诸如Web服务器.数据库服务器.文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务.请求以某种方式到达服务器,这种方式可能是通过网络协议(例如 HTTP.FTP 或 POP).通过 JMS 队列或者可能通过轮询数据库.不管请求如何到达,服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的. 构建服务器应用程序的一个过于简单的模型应该是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务.实际上,对于

java - jdk线程池详解

线程池参数详解 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 参数 说明 corePoolSize 表示常驻核心线程数量. maximumPoolS

Java自定义线程池详解

自定义线程池的核心:ThreadPoolExecutor 为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效的进行线程控制,其中在java.util.concurrent包下,是JDK并发包的核心,比如我们熟知的Executors.Executors扮演着线程工厂的角色,我们通过它可以创建特定功能的线程池,而这些线程池背后的就是:ThreadPoolExecutor.那么下面我们来具体分析下它. 构造ThreadPoolExecutor public ThreadP

【java线程系列】java线程系列之java线程池详解

一线程池的概念及为何需要线程池: 我们知道当我们自己创建一个线程时如果该线程执行完任务后就进入死亡状态,这样如果我们需要在次使用一个线程时得重新创建一个线程,但是线程的创建是要付出一定的代价的,如果在我们的程序中需要频繁使用线程,且每个线程执行的时间很短,短到几乎小于线程创建及销毁的时间那么代价将会更大,如:服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的.显然如果频繁的创建销毁线程效率将非常低. 那么我们能否让一个线程可以复用,即当一个线程执行完后不销毁该线程,而