Java批处理ExecutorService/CompletionService

服务端接收一个请求,常常需要同时进行几个计算或者向其他服务发送请求,最后拼装结果返回上游。本文就来看下JDK提供几个并行处理方案,牵涉到ExcecutorService/CompletionService。要实现的场景是请求有超时限制,如果所有操作都计算完成,则全部拼装返回;否则只拼装部分完成的结果。

1.前提

//任务类,sleep一个时间代表这个计算需要的耗时,返回一个计算结果。
public class MyTask implements Callable<Integer> {
    private int id;
    private int time;
    public MyTask(int i, int time) {
        this.id = i;
        this.time = time;
    }
    @Override
    public Integer call() throws Exception {
        Thread.sleep(time);
        return id;
    }
}
//线程池
ExecutorService threadPool = new ThreadPoolExecutor(10, 20,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));

2.任务逐个等待

提交任务后,逐个等待结果返回。

//总共150ms超时
List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
futures.add(threadPool.submit(new MyTask(1, 60)));
futures.add(threadPool.submit(new MyTask(2, 150)));
Integer res = 0;

for (int i = 0; i < futures.size(); ++i) {
try {
Integer tmp = futures.get(i).get(50 + i * 50, TimeUnit.MILLESECONDS);
res += tmp;
} catch (Exception e) {
//nothing
}
}
System.out.println(res);

打印结果为0,实际上第60ms时第一个计算已经完成,本可以返回第一个结果的。这是因为生产者和消费者一一对应,调度不当。

3.等待解耦

用一个队列来实现生产者和消费者解耦,生产者把结果放到一个队列中,消费者从队列中取结果,不按照任务提交顺序等待,只要有结果就会消耗。CompletionService就是对Executor和异步队列的封装。

CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);
List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
futures.add(service.submit(new MyTask(1, 60)));
futures.add(service.submit(new MyTask(2, 150)));

List<Integer> result = new ArrayList<Integer>(futures.size());
for (int i = 0; i < futures.size(); ++i) {
    Future<Integer> future = service.poll(50 + i * 50, TimeUnit.MILLISECONDS);
    if (null != future){
        result.add(future.get());
    }
}
int res = 0;
for (Integer i : result) {
    res += null == i ? 0 : i;
}
System.out.println(res);

打印结果为1,即第一个计算的结果。谁先完成就取谁。

4.充分利用时间

前面两个方案为每个任务都设置了一个固定的等待时间,两者之和不超过超时限制。这没有充分利用时间,两个任务是并发互不影响的,其各自可利用的时间应该是超时时间,而不应该是两者之和为超时时间。如果第一个任务耗时140,第二个耗时60,前两个任务就只能获得第二个任务的结果。如果两者都设置超时为150,就能获得2个结果。

List<MyTask> tasks = new ArrayList<MyTask>();
tasks.add(new MyTask(1, 140));
tasks.add(new MyTask(2, 60));
List<Future<Integer>> futures = threadPool.invokeAll(tasks, 150, TimeUnit.MILLISECONDS);

int res = 0;
for (Future<Integer> future : futures) {
    System.out.println("isDone:" + future.isDone());
    System.out.println("isCancel:" + future.isCancelled());
    if (future.isCancelled()) {
        continue;
    }
    res += future.get();
}
System.out.println(res);

打印结果是3。上述方法利用ExecutorService的invokeAll方法,该方法在所有任务都完成或等待超时时返回,超时的时候会取消还没有完成的任务,通过判断任务是否被取消来判断任务是否计算完成。

该文章说的是结果互不影响、有结果就能返回的应用场景。如果结果需要按照先后顺序进行合并,或者只需要等待一个计算完成,就不一定需要方案3了。

时间: 2024-10-30 14:56:43

Java批处理ExecutorService/CompletionService的相关文章

Java线程之CompletionService批处理任务

如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果,怎么办呢? 为此你可以保存与每个任务相关联的Future,然后不断地调用 timeout为零的get,来检验Future是否完成.这样做固然可以,但却相当乏味.幸运的是,还有一个更好的方法:完成服务 (Completion service). CompletionService整合了Executor和BlockingQueue的功能. 你可以将Callable任务提交给它去执行,然 后使用类似于队列中的take和poll

JAVA批处理操作

批处理,可以大幅度提升大量增.删.改的速度,就是对大数据操作有很大的效率提升. 与上篇文章中提到的"连接池"相似.其实就是先将多次操作(增删改)打包,然后再一次发送执行 主要用到两个方法: ?  打包:PreparedStatement.addBatch(); ?  发送.执行:PreparedStatement.executeBatch(); 下面看做同一件事,用批处理和不用批处理的效率对比,源码如下: import java.sql.Connection; import java.

java android ExecutorService 线程池解析

ExecutorService: 它也是一个接口,它扩展自Executor接口,Executor接口更像一个抽象的命令模式,仅有一个方法:execute(runnable);Executor接口简单,可是非常重要,重要在这样的设计的模式上..Java5以后,通过Executor来启动线程比用Thread的start()更好.在新特征中,能够非常easy控制线程的启动.运行和关闭过程,还能够非常easy使用线程池的特性. 几种不同的ExecutorService线程池对象 1.newCachedT

java批处理、MySQL批处理

1 e: 2 cd MySQL\bin 3 mysql -uroot -proot 4 @pause MySQL批处理.bat 1 e: 2 cd JAVA\jdk1.8.0_77\bin 3 javac Hello.java 4 java Hello 5 @pause JAVA批处理.bat

Java中ExecutorService和CompletionService区别

我们现在在Java中使用多线程通常不会直接用Thread对象了,而是会用到java.util.concurrent包下的ExecutorService类来初始化一个线程池供我们使用. 之前我一直习惯自己维护一个list保存submit的callable task所返回的Future对象. 在主线程中遍历这个list并调用Future的get()方法取到Task的返回值. public class CompletionServiceTest { static class Task implemen

Java线程之CompletionService

CompletionService的使用 当我们使用ExecutorService创建一个线程池时, 如果执行了多个Callable任务后, 每个Callable任务都会产生一个Future, 如果我们需要处理这些任务产生的结果, 那么就需要将这些Future放入一个线性表中, 用于之后的数据处理. 有了CompletionService, 我们就不用人为地去创建线性表来存这些Future了, CompleService是一个更高级的EexcutorService, 它自身自带一个线程安全的线性

java中ExecutorService使用多线程处理业务

ExecutorService executorService = Executors.newFixedThreadPool(5); List<CancelApprovalCallable> callables = new List<>(); for(int i=0,len=idsArray.size();i<len;i++){ String id = idsArray.get(i); CancelApprovalCallable callable = new CancelA

java Executor, ExecutorService, Executors 有什么不同

Executor 是一个接口,只定义了一个方法, 可以接收Runnable实例,用来执行一个实现Runnable接口的任务. void execute(Runnable command); ExecutorService 也是一个接口,继承自Executor,并增加了一些方法,用的比较广泛,提供了一些生命周期的方法.shutdown,还有submit方法返回值是future. ExecutorService就是为了解决执行服务的生命周期问题的. ExecutorService的生命周期有3种状态

java多线程(ExecutorService)

ExecutorService exec = null; List<IbeFlightInfo> ibeFlightInfo = new ArrayList<IbeFlightInfo>(); TransferVO[] b2gFlights = new TransferVO[]{}; try { exec = Executors.newFixedThreadPool(2); Callable IBEcall = new IBEGetFlightThread(request); Ca