ExecutorCompletionService分析及使用

当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取:

方式一:

通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0

 1 public class ExecutorCompletionServiceTest {
 2
 3     static class Task implements Callable<String> {
 4         private int i;
 5
 6         public Task(int i) {
 7             this.i = i;
 8         }
 9
10         @Override
11         public String call() throws Exception {
12             Thread.sleep(10000);
13             return Thread.currentThread().getName() + "执行完任务:" + i;
14         }
15     }
16
17     public static void main(String[] args) {
18         testUseFuture();
19     }
20
21     private static void testUseFuture() {
22         int numThread = 5;
23         ExecutorService executor = Executors.newFixedThreadPool(numThread);
24         List<Future<String>> futureList = new ArrayList<Future<String>>();
25         for (int i = 0; i < numThread; i++) {
26             Future<String> future = executor
27                     .submit(new ExecutorCompletionServiceTest.Task(i));
28             futureList.add(future);
29         }
30
31         while (numThread > 0) {
32             for (Future<String> future : futureList) {
33                 String result = null;
34                 try {
35                     result = future.get(0, TimeUnit.SECONDS);
36                 } catch (InterruptedException e) {
37                     e.printStackTrace();
38                 } catch (ExecutionException e) {
39                     e.printStackTrace();
40                 } catch (TimeoutException e) {
41                     // 超时异常直接忽略
42                 }
43                 if (null != result) {
44                     futureList.remove(future);
45                     numThread--;
46                     System.out.println(result);
47                     // 此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
48                     break;
49                 }
50             }
51         }
52     }
53 }

方式二:

第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

 1 public class ExecutorCompletionServiceTest {
 2
 3     static class Task implements Callable<String> {
 4         private int i;
 5
 6         public Task(int i) {
 7             this.i = i;
 8         }
 9
10         @Override
11         public String call() throws Exception {
12             Thread.sleep(10000);
13             return Thread.currentThread().getName() + "执行完任务:" + i;
14         }
15     }
16
17     public static void main(String[] args) throws InterruptedException, ExecutionException {
18         testExecutorCompletionService();
19     }
20
21     private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{
22         int numThread = 5;
23         ExecutorService executorService = Executors.newFixedThreadPool(numThread);
24         CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
25         for (int i = 0; i < numThread; i++) {
26             completionService.submit(new ExecutorCompletionServiceTest.Task(i));
27         }
28         for (int i = 0; i < numThread; i++) {
29             System.out.println(completionService.take().get());
30         }
31         executorService.shutdown();
32     }
33 }

ExecutorCompletionService实现了CompletionService接口,CompletionService是Executor和BlockingQueue的结合体。可以看下构造函数

1 public ExecutorCompletionService(Executor executor) {
2         if (executor == null)
3             throw new NullPointerException();
4         this.executor = executor;
5         this.aes = (executor instanceof AbstractExecutorService) ?
6             (AbstractExecutorService) executor : null;
7         this.completionQueue = new LinkedBlockingQueue<Future<V>>();
8     }  

任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture,

1 public Future<V> submit(Callable<V> task) {
2         if (task == null) throw new NullPointerException();
3         RunnableFuture<V> f = newTaskFor(task);
4         executor.execute(new QueueingFuture(f));
5         return f;
6     }  

QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

1 private class QueueingFuture extends FutureTask<Void> {
2         QueueingFuture(RunnableFuture<V> task) {
3             super(task, null);
4             this.task = task;
5         }
6         protected void done() { completionQueue.add(task); }
7         private final Future<V> task;
8     }

而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

1 public Future<V> take() throws InterruptedException {
2     return completionQueue.take();
3 }
4
5 public Future<V> poll() {
6     return completionQueue.poll();
7 } 
时间: 2024-11-05 12:16:21

ExecutorCompletionService分析及使用的相关文章

Future 和 ExecutorCompletionService 对比和使用

附加:Java 4种线程池介绍请查看 谈谈new Thread的弊端及Java四种线程池的使用 当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取: 方式一: 通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成.如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 public class Completi

获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用

当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取: 方式一: 通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成.如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 Java代码 public class CompletionServiceTest { static class Task impleme

JDK源码分析之concurrent包(三) -- Future方式的实现

上一篇我们基于JDK的源码对线程池ThreadPoolExecutor的实现做了分析,本篇来对Executor框架中另一种典型用法Future方式做源码解读.我们知道Future方式实现了带有返回值的程序的异步调用,关于异步调用的场景大家可以自行脑补Ajax的应用(获取返回结果的方式不同,Future是主动询问获取,Ajax是回调函数),这里不做过多说明. 在进入源码前,首先来看下Future方式相关的API: 接口Callable:有返回结果并且可能抛出异常的任务: 接口Future:表示异步

Solr初始化源码分析-Solr初始化与启动

用solr做项目已经有一年有余,但都是使用层面,只是利用solr现有机制,修改参数,然后监控调优,从没有对solr进行源码级别的研究.但是,最近手头的一个项目,让我感觉必须把solrn内部原理和扩展机制弄熟,才能把这个项目做好.今天分享的就是:Solr是如何启动并且初始化的.大家知道,部署solr时,分两部分:一.solr的配置文件.二.solr相关的程序.插件.依赖lucene相关的jar包.日志方面的jar.因此,在研究solr也可以顺着这个思路:加载配置文件.初始化各个core.初始化各个

爱奇艺、优酷、腾讯视频竞品分析报告2016(一)

1 背景 1.1 行业背景 1.1.1 移动端网民规模过半,使用时长份额超PC端 2016年1月22日,中国互联网络信息中心 (CNNIC)发布第37次<中国互联网络发展状况统计报告>,报告显示,网民的上网设备正在向手机端集中,手机成为拉动网民规模增长的主要因素.截至2015年12月,我国手机网民规模达6.20亿,有90.1%的网民通过手机上网. 图 1  2013Q1~2015Q3在线视频移动端和PC端有效使用时长份额对比 根据艾瑞网民行为监测系统iUserTracker及mUserTrac

Tomcat启动分析(我们为什么要配置CATALINA_HOME环境变量)

原文:http://www.cnblogs.com/heshan664754022/archive/2013/03/27/2984357.html Tomcat启动分析(我们为什么要配置CATALINA_HOME环境变量) 用文本编辑工具打开用于启动Tomcat的批处理文件startup.bat,仔细阅读.在这个文件中,首先判断CATALINA_HOME环境变量是否为空,如果为空,就将当前目录设为CATALINA_HOME的值.接着判断当前目录下是否存在bin\catalina.bat,如果文件

C# 最佳工具集合: IDE 、分析、自动化工具等

C#是企业中广泛使用的编程语言,特别是那些依赖微软的程序语言.如果您使用C#构建应用程序,则最有可能使用Visual Studio,并且已经寻找了一些扩展来对您的开发进行管理.但是,这个工具列表可能会改变您编写C#代码的方式. C#编程的最佳工具有以下几类: IDE VS扩展 编译器.编辑器和序列化 反编译和代码转换工具 构建自动化和合并工具 版本控制 测试工具和VS扩展 性能分析 APM 部署自动化 容器 使用上面的链接直接跳转到特定工具,或继续阅读以浏览完整列表.

秒杀系统架构分析与实战

0 系列目录 秒杀系统架构 秒杀系统架构分析与实战 1 秒杀业务分析 正常电子商务流程 (1)查询商品:(2)创建订单:(3)扣减库存:(4)更新订单:(5)付款:(6)卖家发货 秒杀业务的特性 (1)低廉价格:(2)大幅推广:(3)瞬时售空:(4)一般是定时上架:(5)时间短.瞬时并发量高: 2 秒杀技术挑战 假设某网站秒杀活动只推出一件商品,预计会吸引1万人参加活动,也就说最大并发请求数是10000,秒杀系统需要面对的技术挑战有: 对现有网站业务造成冲击 秒杀活动只是网站营销的一个附加活动,

Openfire分析之二:主干程序分析

引言 宇宙大爆炸,于是开始了万物生衍,从一个连人渣都还没有的时代,一步步进化到如今的花花世界. 然而沧海桑田,一百多亿年过去了-. 好复杂,但程序就简单多了,main()函数运行,敲个回车,一行Hello World就出来了,所以没事多敲敲回车,可以练手感-. 一.程序入口 Java的程序入口是main方法,Openfire也不例外.可以全局检索一下"void main",可以看到,Openfire的main函数有两个: (1)org.jivesoftware.openfire.lau