JDK并发工具包CompletionService和ExecutorCompletionService的好处和使用场景

《Java并发编程实践》一书6.3.5节CompletionService:Executor和BlockingQueue,有这样一段话:

"如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService。"

这是什么意思呢?我们通过一个例子,分别使用繁琐的做法和CompletionService来完成,清晰的对比能让我们更好的理解上面的一段话和CompletionService这个API提供的初衷。考虑这样的场景,有5个Callable任务分别返回5个整数,然后我们在main方法中按照各个任务完成的先后顺序,在控制台打印返回结果。

package net.aty.completeservice;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class ReturnAfterSleepCallable implements Callable<Integer>
{
	private int sleepSeconds;

	private int returnValue;

	public ReturnAfterSleepCallable(int sleepSeconds, int returnValue)
	{
		this.sleepSeconds = sleepSeconds;
		this.returnValue = returnValue;
	}

	@Override
	public Integer call() throws Exception
	{
		System.out.println("begin to execute.");

		TimeUnit.SECONDS.sleep(sleepSeconds);

		System.out.println("end to execute.");

		return returnValue;
	}
}

这个任务会接受2个参数,睡眠指定的时间后,返回指定的结果。睡眠时间越短,意味着任务越先执行完成。

1.繁琐的做法

通过一个List来保存每个任务返回的Future,然后轮询这些Future,直到每个Future都已完成。我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,所以在调用get方式时,需要将超时时间设置为0。

package net.aty.completeservice;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TraditionalTest
{
	public static void main(String[] args)
	{
		int taskSize = 5;

		ExecutorService executor = Executors.newFixedThreadPool(taskSize);

		List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();

		for (int i = 1; i <= taskSize; i++)
		{
			int sleep = taskSize - i; // 睡眠时间

			int value = i; // 返回结果

			// 向线程池提交任务
			Future<Integer> future = executor.submit(new ReturnAfterSleepCallable(sleep, value));

			// 保留每个任务的Future
			futureList.add(future);
		}

		// 轮询,获取完成任务的返回结果
		while(taskSize > 0)
		{
			for (Future<Integer> future : futureList)
			{
				Integer result = null;

				try
				{
					result = future.get(0, TimeUnit.SECONDS);
				} catch (InterruptedException e)
				{
					e.printStackTrace();
				} catch (ExecutionException e)
				{
					e.printStackTrace();
				} catch (TimeoutException e)
				{
					// 超时异常需要忽略,因为我们设置了等待时间为0,只要任务没有完成,就会报该异常
				}

				// 任务已经完成
				if(result != null)
				{
					System.out.println("result=" + result);

					// 从future列表中删除已经完成的任务
					futureList.remove(future);
					taskSize--;
					//此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
					break; // 进行下一次while循环
				}
			}
		}

		// 所有任务已经完成,关闭线程池
		System.out.println("all over.");
		executor.shutdown();
	}

}

可见轮询future列表非常的复杂,而且还有很多异常需要处理,TimeOutException异常需要忽略;还要通过双重循环和break,防止遍历集合的过程中,出现并发修改异常。这么多需要考虑的细节,程序员很容易犯错。

2.使用CompletionService

package net.aty.completeservice;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceTest
{
	public static void main(String[] args)
	{
		int taskSize = 5;

		ExecutorService executor = Executors.newFixedThreadPool(taskSize);

		// 构建完成服务
		CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
				executor);

		for (int i = 1; i <= taskSize; i++)
		{
			int sleep = taskSize - i; // 睡眠时间

			int value = i; // 返回结果

			// 向线程池提交任务
			completionService
					.submit(new ReturnAfterSleepCallable(sleep, value));
		}

		// 按照完成顺序,打印结果
		for (int i = 0; i < taskSize; i++)
		{
			try
			{
				System.out.println(completionService.take().get());
			} catch (InterruptedException e)
			{
				e.printStackTrace();
			} catch (ExecutionException e)
			{
				e.printStackTrace();
			}
		}

		// 所有任务已经完成,关闭线程池
		System.out.println("all over.");
		executor.shutdown();
	}
}

可见使用CompletionService不会有TimeOutExeception的问题,不用遍历future列表,不用担心并发修改异常。

3.CompletionService和ExecutorCompletionService的实现

JDK源码中CompletionService的javadoc说明如下:

/**
 * A service that decouples the production of new asynchronous tasks
 * from the consumption of the results of completed tasks.  Producers
 * <tt>submit</tt> tasks for execution. Consumers <tt>take</tt>
 * completed tasks and process their results in the order they
 * complete.
 */

也就是说,CompletionService实现了生产者提交任务和消费者获取结果的解耦,生产者和消费者都不用关心任务的完成顺序,由CompletionService来保证,消费者一定是按照任务完成的先后顺序来获取执行结果。

ExecutorCompletionService是CompletionService的实现,融合了线程池Executor和阻塞队列BlockingQueue的功能。

 public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

到这里可以推测,按照任务的完成顺序获取结果,就是通过阻塞队列实现的,阻塞队列刚好具有这样的性质:阻塞和有序。

ExecutorCompletionService任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture

public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
}

QueueingFuture是FutureTask的一个子类,通过改写FutureTask类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

 /**
  * FutureTask extension to enqueue upon completion
  */
private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
}

这里简单说明下:FutureTask.done(),这个方法默认什么都不做,就是一个回调,当提交的线程池中的任务完成时,会被自动调用。这也就说时候,当任务完成的时候,会自动执行QueueingFuture.done()方法,将返回结果加入到阻塞队列中,加入的顺序就是任务完成的先后顺序。

至此CompletionService和ExecutorCompletionService的讲解结束。特别感谢http://xw-z1985.iteye.com/blog/1997077这篇博客的作者海浪儿,解决了我在看《java并发编程实践》中的困惑,本文也基本上是照抄那篇博文的。

JDK并发工具包CompletionService和ExecutorCompletionService的好处和使用场景,布布扣,bubuko.com

时间: 2024-10-06 17:39:06

JDK并发工具包CompletionService和ExecutorCompletionService的好处和使用场景的相关文章

jdk并发工具包之锁

1.cynchronized扩展:可重如锁ReentrantLock ReentrantLock是通过cas算法实现的 RenntrantLock lock=new ReentrantLock(); lock.lock();//如果资源被占用则会等待 //代码锁定区域 finally{ //必须手动解锁 lock.unlock(); } (1)中断锁 ReentrantLock 提供了可中断锁lockInterruptibly lock.lockInterruptibly();//会获得锁,但是

使用JUC并发工具包的Lock和Condition,实现生产者和消费者问题中的有界缓存

JDK5.0之前,用java实现生产者和消费者的唯一方式就是使用synchronized内置锁和wait/notify条件通知机制.JDK5.0之后提供了显示锁Lock和条件队列Condition,与内置锁和内置条件队列相对应,但是显示的锁和条件队列,功能更强大,更灵活.此外JDK5.0之后还提供了大量很有用的并发工具类,如BlockingQueue等,基于这些数据结构,能够方便.快速.高效的构建自己应用需要的效果.这里我们简单使用下显示锁和条件队列,来模拟有界缓存的实现,功能类似于JDK内置的

Java 并发工具包 java.util.concurrent 大全

1. java.util.concurrent - Java 并发工具包 Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包.这个包包含有一系列能够让 Java 的并发编程变得更加简单轻松的类.在这个包被添加以前,你需要自己去动手实现自己的相关工具类. 本文我将带你一一认识 java.util.concurrent 包里的这些类,然后你可以尝试着如何在项目中使用它们.本文中我将使用 Java 6 版本,我不确定这和 Java 5 版本里的是否有一些差异

并发编程—— CompletionService : Executor 和 BlockingQueue

Java并发编程实践 目录 并发编程—— ConcurrentHashMap 并发编程—— 阻塞队列和生产者-消费者模式 并发编程—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程—— Callable和Future 并发编程—— CompletionService : Executor 和 BlockingQueue 概述 第1部分 问题引入 第2部分 第1部分 问题引入 <Java并发编程实践>一书6.3.5节CompletionService:Execu

Java 8并发工具包简介

转自:http://www.toutiao.com/a6399027925228077313/ Java 8并发工具包由3个包组成,分别是java.util.concurrent.java.util.concurrent.atomic和java.util.concurrent.locks,提供了大量关于并发的接口.类.原子操作类.锁相关类.借助java.util.concurrent包,可以非常轻松地实现复杂的并发操作.java.util.concurrent包主要包含以下内容,后文将具体介绍:

Java_并发工具包 java.util.concurrent 用户指南(转)

译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.jenkov.com/java-util-concurrent/index.html.本指南已做成中英文对照阅读版的 pdf 文档,有兴趣的朋友可以去 Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf[带书签] 进行下载. 1. java.util.concurrent - Java 并发工具包 Java 5 添加了一个新的包到 Java 平台

CompletionService和ExecutorCompletionService

CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象. 如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果.为此你可以将每个任务的Future保存进一个集合,然后循环这个集合调用Future的get()取出数据.幸运的是CompletionService帮你做了这件事情. CompletionService整合了Executor和BlockingQueue的功能.你可以将Callabl

JDK并发工具类

在JDK的并发包里提供了几个非常有用的并发工具类.CountDownLatch.CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段. 1等待多线程完成的CountDownLatch CountDownLatch:CountDownLatch允许一个或多个线程等待其他线程完成操作.与thread.join方法类似但功能更多.该计数器只能使用一次 CountDownLatch的构造函数接收一个int类型的参数作

JDK并发相关并发包

synchronized的功能扩展: 重入锁:ReentrantLock: 其实也就是lock对于synchronized的替代了,要注意的一个点就是你可以锁多个,但是你锁了几个,最后就要解锁几个这个问题: 使用lock.lock()加锁,使用lock.unlock()解锁: 提供中断处理: 使用中断锁,线程可以被中断,也就是说,当一个线程正在等待锁的时候,他依然可以收到一个通知,被告知无须等待,可以停止工作了,使用的是lock.lockInterruptibly();方法: 锁申请等待限时: