CompletionService接口定义为Interface CompletionService<V>接口定它在java7中只有一个实现ExecutorCompletionService,这个接口内部集成了一个BlockingQueue,因此可以实现对多线程运行结果的收集工作。为了更好的测试该接口,我使用了两个测试,第一个测试是自己定义一个外部BlockingQueue来接收callable返回的数据。第二个测试是用CompletionService对executor进行装饰,使得返回的CompletionService对象能直接submit任务。
但是我发现它submit的后并没有马上调用executor的submit,而是对它进行了封装,因此出现了一点点延迟。如果在submit之后使用shutdown()命令结束的话,实际上该task可能还没有 放到executor的taskpool中。所以这一点值得注意。
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; public class testCallable { public static void main(String[] args) { try { futureCount(); completionServiceCount(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } /** * 使用自定义阻塞队列得到任务执行结果 * * @throws InterruptedException * @throws ExecutionException */ public static void futureCount() throws InterruptedException, ExecutionException { BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>(); ExecutorService executorService = Executors.newCachedThreadPool(); int threadNum = 5; for (int i = 0; i < threadNum; i++) { Future<Integer> future = executorService.submit(getTask()); queue.put(future); } int sum = 0; int temp = 0; while(!queue.isEmpty()){ temp = queue.take().get(); sum += temp; System.out.print(temp + "\t"); } System.out.println("BlockingQueue all is : " + sum); executorService.shutdown(); } /** * 使用completionService收集callable结果 * @throws ExecutionException * @throws InterruptedException */ public static void completionServiceCount() throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newCachedThreadPool(); CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( executorService); int threadNum = 5; for (int i = 0; i < threadNum; i++) { completionService.submit(getTask()); } int sum = 0; int temp = 0; for(int i=0;i<threadNum;i++){ temp = completionService.take().get(); sum += temp; System.out.print(temp + "\t"); } System.out.println("CompletionService all is : " + sum); executorService.shutdown(); } public static Callable<Integer> getTask() { final Random rand = new Random(); Callable<Integer> task = new Callable<Integer>() { @Override public Integer call() throws Exception { int num = 0; for (int i = 0; i < 10; i++) { num = num + rand.nextInt(10); } return num; } }; return task; } }
时间: 2024-10-09 20:49:03