大多数并发应用程序都是围绕“任务执行(Task Execution)”来构造的:任务通常是一些抽象的且离散的工作单元。
在生产环境中,“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是当需要创建大量线程时:
- 线程生命周期的开销非常高。线程的创建与销毁并不是没有代价的。
- 资源消耗。活跃的线程会消耗系统资源,尤其是内存。
- 稳定性。在可创建线程的数量上存在一个限制。
在一定范围内,增加线程可以提高系统的吞吐率,但如果超出了这个范围,再创建更多的线程只会降低程序的执行速度,并且如果过多地创建一个线程,那么整个应用程序将崩溃,要想避免这种危险,就应该对应用程序可以创建的线程数量进行限制,并且全面的测试应用程序,从而确保在线程数量达到限制时,程序也不会耗尽资源。
在Java类库中,任务执行的主要抽象不是Thread,而是Executor。虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生产待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式通常就是使用Executor。
public class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);public static void main(String[] args) throws IOException{
ServerSocket serverSocket = new ServerSocket(80);
while (true){
final Socket connection = serverSocket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
}
线程池,从字面啊啊含义来看,是指管理一组同构工作线程的资源池。线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。
为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。ExecutorService的生命周期有3种状态:运行、关闭和已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括哪些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
public class Renderer {
private final ExecutorService executorService;Renderer(ExecutorService executorService){
this.executorService = executorService;
}void renderPage(CharSequence source){
List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService =
new ExecutorCompletionService<ImageData>(executorService);
for (final ImageInfo imageInfo:info){
completionService.submit(new Callable<ImageData>() {
@Override
public ImageData call() throws Exception {
return imageInfo.downloadImage();
}
});
}
renderText(source);try{
for (int i=0;i<info.size();i++){
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e){
e.printStackTrace();
} catch (ExecutionException e){
e.printStackTrace();
}}
}
有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。例如,某个web应用程序从外部的广告服务器上获取广告信息,但如果该应用程序在两秒内得不到响应,那么将显示一个默认的广告,这样即使不能获得广告信息,也不会降低站点的响应性能。在支持时间限制的Future.get中支持这种需求:当结果可用时,它将立即返回,如果在执行时间内没有计算出结果,那么将抛出TimeoutException。
public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo,
Set<TravelCompany> companies,
Comparator<TravelQuote> ranking,long time,
TimeUnit unit)throws InterruptedException{
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company:companies){
tasks.add(new QuoteTask(company,travelInfo));
}List<Future<TravelQuote>> futures = executorService.invokeAll(tasks,time,unit);
List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIterator = tasks.iterator();for (Future<TravelQuote> f:futures){
QuoteTask task = taskIterator.next();
try {
quotes.add(f.get());
} catch (ExecutionException e){
quotes.add(task.getFailureQuote(e.getCause()));
}catch (CancellationException e){
quotes.add(task.getTimeoutQuote(e));
}
}Collections.sort(quotes,ranking);
return quotes;
}
任务执行