使用JDK1.8 CompletableFuture异步化任务处理

0.概述

服务端编程的一个经典场景是在接收和处理客户端请求时,为了避免对每一个请求都分配线程而带来的资源开销,服务一般会预先分配一个固定大小的线程池(比如Tomcat connector maxThreads),当客户端请求到来时,从线程池里寻找空闲状态的线程来处理请求,请求处理完毕后会回到线程池,继续服务下一个请求。当线程池内的线程都处于繁忙状态时,新来的请求需要排队直到线程池内有可用的线程,或者当超出队列容量后(Tomcat connector acceptCount属性)请求被拒绝(connection refused error)。

为了提高服务的吞吐量,我们应当确保主线程尽快处理尽快返回,尽量使服务端的任务处理线程池始终有可分配的线程来处理新的客户端请求。

当主线程执行一个任务时,如果该任务较耗时, 通常的做法是利用Future/Promise来异步化处理任务。从JDK1.5开始,J.U.C中提供了Future来代表一个异步操作。JDK1.8中则新增了lambda表达式和CompletableFuture, 可以帮助我们更好的用函数式编程风格来实现任务的异步处理。

1. Future

代码例子:

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<String> future = executor.submit(() -> {
     // long running task
     return "task finish.";
});

Future实在是太鸡肋了,仅暴露了get/cancel/isDone/isCancelled方法。我们无法通过Future去手动结束一个任务,也无法非阻塞的去获取Future的任务结果,因为future.get()方法是阻塞的。假设有下面这个场景,当前有两个任务,后一个任务依赖了前一个任务的处理结果,这种场景也无法通过Future来实现异步流程任务处理。

2. CompletableFuture

CompletableFuture实现了Future和CompletionStage两个接口,CompletionStage可以看做是一个异步任务执行过程的抽象。我们可以基于CompletableFuture方便的创建任务和链式处理多个任务。下面我们通过实例来介绍它的用法。

2.1 创建任务

可以使用runAsync方法新建一个线程来运行Runnable对象(无返回值)

CompletableFuture<Void> futureAsync = CompletableFuture.runAsync(() -> {
    // long running task without return value
    System.out.println("task finish.");
});

也可以使用supplyAysnc方法新建线程来运行Supplier<T>对象(有返回值)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task result";
});

这里执行任务的线程来自于ForkJoinPool.commonPool() , 也可以自定义线程池

ExecutorService exector = Executors.newFixedThreadPool(5);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task result";
}, executor);
 

2.2  任务的异步处理

不论Future.get()方法还是CompletableFuture.get()方法都是阻塞的,为了获取任务的结果同时不阻塞当前线程的执行,我们可以使用CompletionStage提供的方法结合callback来实现任务的异步处理。

2.2.1  使用callback基于特定任务的结果进行异步处理

程序中经常需要主线程创建新的线程来处理某一任务,然后基于任务的完成结果(返回值或者exception)来执行特定逻辑, 对于这种场景我们可以很方面的使用whenComplete或者whenCompleteAsync来注册callback方法

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task result";
});

future.whenComplete((result, exception) -> {
    if (null == exception) {
        System.out.println("result from previous task: " + result);
    }
});
对于任务执行中抛错的情况:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    throw new RuntimeException("error!");
});

future.whenComplete((result, exception) -> {
    if (null == exception) {
        System.out.println("result from previous task: " + result);
    } else {
        System.err.println("Exception thrown from previous task: " + exception.getMessage());
    }
});

也可以用exceptionally来显示的处理错误:

CompletableFuture.supplyAsync(() -> {
    throw new IllegalArgumentException("error");
}).exceptionally(ex -> {
    System.out.println("Exception caught: " + ex.getMessage());
    return ex.getMessage();
}).thenAccept(result -> {
    System.out.println("result: " + result);
});

如果不需关心任务执行中是否有exception,则可以使用thenAccept方法, 需要注意的是如果执行中抛了exception, 则thenAccept里面的回调方法不会被执行

CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task result";
}).thenAccept((result) -> {
    System.out.println("result from previous task: " + result);
});
 

2.2.2 任务的链式处理

在应用中经常会遇到任务的pipeline处理,任务A执行完后触发任务B,任务B执行完后触发任务C,上一个任务的结果是下一个任务的输入,对于这种场景,我们可以使用thenApply方法。

CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task1";
}).thenApply(previousResult -> {
    return previousResult + " task2";
}).thenApply(previousResult -> {
    return previousResult + " task3";
}).thenAccept(previousResult -> {
    System.out.println(previousResult);
});
output: task1 task2 task3

让我们再看下面这个例子,某一应用需要先根据accountId从数据库找到对应的账号信息,然后对该账号执行特定的处理逻辑:

CompletableFuture<Account> getAccount(String accountId) {
    return CompletableFuture.supplyAsync(() -> {
        return accountService.findAccount(accountId);
    });
}

CompletableFuture<String> processAccount(Account account) {
    return CompletableFuture.supplyAsync(() -> {
        return accountService.updateAccount(account);
    });
}

如果使用thenApply方法,其返回的结果是一个嵌套的CompletableFuture对象:

CompletableFuture<CompletableFuture<String>> res = getAccount("123").thenApply(account -> {
    return processAccount(account);
});

如果不希望结果是嵌套的CompletableFuture,我们可以使用thenCompose方法来替代thenApply

CompletableFuture<String> res = getAccount("123").thenCompose(account -> {
    return processAccount(account);
});

2.2.3 多任务的并行处理

另一种常见的场景是将一个大的任务切分为数个子任务,并行处理所有子任务,当所有子任务都成功结束时再继续处理后面的逻辑。以前的做法是利用CountDownLatch, 主线程构造countDownLatch对象,latch的大小为子任务的总数,每一个任务持有countDownLatch的引用,任务完成时对latch减1,主线程阻塞在countDownLatch.await方法上,当所有子任务都成功执行完后,latch=0, 主线程继续执行。

int size = 5;
CountDownLatch latch = new CountDownLatch(size);
for (int i = 0; i < size; i++) {
    Executors.newFixedThreadPool(size).submit(() -> {
        try {
            // long running task
            System.out.println(Thread.currentThread().getName() + " " + latch.getCount());
        } finally {
            latch.countDown();
        }
    });
}
try {
    latch.await();
} catch (InterruptedException e) {
    e.printStackTrace();
}

// continue...
System.out.println(Thread.currentThread().getName());

这样的代码繁琐且很容易出错,我们可以用CompletableFuture.allOf来方便的处理上述场景。直接贴例子, 根据一组账户ID并行查找对应账户:

CompletableFuture<String> findAccount(String accountId) {
    return CompletableFuture.supplyAsync(() -> {
        // mock finding account from database
        return "account" + accountId;
    });
}

public void batchProcess(List<String> accountIdList) {
    // 并行根据accountId查找对应account
    List<CompletableFuture<String>> accountFindingFutureList =
        accountIdList.stream().map(accountId -> findAccount(accountId)).collect(Collectors.toList());

    // 使用allOf方法来表示所有的并行任务
    CompletableFuture<Void> allFutures =
        CompletableFuture
            .allOf(accountFindingFutureList.toArray(new CompletableFuture[accountFindingFutureList.size()]));

    // 下面的方法可以帮助我们获得所有子任务的处理结果
    CompletableFuture<List<String>> finalResults = allFutures.thenApply(v -> {
        return accountFindingFutureList.stream().map(accountFindingFuture -> accountFindingFuture.join())
            .collect(Collectors.toList());
    });
}

如果后续逻辑没有必要等待所有子任务全部结束,而是只要任一一个任务成功结束就可以继续执行,我们可以使用CompletableFuture.anyOf方法:

CompletableFuture<Object> anyOfFutures = CompletableFuture.anyOf(taskFutureA, taskFutureB, taskFutureC);

假设三个任务中taskFutureA最先执行完毕并成功返回,则anyOfFutures里得到的是taskFutureA的执行结果.

3.展望

基于JDK1.8的lambda表达式和CompletableFuture, 我们可以写出更具有函数式编程风格的代码,可以更方便的实现任务的异步处理,只用很少的代码便可以实现任务的异步pipeline和并行调用。在异步开发模型(nodeJs/Vert.x)越来越火的今天,我们就从今天开始使用lambda+CompletableFuture来改造我们的Java应用吧。

原文地址:https://www.cnblogs.com/aalex/p/8480112.html

时间: 2024-08-29 15:44:35

使用JDK1.8 CompletableFuture异步化任务处理的相关文章

Apache Ignite 改装(一) -- 服务异步化支持

本文假设读者了解Apache Ignite,阅读过ignite service grid的官方文档,或使用过ignite的service grid,本文同样假设读者了解 java的CompletionStage的相关用法.本文涉及的ignite版本为2.4.0. 使用Apache Ignite的Service grid作为微服务开发框架, 通常是如下定义和实现Service的: 服务接口: public interface MyService { public String sayHello(S

Dubbo 2.7新特性之异步化改造

这是why技术的第1篇原创文章 我与Dubbo的二三事 我是2016年毕业的,在我毕业之前,我在学校里面学到的框架都是SSH,即struts+spring+hibernate,是的你没有看错,在大学里面的课本里面学的是strusts,这个还没毕业就被基本抛弃的框架.然而我大四出去实习,用的技术是SSM,即Spring,SpringMVC,Mybatis.实习的时候做的项目都是外包项目,非常传统的单体大项目,和学校里面做课程设计一样,所有的功能包括前后端都糅合在一个项目里面,根本不知道什么是分布式

使用消息队列异步化系统

使用消息队列异步化系统 基于Spring与ActiveMQ的配置实现方案 前言 前期为了快速开发,项目结构较为混乱,代码维护与功能扩展都比较困难,为了方便后续功能开发,最近对项目进行的重构,顺便在重构的过程中将之前的部分操作进行了异步处理,也第一次实际接触了JMS与消息队列.项目中采用的消息中间件为ActiveMQ. 什么是JMS Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分

[转]向facebook学习,通过协程实现mysql查询的异步化

FROM : 通过协程实现mysql查询的异步化 前言 最近学习了赵海平的演讲,了解到facebook的mysql查询可以进行异步化,从而提高性能.由于facebook实现的比较早,他们不得不对php进行hack才得以实现.现在的php5.5,已经无需hack就可以实现了.对于一个web网站的性能来说,瓶颈多半是来自于数据库.一般数据库查询会在某个请求的整体耗时中占很大比例.如果能提高数据库查询的效率,网站的整体响应时间会有很大的下降.如果能实现mysql查询的异步化,就可以实现多条sql语句同

异步化

异步化最近在处理交易下单的流程:用户下单请求 <-> 前端控制层 <-> 业务逻辑层 <-> 同步调用 <-> 订单接口服务 <-> 数据层以上即为目前用户下单的处理流程,业务逻辑层使用同步的方式调用接口服务,直到订单接口服务响应或者超时.查看了很多资料说加入消息队列的机制,即可实现异步.首先将用户的下单请求添加消息队列中,然后订单接口服务从消息队列中获取请求消息进行处理.此时有两种处理方法:1.前端系统在将请求添加到队列后,是一直在等待响应,还

异步化,高并发大杀器

聊聊如何让项目异步化的一些事. 1.同步和异步,阻塞和非阻塞 同步和异步,阻塞和非阻塞, 这个几个词已经是老生常谈,当时常常还是有很多同学分不清楚,以为同步肯定就是阻塞,异步肯定就是非阻塞,其他他们不是一回事. 同步和异步关注的是结果消息的通信机制 同步:同步的意思就是调用方需要主动等待结果的返回异步:异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回调函数等.阻塞和非阻塞主要关注的是等待结果返回调用方的状态 阻塞:是指结果返回之前,当前线程被挂起,不做任何事非阻塞:是指

使用 CompletableFuture 异步组装数据

使用 CompletableFuture 异步组装数据 一种快捷.优雅的异步组装数据方式 实际项目中经常遇到这种情况: 从多个表中查找到数据然后拼装成一个VO返回给前端. 这个过程有可能会非常耗时.因为最终每一条返回的VO数据是由多个表中的数据拼装而成,如果项目还是微服务需要从其他服务获取数据,那将会更加耗时,更加麻烦.简单的几十条.几百条数据单个线程跑起来可能没有什么压力,但是当数量达到成千上万,几十万,几百万,组装的逻辑也变得非常复杂时,这个操作就非常耗时. 最近我在项目中就遇到这个的情况.

java(java8 CompletableFuture)异步执行之后获取回调

应用场景是产品中需要有一个批量执行的任务,很多店铺同时执行,并且需要执行之后的结果进行业务处理,然后在全部执行完毕之后通知处理完毕 用Future和Callable虽然可以阻塞获取结果,但是因为处理起来有些繁琐,比较消耗资源,而CompletableFuture可以满足这个需求,让异步编程变的更加轻松. 直接上demo public static void main(String[] args) { //批量异步 ExecutorService executor = Executors.newF

[项目回顾]项目异步化

前言 前期为了快速开发,项目结构较为混乱,代码维护与功能扩展都比较困难,为了方便后续功能开发,最近对项目进行的重构,顺便在重构的过程中将之前的部分操作进行了异步处理,也第一次实际接触了JMS与消息队列.项目中采用的消息中间件为ActiveMQ. 什么是JMS Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的A