ExecutorService与ExecutorCompletionService都是java.util.concurrent包的并发处理类,总的来说,ExecutorCompletionService是ExecutorService的功能增强版,ExecutorCompletionService以BlockingQueue<Future<V>>来存放已经完成的任务。
也就是说,优先完成的任务会优先存放在BlockingQueue<Future<V>>队列中,所以我们能及时的拿到最优先的处理结果。
让我们先看看ExecutorService的测试代码,共4个任务,我们刻意让第1个任务的执行时间最长,依次递减,代码如下:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestExecutorService { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService es = Executors.newFixedThreadPool(4); //第一个任务 Callable<Integer> task1 = new Callable<Integer>() { public Integer call() throws Exception { //耗时最长, 4秒 Thread.sleep(4000); return 1; } }; //第二个任务 Callable<Integer> task2 = new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(3000); return 2; } }; //第三个任务 Callable<Integer> task3 = new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(2000); return 3; } }; //第四个任务 Callable<Integer> task4 = new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(1000); return 4; } }; Future<Integer> result1 = es.submit(task1); Future<Integer> result2 = es.submit(task2); Future<Integer> result3 = es.submit(task3); Future<Integer> result4 = es.submit(task4); System.out.println("第1个任务等待中..."); System.out.println("第1个任务完成:【" + result1.get() + "】"); System.out.println("第2个任务等待中..."); System.out.println("第2个任务完成:【" + result2.get() + "】"); System.out.println("第3个任务等待中..."); System.out.println("第3个任务完成:【" + result3.get() + "】"); System.out.println("第4个任务等待中..."); System.out.println("第4个任务完成:【" + result4.get() + "】"); } }
输出结果:
第1个任务等待中... 第1个任务完成:【1】 第2个任务等待中... 第2个任务完成:【2】 第3个任务等待中... 第3个任务完成:【3】 第4个任务等待中... 第4个任务完成:【4】
PS:Future.get()方法会造成阻塞,直到任务执行完毕为止。
运行代码可见,result1.get()阻塞了4秒后完成任务,输出结果,而紧随的result2.get(),result3.get(),result4.get()没有阻塞,立马输出结果。那是因为在result1.get()执行完毕时,其余3个任务早已执行完毕等待抓取结果了。所以,使用上述方法并不能得知哪个任务是最先返回结果的。
接下来,让我们看看ExecutorCompletionService的代码:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestExecutorComplectionService { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService es = Executors.newFixedThreadPool(4); ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<Integer>(es); //第一个任务 Callable<Integer> task1 = new Callable<Integer>() { public Integer call() throws Exception { //耗时最长, 4秒 Thread.sleep(4000); return 1; } }; //第二个任务 Callable<Integer> task2 = new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(3000); return 2; } }; //第三个任务 Callable<Integer> task3 = new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(2000); return 3; } }; //第四个任务 Callable<Integer> task4 = new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(1000); return 4; } }; ecs.submit(task1); ecs.submit(task2); ecs.submit(task3); ecs.submit(task4); for(int i = 0; i < 4; i++) { Future<Integer> result = ecs.take(); System.out.println("输出结果:【" + result.get() + "】"); } } }
输出结果:
输出结果:【4】 输出结果:【3】 输出结果:【2】 输出结果:【1】
可见,第4个任务task4执行的时间是最短的,第1个输出结果。
下面让我们剖析一下ExecutorComplectionService的源码:
成员变量如下:
executor:ExecutorService类,任务并行执行器
completionQueue:就是保存执行结果的阻塞队列BlockingQueue
submit方法:底层依旧使用ExecutorService来并发执行任务,只不过是多了个功能【把执行完毕的任务放到complectionQueue队列中】
task方法:从complectionQueue队列中获取一个元素,如果没有元素,则阻塞,直到队列中有元素位置,这验证了我们之前的说法。
总结:总的来说,ExecutorComplectionService其实就是 ExecutorService 和 BlockingQueue的结合。