线程池的使用
8.1 在任务与执行策略之间的隐性耦合
虽然Executor框架为制定和修改执行策略提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略。有些类型的任务需要明确地执行执行策略,包括:
1、 依赖性任务:当线程池中运行任务都是独立的时,我们可以随意地修改池的长度与配置,这不会影响到性能以外的任何事情。但如果你提交到线程池中的任务依赖于其他的任务,这就会隐式地给执行策略带来了约束。
2、 非安全性任务:如果即使一个任务有线程安全性问题,只要它在单线程的环境下运行是不会有问题,如将它提交到Executors.newSingleThreadExecutor,这是安全的。但是一旦提交到线程池时,就会失去线程安全。
3、 对响应时间敏感的任务:将一个长时间运行的任务提交到单线程化的Executor中,或者将多个长时间运行的任务提交给一个只包含少量线程的线程池中,会削弱由Executor管理的服务响应性。
4、 使用ThreadLocal的任务: Executor会随意重用池中的线程。标准的Executor实现是:在空闲时会回收线程,在忙时会增加新的线程,如果任务抛出异常,就会用一个全新的工作者线程取代出错的那个线程。只有当线程本地变量的生命周期被限制在某个任务线程中时,在池的某线程中使用ThreadLocal变量才有意义;不应在线程池的线程中使用ThreadLocal变量在线程内传递值。
只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“阻塞”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。幸运的是,在基于网络的典型服务器应用程序中——网页服务器、邮件服务器以及文件服务器,它们的请求通常都是同类型的并且相互独立的。
在一些任务中,需要拥有或排除某种特定的执行策略。如果某些任务依赖于其他任务,那么会要求线程池足够大,从而确保它们依赖任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。通过将这些需求写入文档,将来的代码维护人员不会由于使用了某种不合适的执行策略而破坏安全性或活跃性。、
8.1.1 线程饥饿死锁
在线程池中如果一个任务依赖于其他任务的执行,就可能产生死锁。对于一个单线程化的Executor,如果一个任务中将另一个任务提交到相同的Executor中,并等待新提交的任务的结果,这总会引发死锁。第二个任务停留在工作队列中,直到第一个任务完成,但是第一个任务不会完成,因数它在等待第二个任务的完成。在一个大的线程池中,如果所有正在执行任务的线程因等待同一工作队列中的其他任务,也要能会发生同样的问题,这被称作线程饥饿死锁,满足以下叙述就会发生:只要线程池中的任务需要无限地等待一些必须由池中其他任务才能提供的资源或条件,例如某个任务等待另一个任务返回值或执行结果,那么除非线程池无限大,都则将发生线程 饥饿死锁。
每当提交了一个有依赖性的Executor任务时,要清楚地知道可能会出现线程“饥饿”死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或配置限制。
除了线程池大小上的限制外,还可能由于其他资源上的约束而存在一些隐式限制。如果应用程序使用一个包含10个连接的JDBC连接池,并且每个任务需要一个数据库连接,那么线程池就好像只有10个线程,因为超过10个任务时,新的任务需要等待其他任务释放连接。
8.1.2 运行时间较长的任务
如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。执行时间较长的任务不仅会造成线程池阻塞,甚至还会增加执行时间较短任务的服务时间。如果线程池中的线程的数量远小于在稳定状态下执行时间较长任务的数量,那么到最后可能所有的线程都会运行这些执行时间较长的任务,从而影响整体的响应性。
有一项技术可以缓解执行时间较长任务造成的影响,即限定任务等待资源的时间,而不要无限地等待。在平台类的大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等。如果等待超时,那么可以吧任务标示为失败,然后中止任务或者将任务重新返回队列中以便随后执行。这样,无论任务的最终结果是否成功,这种方法都能确保任务总能继续执行下去,并将线程释放出来以执行一些能更快完成的任务。如果在线程池中总是充满了被阻塞的任务,那么也可能表示线程池的规模过小。
8.2 设置线程池的大小
线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.avaliableProcessors来动态计算。
要设置线程池的大小并不困难,只需要避免“过大”和“过小”这两种极端情况。如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐量。
要想正确地计算线程池的大小,必须分析计算环境、资源预算和任务的特性。在部署的系统中有多少个CPU?多大得内存?任务是计算密集型、I/O密集型哈市二者皆可?它们是否需要像JDBC连接这样的稀缺资源?如果需要执行不同类型的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。
对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+1时,通常能实现最优的利用率。(即使当计算密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保CPU的时钟周期不会被浪费。)对于包含I/O操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。要正确地设置线程池的大小,你必须估算出任务的等待时间与计算时间的比值。这种时间不需要很精确,并且可以通过一些分析或监控工具来获得。你还可以通过另一种方法来调节线程池的大小:在某个基准负载下,分别设置不同大小的线程池来运行应用程序,并观察CPU利用率的水平。
给定下列定义:
N(cpu)=CPU的数量=Runtime.getRuntime().availableProcessors();
U(cpu)= 期望CPU的使用率,0<=U(cpu)<=1
W/C=等待时间与运行时间的比率
为保持CPU达到期望的使用率,最优的池的大小等于:
N(threads)=N(cpu)*U(cpu)*(1+W/C)
CPU周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。计算这些资源对线程池的约束条件是更容易的:计算每个任务对该资源的需求量,然后用该资源的可用总量除以每个任务的需求量,所得结果就是线程池大小的上限。
当任务需要某种通过资源池来管理的资源时,例如数据库连接,那么线程池和资源池的大小将会互相影响。如果每个任务都需要一个数据库连接,那么连接池的大小就限制了线程池的大小。同样,当线程池中的任务是数据库连接的唯一使用者时,那么线程池的大小又将限制连接池的大小。
8.3 配置ThreadPoolExecutor
ThreadPoolExecutor为一些Exectors提供了基本的实现,这些线程是由Executors中的工厂newCachedThreadPool、newFixedThreadPool、newScheduledThreadExecutor返回的。
如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor的构造函数来实例化一个对象,并根据自己的需求来定制,并且可以参考Executors的源代码来了解默认配置下的执行策略,然后再以这些执行策略为基础进行修改。ThreadPoolExecutor定义了很多构造函数,以下是参数最完整的构造器:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
8.3.1 线程的创建与销毁
线程池的基本大小corePoolSize(池中所保留的线程数量,即使是空闲线程)、最大大小maximumPoolSize(池中允许最大的线程数量)、存活时间keepAliveTime(当线程数大于corePoolSize时,允许多余的空闲线程所等待新任务的最长时间,如果超过个时间将会被停止掉)等因素共同负责线程的创建与销毁。
基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小(在创建ThreadPoolExecutor初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用prestartAllCoreThreads),并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
【开发人员以免有时会将线程池的基本大小设置为零,从而最终销毁工作者线程以免阻碍JVM的退出。然而,如果在线程池中没有使用SynchronousQueue作为其工作队列(例如在newCachedThreadPool中就是如此,它的核心池设为0,但它的任务队列使用的是SynchronousQueue),那么这种方式将产生一些奇怪的行为。如果线程池中的线程数量等于线程池的基本大小,那么仅当在工作队列已满的情况下ThreadPoolExecutor才会创建新的线程。因此,如果线程池的基本大小为零并且其工作队列有一定的容量,那么当把任务提交给该线程池时,只有当线程池的工作队列被填满后,才会开始执行任务,而这种行为通常不是我们所希望的。在Java6中,可以通过allowCoreThreadTimeOut来使线程池中的所有线程超时。对于一个大小有限的线程池并且在该线程池中包含了一个工作队列,如果希望和这个线程池在没有任务的情况下能销毁所有的线程,那么可以启用这个特性并将基本大小设置为零。】
可以通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。(显然,这是一种折衷:回收空闲线程会产生额外的延迟,因为当需求增加时,必须创建新的线程来满足需求。)
newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。newCachedThreadPool工厂方法将线程池的最大大小设置为Integer.MAX_VALUE,而将基本大小设置为零,并将超时时间设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。其他形式的线程池可以通过显式地ThreadPoolExecutor构造函数来构造。
8.3.2 管理队列任务
在有限的线程池中会限制可并发执行的任务数量。
如果无限制地创建线程,那么将导致不稳定,并通过采用固定大小的线程池(而不是每收到一个请求就创建一个新线程)来解决这个问题。然而,这个方案并不完整。在高负载情况下,应用程序仍可能耗尽资源,只是出现问题的概率较小。如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将累计起来。在线程池中,这些请求会在一个由Executor管理的Runnable队列中等待,而不会像线程那样去竞争CPU资源。通过一个Runnable和一个链表节点来表现一个等待中的任务,当然比使用线程来表示的开销低很多,但如果客户提交给服务器请求的速率超过了服务器的处理速率,那么仍可能会耗尽资源。
即使请求的平均到达速率很稳定,也仍然会出现请求突增的情况。尽管队列有助于缓解任务的突增问题,但如果任务持续高速地到来,那么最终还是会抑制请求的到达率以避免耗尽内存。甚至在耗尽内存之前,响应性也将随着任务队列的增长而变得越来越糟。
ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法由3种:无界队列、有界队列和同步移交。队列的选择与其他的配置参数有关,例如线程池的大小等。
newFixedThreadPool和newSingleThreadPool在默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。
一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(有许多饱和策略可以解决这个问题。)在使用有界的工作队列时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但付出的代价是可能会限制吞吐量。
对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。只有当线程池是无界的活着可以拒绝任务时,SynchronousQueue才有实际价值。在newCachedThreadPool工厂方法中就使用了SynchronousQueue。
当使用像LinkedBlockingQueue或ArrayBlockingQueue这样的FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序,还可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。任务的优先级是通过自然顺序或Comparator(如果任务实现了Comparable)来定义的。
对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,就像在接受网络客户请求的服务器应用和程序中,如果不进行限制,那么很容易发生过载问题。
只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有节的线程池或队列就可能导致线程“饥饿”死锁问题。此时应该使用无界的线程池,例如newCachedThreadPool。(如果一个任务提交另一任务并等待它的结果,对于这种情况,还有一种可选的配置策略:使用一个受限的线程池,工作队列选用SynchronousQueue,饱和策略选择“调用者运行”策略)。
8.3.3 饱和策略
当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可以通过调用setRectedExecutionHandler来修改。(如果某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略。)JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含有不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。
“中止(Abort)“策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。当新提交的任务无法保存到队列中等待执行时”,“抛弃”策略会悄悄抛弃该任务。“抛弃最旧的”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧的”饱和策略和优先队列放在一起使用。)
“调用者运行”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。
我们可以将WebServer示例修改为使用有界队列和“调用者运行”饱和策略,当线程池中的所有线程都被占用,并且工作队列被填满后,下一个任务会在调用execute时在主线程中运行。由于执行任务需要一定的时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理完正在执行的任务。在这期间,主线程不会调用accept,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中。如果持续过载,那么TCP层将最终发现它的请求队列被填满,因此同样会开始抛弃请求。当服务器过载时,这种过载情况会逐渐向外蔓延开来——从线程池到工作队列到应用程序再到TCP层,最终到达客户端,导致服务器在高负载下实现一种平缓的性能降低。
当创建Executor时,可以选择饱和策略或者对饱和策略进行修改,如下给出了创建一个固定大小的线程池,同时使用“调用者运行”饱和策略。
ThreadPoolExecutor executor
= new ThreadPoolExecutor(N_THREADS, N_THREADS,
0L, TimeUnit.MILLISENCONDS,
new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
当工作队列满后,没有预定义的饱和策略来阻塞execute。然而,使用Semaphore(信号量)来限制任务的到达率,就可以实现这个功能,如下给出了这种方法。该方法使用了一个无界队列(因为不能限制队列的大小和任务的到达率),并设置信号量的上界设置为线程池的大小加上可排队任务的数量,这是因为信号量需要控制正在执行的和等待执行的任务数量。
@ThreadSafe
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
8.3.4 线程工厂
每当线程池需要创建一个线程时,都是通过工厂方法来完成的。这个工厂接口如下:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。在ThreadFactory中只定义了一个方法newThread,每当线程池需要创建一个新线程时都会调用这个方法。
在许多情况下都需要使用定制的线程工厂方法。例如,你希望为线程池中的线程指定一个UncaghtExceptionHandler,或者实例化一个定制的Thread类用于执行调试信息的记录。你还可能希望修改线程的优先级或者守护状态(通常不是一个好主意)。或者你只是希望给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。
由工厂创建出来的线程为工作者线程,它是以ThreadPoolExecutor.Worker为基础来创建的,我们所提交的任务最后由Worker来执行,而且每个Worker对象反过来又持有线程工厂创建出来的工作线程,具体实现:由ThreadFactory.newThread(Runnable runnable)来创建一个工作线程,参数runnable就是ThreadPoolExecutor.Worker工作者,而Worker是一个实现了Runnable的类,它的run方法实现是在循环中不停的从队列中取任务然后执行任务,下面从提交一个任务所涉及到的方法,ExecutorService.submit(Runnabel)方法会调用execute方法,以下是ThreadPoolExecutor中的方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如果提交任务时,线程池已满,且线程池处于运行状态时,则将任务存放到任务队列中稍后运行任务
if (runState == RUNNING && workQueue.offer(command)) {
//如果线程现在的状态不为运行(虽然外层条件语句已经要求是运行状态,但这里没有使用同步,所以程序运行到这里状态可能已经被修改了,所以需重新判断),或者线程池中没有工作线程了时
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
elseif (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
private void ensureQueuedTaskHandled(Runnable command) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try {
int state = runState;
//如果在线程池处于非运行状态的情况下将任务存入了任务队列的情况,直接拒绝(由于ThreadPoolExecutor这些提交任务的方法未使用同步,只是做了简单判断,所以当程序运行到这里时,该任务很有可能已经在execute方法里就已存入任务队列了:workQueue.offer(command),但入队前基他线程已修改线程池状态)
if (state != RUNNING && workQueue.remove(command))
reject = true;
//如果线程池不是停止或终止状态,线程池未满或空,且任务队列不为空时,则重新创建一个工作线程去执行还未执行完的任务
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
} finally {
mainLock.unlock();
}
if (reject)
reject(command);
elseif (t != null)
t.start();
}
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//如果线程池未满,且是运行状态时,新创建一个工作线程
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
returnfalse;
t.start();
returntrue;
}
private Thread addThread(Runnable firstTask) {
//Worker实现了Runnable接口,且持有任务
Worker w = new Worker(firstTask);
//基于Worker创建一个新的工作线程
Thread t = threadFactory.newThread(w);
if (t != null) {
//Worker又反过来持有工作线程
w.thread = t;
//将工作者(线程)放入池中(HashSet<Worker>)
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
下面是ThreadPoolExecutor的内部类Worker的相关方法:
/**
* Main run loop
*/
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
//不停的从任务队列中获取任务并执行
while (task != null || (task = getTask()) != null) {
runTask(task);//执行任务,直接调用任务接口(Ruunable,如果是Callable,则在执行前会转换成FutureTask接口)的run方法
task = null;
}
} finally {
workerDone(this);
}
}
//为正在运行的工作线程从任务队列中获取下一个任务
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
returnnull;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();//从队列中阻塞获取任务
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();//中断所有其他工作线程
returnnull;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
下面自己定义一个线程工厂,它创建了一个新的MyAppThread实例,并将一个特定于线程池的名字传递给MyAppThread的构造函数,从而可以在线程转储日志信息中区分来自不同线程池的线程。在应用程序中的其他地方也可以使用MyAppThread,以便所有线程都能使用它的调试功能。
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;//线程池名称
}
public Thread newThread(Runnable runnable) {//参数为工作者Worker
return new MyAppThread(runnable, poolName);
}
}
在MyAppThread中还可以定制其他行为,包括:为线程制定名字,设置自定义UncaghtExceptionHandler向Logger中写入信息,维护一些统计信息(包括由多少个线程被创建和销毁),以及在线程被创建或者终止时把调试信息写入日志。
public class MyAppThread extends Thread {//扩展Thread,该线程将成为池中的工作线程
public static final String DEFAULT_NAME = "MyAppThread";//线程池默认名称
private static volatile boolean debugLifecycle = false;//是否需要记录日志
private static final AtomicInteger created = new AtomicInteger();//已创建线程数
private static final AtomicInteger alive = new AtomicInteger();//正在运行的线程数
private static final Logger log = Logger.getAnonymousLogger();//匿名日志记录器
public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); }
public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(//未捕获异常处理器
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE,//如果出现异常记录异常日志
"UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run() {
// 从主存中复制debug,确保它的值在这个方法里是一致的
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created "+getName());
try {
alive.incrementAndGet();//正在运行的线程数量加1
super.run();//调用ThreadPoolExecutor.Worker的run方法
} finally {
alive.decrementAndGet();//运行完后减1
if (debug) log.log(Level.FINE, "Exiting "+getName());
}
}
public static int getThreadsCreated() { return created.get(); }
public static int getThreadsAlive() { return alive.get(); }
public static boolean getDebug() { return debugLifecycle; }
public static void setDebug(boolean b) { debugLifecycle = b; }
}
如果在应用和程序中需要利用安全策略来控制对某些特殊代码库的访问权限,那么可以通过Executor中的privilegedThreadFactory工厂来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader。如果不使用privilegedThreadFactory,线程池创建的线程将从在需要新线程时调用execute或submit的客户端代码中继承访问权限,从而导致令人困惑的安全性异常。
8.3.5 在调用构造函数后再定制ThreadPoolExecutor
在调用完ThreadPoolExecutor的构造函数后,仍然可以通过设置函数(Setter)来修改大多数传递给它的构造函数的参数(例如线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器)。如果Executor是通过Executors中的某个(newSingleThreadExecutor除外)工厂方法创建的,那么可以将结果的类型转换为ThreadPoolExecutor以访问设置器,如下所示:
ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor)
((ThreadPoolExecutor) exec).setCorePoolSize(10);
else
throw new AssertionError("Oops, bad assumption");
在Executors中包含一个unconfigurableExecutorService工厂方法,该方法对一个现有的ExecutorService进行包装,使其只暴露出ExecutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方式封装的ExecutorService,而不是最初的ThreadPoolExecutor。虽然单线程的Executor实际上被实现为一个只包含唯一线程的线程池,但它同样确保了不会并发地执行任务。如果在代码中增加单线程Executor的线程池大小,那么将破坏它的执行语义。
你可以在自己的Executor中使用这项技术防止执行策略被修改。如果将ExecutorService暴露给不信任的代码,又不希望对其进行修改,就可以通过unconfigurableExecutorService来包装它。
8.4 扩展ThreadPoolExecutor
ThreadPoolExecutor是可扩展的,它提供了几个“钩子”方法可以在子类化中改写:beforeExecute、afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。
在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute。)如果beforeExecute抛出了一个RuntimeException,那么任务将不会被执行,并且afterExecute也不会被调用。
执行任务的线程会调用钩子函数beforeExecute, afterExecute,它们会在ThreadPoolExecutor.Worker.runTask中被调用,以下是Worker.runTask方法片断:
beforeExecute(thread, task);//执行任务前调用beforeExecute钩子。注,如果该方法抛出一个RuntimeException,任务将不被执行,afterExecute钩子也不会被执行。
try {
task.run();//执行任务
ran = true;
afterExecute(task, null);//任务执行完成后调用afterExecute钩子
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);//如果任务执行失败,则也会调用afterExecute钩子,注,如果抛出的是Error将不会执行。
throw ex;
}
在线程池完成关闭操作时调用terminated,也就是所有任务都已经完成并且所有工作者线程也已经关闭后。terminated可以用来释放Executor在其声明周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize统计信息等操作。
示例:给线程池添加统计信息
public class TimingThreadPool extends ThreadPoolExecutor {
//开始时间必须是ThreadLocal型的变量,因为这个变量是记录每个线程执行任务时间
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
//在执行任务前调用,如果抛出异常,则不会再执行任务与后面的afterExecute
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());//每个任务的开始时间
}
//在调用任务后调用,不管任务是成功还是失败都会执行
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();//每个任务的结束时间
numTasks.incrementAndGet();//已完成的任务数
totalTime.addAndGet(taskTime);//任务执行累计时间
log.fine(String.format("Thread %s: end %s, time=%dns", t, r,
taskTime));
} finally {
super.afterExecute(r, t);
}
}
//池关闭时调用
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns", totalTime.get()
/ numTasks.get()));//任务的平均时间
} finally {
super.terminated();
}
}
}
8.5 递归算法的并行化
6.3节的页面绘制程序进行了一系列的改进以便不断发掘可利用的并行性。第一次是使程序完全串行,第二次虽然使用了两个线程,但仍然是串行地下载所有图像,在最后一次实现中将每个图像的下载操作视为一个独立任务,从而实现了更高的并行性。如果在循环体中包含了一些密集计算,或者需要执行可能阻塞的I/O操作,那么只要每次迭代都是独立的,都可以对其进行并行化。
如果循环中的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成再继续执行,那么就可以使用Executor将串行循环转化为并行循环:
void processSequentially(List<Element> elements) {//转换前
for (Element e : elements)
process(e);
}
void processInParallel(Executor exec, List<Element> elements) {//转换后
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() { process(e); }//耗时处理
});
}
如果需要提交一个任务集并等待它们完成,那么可以使用ExecutorService.invokeAll,并且在所有任务都执行完成后调用CompletionService来获取结果,就像《示例:使用CompletionService的页面渲染器》中的Renderer。
在一些递归设计中同样可以采用循环并行化的方法。在递归算法中通常都会存在串行循环,而且这些循环可以按照上述方式进行并行化。一种简单的情况是:在每个迭代操作中都不需要来自于后续递归迭代的结果。下面是以深度优先遍历一棵树,并在每个节点上执行计算,把结果放入一个容器:
public<T> void sequentialRecursive(List<Node<T>> nodes,
Collection<T> results) {//顺序深度遍历
for (Node<T> n : nodes) {
results.add(n.compute());//串行计算
sequentialRecursive(n.getChildren(), results);//递归
}
}
public<T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<T> results) {
for (final Node<T> n : nodes) {//顺序深度遍历
exec.execute(new Runnable() {
public void run() {
results.add(n.compute());//并行计算
}
});
parallelRecursive(exec, n.getChildren(), results); //递归
}
}
当parallelRecursive返回时,书中的各个节点都已经访问过了(但是遍历过程仍然是串行的,只有compute调用才是并行执行的),并且每个节点的计算任务也已经放入Executor的工作队列。parallelRecursive的调用者可以通过以下方式等待所有的结果:创建一个特定于遍历过程的Executor,并使用shutdown和awaitTermination等方法,等待上面并行运行的结果:
public<T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();//计算完后关闭池
//虽然设置了超时,但时间为Long.MAX_VALUE,所以这里一直阻塞到所有并行计算都完成
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
小结
对于并发执行的任务,Executor框架是一种强大且灵活的框架。它提供了大量可调节的选项,例如创建线程和关闭线程的策略,处理队列任务的策略,处理过多任务的策略,并且提供了几个钩子方法来扩展它的行为。然而,与大多数功能强大的框架一样,其中有些设置参数并不能很好地工作,某些类型的任务需要特定的执行策略,而一些参数组合则可能产生奇怪的结果。
参考:Java并发编程
版权声明:本文为博主原创文章,未经博主允许不得转载。