获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用

当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取:

方式一:

通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0

Java代码

  1. public class CompletionServiceTest {
  2. static class Task implements Callable<String>{
  3. private int i;
  4. public Task(int i){
  5. this.i = i;
  6. }
  7. @Override
  8. public String call() throws Exception {
  9. Thread.sleep(10000);
  10. return Thread.currentThread().getName() + "执行完任务:" + i;
  11. }
  12. }
  13. public static void main(String[] args){
  14. testUseFuture();
  15. }
  16. private static void testUseFuture(){
  17. int numThread = 5;
  18. ExecutorService executor = Executors.newFixedThreadPool(numThread);
  19. List<Future<String>> futureList = new ArrayList<Future<String>>();
  20. for(int i = 0;i<numThread;i++ ){
  21. Future<String> future = executor.submit(new CompletionServiceTest.Task(i));
  22. futureList.add(future);
  23. }
  24. while(numThread > 0){
  25. for(Future<String> future : futureList){
  26. String result = null;
  27. try {
  28. result = future.get(0, TimeUnit.SECONDS);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. } catch (ExecutionException e) {
  32. e.printStackTrace();
  33. } catch (TimeoutException e) {
  34. //超时异常直接忽略
  35. }
  36. if(null != result){
  37. futureList.remove(future);
  38. numThread--;
  39. System.out.println(result);
  40. //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
  41. break;
  42. }
  43. }
  44. }
  45. }
  46. }

方式二:

第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

Java代码

  1. public class CompletionServiceTest {
  2. static class Task implements Callable<String>{
  3. private int i;
  4. public Task(int i){
  5. this.i = i;
  6. }
  7. @Override
  8. public String call() throws Exception {
  9. Thread.sleep(10000);
  10. return Thread.currentThread().getName() + "执行完任务:" + i;
  11. }
  12. }
  13. public static void main(String[] args) throws InterruptedException, ExecutionException{
  14. testExecutorCompletionService();
  15. }
  16. private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{
  17. int numThread = 5;
  18. ExecutorService executor = Executors.newFixedThreadPool(numThread);
  19. CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
  20. for(int i = 0;i<numThread;i++ ){
  21. completionService.submit(new CompletionServiceTest.Task(i));
  22. }
  23. }
  24. for(int i = 0;i<numThread;i++ ){
  25. System.out.println(completionService.take().get());
  26. }
  27. }

ExecutorCompletionService分析:

CompletionService是Executor和BlockingQueue的结合体。

Java代码

  1. public ExecutorCompletionService(Executor executor) {
  2. if (executor == null)
  3. throw new NullPointerException();
  4. this.executor = executor;
  5. this.aes = (executor instanceof AbstractExecutorService) ?
  6. (AbstractExecutorService) executor : null;
  7. this.completionQueue = new LinkedBlockingQueue<Future<V>>();
  8. }

任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture,

Java代码

  1. public Future<V> submit(Callable<V> task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<V> f = newTaskFor(task);
  4. executor.execute(new QueueingFuture(f));
  5. return f;
  6. }

QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

Java代码

  1. private class QueueingFuture extends FutureTask<Void> {
  2. QueueingFuture(RunnableFuture<V> task) {
  3. super(task, null);
  4. this.task = task;
  5. }
  6. protected void done() { completionQueue.add(task); }
  7. private final Future<V> task;
  8. }

而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

Java代码

  1. public Future<V> take() throws InterruptedException {
  2. return completionQueue.take();
  3. }
  4. public Future<V> poll() {
  5. return completionQueue.poll();
  6. }
  7. 原文:http://xw-z1985.iteye.com/blog/1997077
时间: 2024-10-12 22:19:19

获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用的相关文章

19、Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition

Java并发编程:线程间协作的两种方式:wait.notify.notifyAll和Condition 在前面我们将了很多关于同步的问题,然而在现实中,需要线程之间的协作.比如说最经典的生产者-消费者模型:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权.因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去.因此,一般情况下,当队列满时,会让生产者交出对

Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition

在前面我们将了很多关于同步的问题,然而在现实中,需要线程之间的协作.比如说最经典的生产者-消费者模型:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权.因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去.因此,一般情况下,当队列满时,会让生产者交出对临界资源的占用权,并进入挂起状态.然后等待消费者消费了商品,然后消费者通知生产者队列有空间了.同样地,当

python并发编程:多线程-开启线程的两种方式

一 threading模块介绍 multiprocess模块完全模仿了threading模块的接口,二者在使用层面,有很大的相似性 二 开启线程的两种方式 方式一 from threading import Thread import time def sayhi(name): time.sleep(2) print("%s say hello" % name) if __name__ == '__main__': t = Thread(target=sayhi, args=('mik

网络协议 finally{ return问题 注入问题 jdbc注册驱动问题 PreparedStatement 连接池目的 1.2.1DBCP连接池 C3P0连接池 MYSQL两种方式进行实物管理 JDBC事务 DBUtils事务 ThreadLocal 事务特性 并发访问 隔离级别

1.1.1 API详解:注册驱动 DriverManager.registerDriver(new com.mysql.jdbc.Driver());不建议使用 原因有2个: >导致驱动被注册2次. >强烈依赖数据库的驱动jar 解决办法: Class.forName("com.mysql.jdbc.Driver"); 1.1.2 API详解:java.sql.Statement接口: 操作sql语句,并返回相应结果 String sql = "某SQL语句&qu

Android提交数据到服务器的两种方式四种方法

Android应用开发中,会经常要提交数据到服务器和从服务器得到数据,本文主要是给出了利用http协议采用HttpClient方式向服务器提交数据的方法. /** * @author Dylan * 本类封装了Android中向web服务器提交数据的两种方式四种方法 */ public class SubmitDataByHttpClientAndOrdinaryWay { /** * 使用get请求以普通方式提交数据 * @param map 传递进来的数据,以map的形式进行了封装 * @p

【REACT NATIVE 系列教程之十三】利用LISTVIEW与TEXTINPUT制作聊天/对话框&&获取组件实例常用的两种方式

本站文章均为 李华明Himi 原创,转载务必在明显处注明: 转载自[黑米GameDev街区] 原文链接: http://www.himigame.com/react-native/2346.html 本篇Himi来利用ListView和TextInput这两种组件实现对话.聊天框. 首先需要准备的有几点:(组件的学习就不赘述了,简单且官方有文档) 1. 学习下 ListView: 官方示例:http://reactnative.cn/docs/0.27/tutorial.html#content

Java并发编程-创建线程的两种方式及区别

转载请注明:http://blog.csdn.net/UniKylin/article/details/45016117 1.线程和进程的区别 并行:是多个任务在同一时间同时执行,例如多核计算机同时计算的任务可以理解为并行 并发:从微观上看是多个任务抢占一个CPU从而执行自己的任务,轮流执行任务,但是如果遇到资源冲突的时候并没有从根本提高执行效率.但是提高了CPU的使用效率. 前段时间在GitHub上的一幅图可以很好的阐述上面的概念非常形象 2.Java中创建线程的两种方式 1.第一种方式:直接

Simics虚拟机Solaris 8操作系统获取host 系统win7上的文件的两种方式

1 介绍 本文基于的环境设置如下: ? 宿主操作系统:Windows 7 Ultimate ? 寄生操作系统:Solaris 8 SPARC (SunOS 5.8) ? 虚拟环境:Simics 3.0.4 本文假定已在Simics 上安装好Solaris 8 SPARC 操作系统. 动机:一个Unix下可以运行的二进制文件GraphGen,在单独的一台装有Ubuntu的电脑上不能运行,因为该电脑的硬件架构是基于X86的,而GraphGen是SPARC架构下才能运行的程序:在我的笔记本Win7系统

JavaWeb后台从input表单获取文本值的两种方式

JavaWeb后台从input表单获取文本值的两种方式 #### index.html <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <input type="text" name="n