应用线程池ThreadPoolExecutor

线程池的大小

在配置和调整应用线程池的时候,首先考虑的是线程池的大小。

线程池的合理大小取决于未来提交的任务类型和所部署系统的特征。定制线程池的时候需要避免线程池的长度“过大”或者“过小”这两种极端情况。

线程池过大:那么线程对稀缺的CPU和内存资源的竞争,会导致内存高使用量,还可能耗尽资源。

线程池过小:由于存在很多可用的处理器资源还未工作,会对吞吐量造成损失。

精密的计算出线程池的确切大小是很困难的,一般我们会估算出一个合理的线程池大小。

对于计算密集型任务,一个具有N个处理核心的系统,可以使用N+1个线程的线程池。(这样当某个线程因为错误暂停,刚好有一个线程补上)

对于包含了I/O和其他阻塞操作的任务系统中,不是所有的线程都会在所有的时间被调度,因此就需要一个更大的线程池。你还需估算出任务花在等待的时间与用来计算的时间的比率。

Java中可以通过下面的这句代码获得CPU中处理核心的数量

int CPU_Num = Runtime.getRuntime().availableProcessors();

当然处理核心并不是唯一影响线程池大小的因素,如果线程池中的每个线程都要使用到池化资源(比如数据库连接池),那么数据库连接池的大小在指定线程池大小的时候也必须考虑进去。

配置ThreadPoolExecutor

通常我们会使用工具类Executors中的相关方法(如:newCachedThreadPool、newFixedThreadPool、newScheduledThreadPool)去构建一个线程池,通过查看Executors类的相关源码,我们可以发现,这些方法都去实例化了一个ThreadPoolExectuor对象,只是传入构造方法的参数不一样。ThreadPoolExecutor是继承与抽象类AbstractExecutorService,是ExecutorService的一个实现,其中包含了多个重载的构造方法,用于去构造不同的线程池。

从ThreadPoolExecutor的构造方法中,我们能够得到构造一个线程池需要的一些参数。

  • corePoolSize:线程池中保存的核心线程的数量,即使这些核心线程是空转的。
  • maximumPoolSize:最大线程池的大小
  • keeAliveTime:当线程数大于核心线程数的时候,空闲线程存活的最大时间。
  • TimeUtil:keepAliveTime的时间单位
  • workQueue:执行前用于保存任务的队列(等候队列)。
  • handler:当任务超出最大线程池大小和等候队列的长度的时候,对后继到达任务的处理策略,也被叫做饱和策略。

其中核心线程数(corePoolSize)、最大线程数(maximumPoolSize)和存活时间(keepAliveTime)共同管理着线程的创建和销毁。

当一个ThreadPoolExecutor被初始创建后,所有的核心线程并非立即开始,而是要等到有任务提交的时候,除非你调用prestartAllCoreThreads。

当提交的任务数达到coolPoolSize大小后,之后提交的任务会被保存到workQueue中,而不是创建新的线程去执行它们。当workQueue充满后,就会去创建新的线程,但是总的线程数量不会大于maximumPoolSize。

当前线程数量大于corePoolSize的时候,如果空闲线程等待的时间超过了keepAliveTime那么这个空闲线程就会被销毁,当然如果当前线程数量没有超过corePoolSize,那么这个keepAliveTime是不起作用的。

通过调节核心大小和存活时间,可以促进线程池归还空闲线程占有的资源,让这些资源用于其他有用的工作,当然你必须权衡整个系统任务到来的数量和频率,因为频繁的创建和销毁线程会导致更大的开销。

1、查看Executors是如何通过newFixedThreadPool构建固定大小的线程池的

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

可以看到是直接去创建一个ThreadPoolExecutor实例,其中FixedThreadPool的核心线程池大小同最大线程池大小是一样的,keepAliveTime也被设置为0,也就是永远不会超时,使用LinkedBlockingQueue作为它的等候队列,它是一个无限的队列,如果当前的线程大于nThread,那么后续到达的任务都会保存在LinkedBlockingQueue,直到有可用的线程。如果到达的速度过快而又得不到线程处理,那么可能造成等候队列膨胀,导致内存耗尽。

2、查看Executors是如何通过newCachedThreadPool构建缓存线程池的

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

CachedThreadPool的核心线程池大小为0,最大线程池大小是Integer.MAX_VALUE,也就是无限大,存活的时间60秒,使用的SynchronousQueue作为工作队列,这个队列其实不是一个真正意义上的队列,因为它没有内部空间存放元素,而是一种管理直接在线程之间移交信息的机制,为了把一个任务放入到SynchronousQueue中,必须有另外一个线程要从Synchronous中取任务,这样我们每提交一个任务到CachedThreadPool的时候,ThreadPoolExecutor就会创建一个新的线程去取这个提交的任务来执行。

通过观察上面的两个线程池创建,我们也可以利用ThreadPoolExecutor去创建一个定制的线程池,不过还需要注意以下一些问题。

  • 工作队列(workQueue)选型
  • 饱和策略(handler)
  • 线程工厂的定制(ThreadFactory)

一、工作队列的选型:在前面的两个例子中,可以看到FixedThreadPool选用LinkedBlockingQueue作为工作队列,而CachedThreadPool选用SynchronousQueue作为工作队列,不同类型的工作队列会带来不一样的线程池特性。

使用线程池而不是“每任务每线程”(thread-per-task),一方面是为了方便线程的管理,另一方面是无限制创建线程锁带来的性能问题。在使用线程池的时候,如果新请求到达的频率超过了线程池能够处理它们的速度,请求将在等候队列中等待,但是如果请求速度过快,仍然存在耗尽资源的风险。

ThreadPoolExecutor允许你提供一个BlockingQueue来保存等待执行的任务。选用的队列有以下3中类型

  • 无限队列
  • 有限队列
  • 同步移交队列

其中BlockingQueue接口的实现如下图:

比如newFixedThreadPool和newSingleThreaExecutor就是选用了无限的LinkedBlockingQueue,允许等候任务数量无限制的增长,这在一定程度上不是安全的。一个稳妥的资源管理策略是使用有限队列,比如ArrayBlockingQueue或有限的LinkedBlockingQueue或者PriorityBlockingQueue,来防止任务过快增长,耗尽资源。但是使用有界队列又带来了新的问题,当到达界值的时候,仍然有新的任务源源不断的到来,该怎么办?这就是饱和策略处理的问题。还有一种同步移交队列,比如上述CacheThreadPool中使用的SynchronousQueue,是使用在一个特别庞大的或者无限的线程池之上,这相当于完全绕开了队列,将任务直接交给线程执行,这样做往往具有更好的效率。

使用LinkedBlockingQueue或者ArrayBlockingQueue这种FIFO队列会造成任务以它们到达的顺序执行,这是一种公平的执行策略。如果想控制任务执行的顺序,可以使用优先级阻塞队列PriorityBlockingQueue。

注意:只有当任务彼此独立的时候,才能使有限线程或有限队列的使用合理。倘若任务之间相互依赖,有限线程池或者有限队列会引起线程饥饿死锁。比如,线程池中的线程a执行一个依赖的任务1时,需要任务2的执行结果,而任务2因为没有执行线程正在等候队列中等待,这样正在执行的线程等待等候队列的任务,而等候队列中的任务又等待正在执行的线程结束。使用一个无限线程池可以避免这类问题。

二、饱和策略:任务饱和状态下,对后继任务的处理策略。

当一个有界队列充满后(线程池和等候队列都充满),饱和策略开始发挥作用。我们可以在通过ThreadPoolExecutor构建线程池的时候,将饱和策略传进去,也就是上面所说的参数handler,它是RejectedExecutionHandler类型的,JDK类库中提供了RejectedExecutionHandler接口的几种饱和策略的实现,这些实现类都是作为ThreadPoolExecutor的静态内部类存在的,有以下几种:

  • AbortPolicy:直接抛出RejectedExecutionExecption
  • CallerRunsPolicy:让调用者执行任务
  • DisCardPolicy:直接丢弃后继的任务
  • DiscardOldestPolicy:会丢弃最老的那个任务

默认的“abort”策略会抛出未检查的异常,供调用者捕获后编写自己的处理逻辑。“discard”策略会默默地直接丢弃到达的任务。“discardOldest”策略选择性的丢弃任务,丢弃最老(等候时间最长)的任务,也就是本该接下来执行的任务,然后尝试去重新提交任务。“caller-runs”策略既不会丢弃任务,也不会抛出异常,它会把任务推回到调用者那里,以减轻负担,它不会在线程池中执行最新提交的任务。

在一个无限队列中,我们也可以使用阻塞的方法控制任务向线程池提交的速率,比如,可以根据线程池大小指定一个信号量,来实现控制任务注入率。

补充:关于“caller-runs”策略,如在主线程中不断的向线程池提交任务,当达到饱和时,就会把新提交的任务推回到主线程中去执行,这样主线程就会去执行提交任务,而不在向线程池提交新任务,线程池也就有时间去处理等候任务,而且在一个WebService程序中,主线程在处理推回的提交任务的时候,是不会再接受新的web请求,这样新的请求就无法到达应用程序,在TCP层等候,由TCP层协议去决定后继任务的处理策略,这样一来就把负荷逐渐由应用线程池到主线程到TCP层外移,使得服务器在高负载的情况下可以平缓的劣化。

三、线程工厂的定制: 线程池创建新的线程是通过线程工厂去创建的,ThreadFactory接口只有一个唯一的方法:newThread,用于创建一个新的线程,默认的线程工厂会创建一个新的、非后台的线程。你可以通过实现ThreadFactory接口去实现自定义的线程工厂。

扩展ThreadPoolExecutor

决定ThreadPoolExecutor是可以扩展的,它提供了一些未实现的钩子方法让子类去实现,有beforeExecute、afterExecute、和terminate。执行任务的线程会调用这些方法,用它们去添加日志、时序、监视器或者统计信息的收集。就好像是通过AOP去实现一些切面逻辑一样,ThreadPoolExecutor让我们自己去实现已经提供的这些切面。

下面的这段代码显示了一个定制的线程池,它通过使用beforeExecute、afterExecute、terminated方法,加入了线程池执行任务过程中的日志和统计收集功能。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

public class TimingThreadPool extends ThreadPoolExecutor{

	private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
	private final Logger log = Logger.getLogger("TimingThreadPool");
	private final AtomicLong numTasks = new AtomicLong();
	private final AtomicLong totalTime = new AtomicLong();

	public TimingThreadPool(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
		// TODO Auto-generated constructor stub
	}

	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		// TODO Auto-generated method stub
		super.beforeExecute(t, r);
		String str = String.format("Thread %s: start %s", t, r);
//		System.out.println(str);
		log.fine(str);
		startTime.set(System.nanoTime());
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		// TODO Auto-generated method stub
		try {
			long endTime = System.nanoTime();
			long taskTime = endTime - startTime.get();
			numTasks.incrementAndGet();
			totalTime.addAndGet(taskTime);
			String str = String.format("Thread %s : end %s, time=%dns", t, r, taskTime);
//			System.out.println(str);
			log.fine(str);
		} finally {
			super.afterExecute(r, t);
		}
	}

	@Override
	protected void terminated() {
		// TODO Auto-generated method stub
		try {
			String str = String.format("Terminated : avg time = %dns", totalTime.get() / numTasks.get());
//			System.out.println(str);
			log.info(str);
		}finally {
			super.terminated();
		}
	}

	public Logger getLog() {
		return log;
	}

}

这样线程池执行过程中的日志信息就会被加入到该线程池的Logger中。

时间: 2024-08-09 02:09:40

应用线程池ThreadPoolExecutor的相关文章

java线程API学习 线程池ThreadPoolExecutor(转)

线程池ThreadPoolExecutor继承自ExecutorService.是jdk1.5加入的新特性,将提交执行的任务在内部线程池中的可用线程中执行. 构造函数 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, Rejected

《Java源码分析》:线程池 ThreadPoolExecutor

<Java源码分析>:线程池 ThreadPoolExecutor ThreadPoolExecutor是ExecutorService的一张实现,但是是间接实现. ThreadPoolExecutor是继承AbstractExecutorService.而AbstractExecutorService实现了ExecutorService接口. 在介绍细节的之前,先介绍下ThreadPoolExecutor的结构 1.线程池需要支持多个线程并发执行,因此有一个线程集合Collection来执行

线程池ThreadPoolExecutor使用简介(转)

一.简介 线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) corePoolSize: 线程池维护线程的最少数量 maximumPool

[转载]线程池ThreadPoolExecutor使用简介

一.简介 线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) corePoolSize: 线程池维护线程的最少数量

线程池(ThreadPoolExecutor JDK1.7)

平常我们经常都会使用到线程池,但是有没考虑过为什么需要使用线程池呢?下面我列举一下问题,大家可以思考一下 1.当前服务器的硬件环境是多少核的CPU,它和线程的关系又是什么? 2.jvm能创建多少个线程? 3.多线程主要解决什么问题? 4.你使用线程池的目的是什么? 以上几个问题都是帮助你更好的使用java的线程(还可以衍生更多的小问题,如:jvm维护线程的消耗,cpu调度线程的消耗,应该使用多少个线程才能最大化利用多核CPU..).答案需要自己去百度,我这也讲不好,反而误导大家. 线程池顾名思义

Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

相关文章目录: Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理 Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理 以下是本文的目录大纲: 一.shutdown()  --  温柔的终止线程池 interruptIdleWorkers()  --  中断空闲worker tryTerminate()  --  尝试终止线程池 二.shutdown

java 线程池 ---- ThreadPoolExecutor 类

执行流程 1, 创建线程池后, 默认不会创建线程, 等到有任务带来才创建线程, 即一个线程处理一个任务 2, 当线程数量达到核心线程数时, 任务放进队列, 如果放入队列失败, 创建新线程处理任务(此时线程池线程数大于核心线程数) 3, 如果线程数大于最大线程数, 执行拒绝策略处理任务 构造方法 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, Block

java面试总躲不过的并发(一): 线程池ThreadPoolExecutor基础梳理

本文核心:线程池ThreadPoolExecutor基础梳理 一.实现多线程的方式 1.继承Thread类,重写其run方法 2.实现Runnable接口,实现run方法 3.实现Callable接口,实现call方法 由于Java的设计,只支持单继承,但是支持多实现形式,所以一般面向接口开发,Runnable接口与Callable接口的区别在于Callable接口中的call方法是带返回值的,其返回一个Future的异步类,我们可以通过Future的get方法获取结果,如果线程还没有执行完,g

如何获取线程池ThreadPoolExecutor正在运行的线程

如何获取线程池ThreadPoolExecutor正在运行的线程?这里有两种方法,如下代码: package com.itbac.thread; import java.util.HashSet; import java.util.Set; import java.util.concurrent.*; import java.util.stream.Stream; /** * ThreadPoolExecutor 的 beforeExecute() 和 afterExecute()方法, * 不

并发编程之线程池ThreadPoolExecutor

前言 在我们平时自己写线程的测试demo时,一般都是用new Thread的方式来创建线程.但是,我们知道创建线程对象,就会在内存中开辟空间,而线程中的任务执行完毕之后,就会销毁. 单个线程的话还好,如果线程的并发数量上来之后,就会频繁的创建和销毁对象.这样,势必会消耗大量的系统资源,进而影响执行效率. 所以,线程池就应运而生. 线程池ThreadPoolExecutor 可以通过idea先看下线程池的类图,了解一下它的继承关系和大概结构. 它继承自AbstractExecutorService