将应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,并提供一种自然的并行工作结构来提升并发性
理想情况下,能找出清晰的任务边界,各个任务之间是相互独立的,任务不依赖于其他任务的状态、结果或边界效应。
在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。应用程序提供商希望程序支持尽可能多的用户,从而降低每个用户的服务成本,而用户则希望获得尽快的响应,而且当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。
串行地执行任务:
public class Demo { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while(true){ Socket connection = socket.accept(); handleRequest(connection); } } }
在web请求的处理中包含了一组不同的运算与I/O操作。服务器必须处理套接字I/O以读取请求和写回响应,这些操作通常会由于网络拥塞或联通性问题而被阻塞。
创建线程执行:以独立的客户请求为边界
public class Demo { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while(true){ final Socket connection = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(connection); } }; new Thread(task).start(); } } }
但是线程的创建与销毁并不是没有代价的。并且活跃的线程会消耗系统资源,尤其是内存,若可运行的线程数量多于可用处理器数量,那么有些线程将会闲置。大量空闲线程会占用许多内存,给垃圾回收期带来压力,而且大量线程在竞争CPU资源时还将产生其他的性能开销,如果已经有足够多的线程使所以cpu保持忙碌状态,那么再创建更多的线程反而会降低性能。
- Executor
任务是一组逻辑工作单元,而线程则是使任务异步执行的机制,在java类库中,任务执行的主要抽象不是Thread,而是Executor
public interface Executor{ void excute(Runnable command); }
虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息搜集、应用程序管理机制和性能检测等机制。
基于Executor的Web服务器
/** * 在TaskExecutionWebServer中,通过Executor,将请求任务的提交与任务的实际执行解耦开来, * 并且只需采用另一种不同的Executor实现,就可以改变服务器的行为。 */ public class TaskExecutionWebServer { private static final int NTHREADS = 100; private static final ExecutorService exec = Executors.newFixedThreadPool(100); public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while(true){ final Socket connection = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(connection); } }; exec.execute(task); } } }
我们也可以很容易地改变Executor的行为,如为每一个请求都创建一个新线程:
public class TaskExecutionWebServer implements Executor{ public void excute(Runnable r){ new Thread(r).start(); }; }
也可以类似于单线程的行为,以同步的方式执行每个任务:
public class TaskExecutionWebServer implements Executor{ public void excute(Runnable r){ r.run(); }; }
线程池
线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务,通过重用现有的线程而不是创建新线程。
类库提供了灵活的线程池以及一些有用的默认配置,可以通过调用Executors中的静态方法来创建一个线程池:
- newFixedThreadPool创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)。
- newCachedThreadPool 创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
- newSingleThreadExecutor 是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建一个线程来替代,它可以确保任务按照队列中的顺序来串行执行。
- newScheduledThreadPool 创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于timer。
通过使用Executor,可以实现各种调优、管理、监视、记录日志、错误报告和其他功能,如果不使用任务执行框架,那么要增加这些功能是非常困难的。
Executor的生命周期
JVM只有在所有非守护线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么JVM将无法结束。
- ExecutorService扩展了Executor接口,添加了一些用于生命周期管理的方法以及一些用于任务提交的便利方法。
ExecutorService的生命周期有3种状态:运行、关闭和已经终止。
ExecutorService在初始创建时处于运行状态。
shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。
shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的额任务,并且不再启动队列中尚未开始执行的任务。
在ExecutorService关闭后提交的任务将由拒绝执行处理器来处理,它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException。
等所有任务都完成后,ExecutorService转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。
public interface ExecutorService extends Executor{ void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException; //... }
- 支持关闭操作的服务器
public class LifecycleWebServer{ private final ExecutorService exec = Executors.newFixedThreadPool(100); public void start() throws IOException{ ServerSocket socket = new ServerSocket(80); while(!exec.isShutdown()){ try{ final Socket conn = socket.accept(); exec.execute(new Runnable(){ public void run(){ handleRequest(conn); } }); }catch(RejectedExecutionException e){ if(!exec.isShutdown()){ log("task submission rejected",e); } } } } public void stop(){ exec.shutdown(); } void handleRequest(Socket connection){ Request req = readRequest(connection); if(isShutdownRequest(req)){ stop(); }else{ dispatchRequest(req); } } }
延迟任务与周期任务
Timer类负责管理延迟或周期任务,然而存在一些缺陷:
Timer在执行所有定时任务时只会创建一个线程。若某个任务的执行时间过长,那么将破坏其他TimerTask的定时准确性。
Timer线程并不捕获异常,当TimerTask抛出未检查的异常时将终止定时线程,Timer也不会恢复线程,因此已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度。
Timer支持基于绝对时间而不是相对时间的调度机制,因此任务的执行对系统时钟变化很敏感,而ScheduledThreadPoolExecutor只支持基于相对时间的制度。
- 示例:渲染页面
- 1. 将HTML页面绘制到图像缓存中,假设HTML页面只包含标签文本,以及预定义大小的图片和URL。
/** * 最简单的方法是对HTML文档进行串行处理,遇到文本标签时,绘制到缓存中,遇到图像引用时,先通过网络获取它,再绘制到缓存中。 * 这很容易实现,程序只需将输入中的每个元素处理一次,但这种方法用户体验很差,他们必须等待很长时间,直到显示所有文本。 * * 但另一种串行方法会更好一些,它先绘制文本元素,同时为图像预留出矩形占位空间。在处理完文本后,程序再开始下载图像,并将它们绘制到相应的占位空间中。 * 图像下载过程的大部分时间都是在等待I/O操作执行完成,在这期间CPU几乎不做任何工作。 * 因此,这种串行方式没有充分的利用CPU,使得用户在看到最终页面之前要等待很长时间。 * 通过将问题分解为多个独立的任务并发执行,能够获得更高的CPU利用率和相应灵敏度。 */ public class SingleThreadRender { void renderpage(CharSequence source){ renderText(source); List<ImageData> imageData = new ArrayList<ImageData>(); for(ImageData imageInfo : scanForImageInfo(source)){ imageData.add(imageInfo.downloadImage()); } for(ImageData data : imageData){ renderImage(data); } } }
- Callable与Future
Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。
Runnable与Callable描述的都是抽象的任务。这些任务通常是有范围的,即都有一个明确的起始点,且最终会结束。
Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。
由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在Executor中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当他们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在完成状态上。
public interface Callable<V> { V call() throws Exception; } public interface Future<V>{ boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException,ExecutionException,CancellationException; V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,CancellationException,TimeoutException; }
ExecutorService中所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获得任务的执行结果或者取消任务。
还可以显示地为某个指定的Runnable或Callable实例化一个FutureTask(由于FutureTask实现了Runnable,因此塔可以提交给Executor来执行,或者调用它的run方法)。
Java6开始,ExecutorService实现可以改写AbstractExecutorService中的newTaskFor方法,从而根据已提交的Runnable或Callable来控制Future的实例化过程。
在默认实现中仅创建了一个新的FutureTask:
public class ThreadPoolExecutor { protected <T> RunnableFuture<T> newTaskFor(Callable<T> task){ return new FutureTask<T>(task); } }
在将Runnable或Callable提交到Executor的过程中,包含了一个安全发布过程,即将Runnable或Callable从提交线程发布到最终执行任务的线程。类似的,在设置Future结果的过程中,也需要将这个结果从计算它的线程发布到任何通过get获得它的线程。
- 2. 改进:Future实现
/** * 将渲染过程分解为两个任务,一个是渲染所有的文本(cpu),另一个是下载所有的图像(IO)。 * FutureRenderer使得渲染文本任务与下载图像数据的任务并发执行,当所有图像下载完,会显示到页面上。 * future.get()的调用处理了两个可能的问题:任务遇到一个Exception,或者调用get的线程在获得结果之前被中断。 * * FutureRenderer使用了两个任务,其中一个负责渲染文本,另一个负责下载图像。 * 但如果渲染文本的速度远远高于下载图像的速度,那么程序的最终性能与串行执行时的性能差别不大。 * 只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出程序的工作负载分配到多个任务中带来的真正性能提升。 */ public class FutureRenderer { private final ExecutorService executor = Executors.newFixedThreadPool(1); void renderPage(CharSequence source){ final List<ImageInfo> imageInfos = scanForImageInfo(source); Callable<List<ImageData>> task = new Callable<List<ImageData>>(){ public List<ImageData> call(){ List<ImageData> result = new ArrayList<ImageData>(); for(ImageInfo imageInfo : imageInfos){ result.add(imageInfo.downloadImage); } return result; } } Future<List<ImageData>> future = executor.submit(task); renderText(source); try{ List<ImageData> imageData = future.get(); for(ImageData data : imageData){ renderImage(data); } }catch(InterruptedException e){ //重新设置线程的中断状态 Thread.currentThread().interrupt(); //由于不需要结果,因此取消任务 future.cancel(); }catch(ExecutionException e){ throw launderThrowable(e.getCause()); } } }
- CompletionService:Executor与BlockingQueue
如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复调用get方法,同时将timeout指定为0,从而通过轮询来判断任务是否完成,这显然会很繁琐。
CompletionService将Executor和BlockingQueue的功能融合在一起。可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法获得已完成的结果,而这些结果会在完成时封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
利用BlockingQueue的阻塞性来获取任务结果Future
private class QueueingFuture<V> extends FutureTask<V> { QueueingFuture(Callable<V> c){ super(c); } QueueingFuture(Runnable t,V r){ super(t,r); } protected void done(){ completionQueue.add(this); } }
- 3. 改进:CompletionService实现
/** * ExecutorCompletionService的实现很简单,在构造函数中创建一个BlockingQueue来保存计算完成的结果。 * 当计算完成时,调用Future-Task中的done方法。 * 当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法, * 并将结果放入BlockingQueue中,将take和poll方法委托给了BlockingQueue,这些方法会在得出结果之前阻塞。 * * CompletionService从两个方面提高了页面渲染器性能:缩短总运行时间以及提高响应性。 * 为每一个image的下载都创建一个独立任务,并在线程池中执行它们,将串行地下载转换为并行下载过程,这将减少下载所有image的总时间。 * 另外在image完成下载后可以立刻显示出来。使用户获得一个更加动态和更加响应性的用户界面。 */ public class Renderer { private final ExecutorService executor; public Renderer(ExecutorService executor){ this.executor = executor; } public renderPage(CharSequence source){ List<ImageInfo> info = scanForImageInfo(source); CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor); for(final ImageInfo imageInfo : info){ completionService.submit(new Callable<ImageData>(){ public ImageData call(){ return imageInfo.downloadImage(); } }); renderText(source); try{ for(int t=0,n=info.size();t<n;t++){ Future<ImageData> f = completionService.take(); ImageData imageData = f.get(); renderImage(imageData); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); }catch(ExecutionException e){ throw launderThrowable(e.getCause()); } } } }
- 示例 - 限时任务
- 旅行预订门户网站:用户输入旅行的日期和其他要求,门户网站获取并显示来自多条线程、旅店或汽车租赁公司的报价。
在获取不同公司报价的过程中,可能会调用web服务、访问数据库、执行一个EDI失误或其他机制。在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的消息。对于没有及时响应的服务提供者,页面可以忽略它们。
可以创建n个任务,将其提交到一个线程池,保留n个Future,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但还有一个更简单的方法--invokeAll
invokeAll方法的参数为一组任务,并返回一组Future。这两个集合有着相同的结构。
invokeAll按照任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而使调用者能将各个Future与其表示的Callable关联起来。
当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定限时(未完成的任务会取消),invokeAll将返回。客户端可以调用get或者isCancelled来判断究竟是何种情况。
private class QuoteTask implements Callable<TravelQuote>{ private final TravelCompany company; private final TravelInfo travelInfo; //... public TravelQuote call() throws Exception{ return company.solicitQuote(travelInfo); } } public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies,Comparator<TravelQuote> ranking,long time,TimeUnit unit) throws InterruptedException{ List<QuoteTask> tasks = new ArrayList<QuoteTask>(); for(TravelCompany company : companies){ tasks.add(new QuoteTask(company,travelInfo)); } List<Future<TravelQuote>> futures = exec.invokeAll(tasks,time,unit); List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size()); Iterator<QuoteTask> taskIte = tasks.iterator(); for(Future<TravelQuote> f : futures){ QuoteTask task = taskIte.next(); try{ quotes.add(f.get()); }catch(ExecutionException e){ quotes.add(task.getFailureQuote(e.getCause())); }catch(CancellationException e){ quotes.add(task.getTimeoutQuote(e)); } } Collections.sort(quotes, ranking); return quotes; }
#笔记内容来自《 java并发编程实战》
原文地址:https://www.cnblogs.com/shanhm1991/p/9899171.html