java8 之CompletableFuture -- 如何构建异步应用

什么是Future 接口

很多场景下,我们想去获取线程运行的结果,而通常使用execute方法去提交任务是无法获得结果的,这时候我们常常会改用submit方法去提交,以便获得线程运行的结果。

而submit方法返回的就是Future,一个未来对象。 使用future.get() 方法去获取线程执行结果,包括如果出现异常,也会随get方法抛出。

Future 接口的缺陷

当我们使用future.get()方法去取得线程执行结果时,要知道get方法是阻塞的,也就是说为了拿到结果,当主线程执行到get()方法,当前线程会去等待异步任务执行完成,

换言之,异步的效果在我们使用get()拿结果时,会变得无效。示例如下

public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future future = executorService.submit(()->{
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务执行了");
        });
        future.get();
        System.out.println("主线任务执行了");
    }

打印结果是:异步任务执行了过后主线任务才执行。  就是因为get()在一直等待。

那么如何解决我想要拿到结果,可以对结果进行处理,又不想被阻塞呢?

CompletableFuture 使一切变得可能

JDK1.8才新加入的一个实现类CompletableFuture,实现了Future<T>CompletionStage<T>两个接口。

实际开发中,我们常常面对如下的几种场景:

1.  针对Future的完成事件,不想简单的阻塞等待,在这段时间内,我们希望可以正常继续往下执行,所以在它完成时,我们可以收到回调即可。

2. 面对Future集合来讲,这其中的每个Future结果其实很难去描述它们之间的依赖关系,而往往我们希望等待所有的Future集合都完成,然后做一些事情。

3. 在异步计算中,两个计算任务相互独立,但是任务二又依赖于任务一的结果。

如上的几种场景,单靠Future是解决不了的,而CompletableFuture则可以帮我们实现。

CompletableFuture 常见api 介绍

1、 runAsync 和 supplyAsync方法

它提供了四个方法来创建一个异步任务

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync类似于execute方法,不支持返回值,而supplyAsync方法类似submit方法,支持返回值。也是我们的重点方法。

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

  示例

   //无返回值
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        System.out.println("runAsync无返回值");
    });

    future1.get();

    //有返回值
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync有返回值");
        return "111";
    });

    String s = future2.get();

2、 异步任务执行完时的回调方法  whenComplete 和 exceptionally

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的任务

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

这些方法都是上述创建的异步任务完成后 (也可能是抛出异常后结束) 所执行的方法。

whenComplete和whenCompleteAsync方法的区别在于:前者是由上面的线程继续执行,而后者是将whenCompleteAsync的任务继续交给线程池去做决定。

exceptionally则是上面的任务执行抛出异常后,所要执行的方法。

示例

CompletableFuture.supplyAsync(()->{
        int a = 10/0;
        return 1;
    }).whenComplete((r, e)->{
        System.out.println(r);
    }).exceptionally(e->{
        System.out.println(e);
        return 2;
    });

值得注意的是:哪怕supplyAsync抛出了异常,whenComplete也会执行,意思就是,只要supplyAsync执行结束,它就会执行,不管是不是正常执行完。exceptionally只有在异常的时候才会执行。

其实,在whenComplete的参数内 e就代表异常了,判断它是否为null,就可以判断是否有异常,只不过这样的做法,我们不提倡。

whenComplete和exceptionally这两个,谁在前,谁先执行。

此类的回调方法,哪怕主线程已经执行结束,已经跳出外围的方法体,然后回调方法依然可以继续等待异步任务执行完成再触发,丝毫不受外部影响。

3、 thenApply 和 handle 方法

如果两个任务之间有依赖关系,比如B任务依赖于A任务的执行结果,那么就可以使用这两个方法

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

这两个方法,效果是一样的,区别在于,当A任务执行出现异常时,thenApply方法不会执行,而handle 方法一样会去执行,因为在handle方法里,我们可以处理异常,而前者不行。

示例

    CompletableFuture.supplyAsync(()->{
        return 5;
    }).thenApply((r)->{
        r = r + 1;
        return r;
    });

    //出现了异常,handle方法可以拿到异常 e
    CompletableFuture.supplyAsync(()->{
        int i = 10/0;
        return 5;
    }).handle((r, e)->{
        System.out.println(e);
        r = r + 1;
        return r;
    });

这里延伸两个方法  thenAccept 和 thenRun。其实 和上面两个方法差不多,都是等待前面一个任务执行完 再执行。区别就在于thenAccept接收前面任务的结果,且无需return。而thenRun只要前面的任务执行完成,它就执行,不关心前面的执行结果如何

如果前面的任务抛了异常,非正常结束,这两个方法是不会执行的,所以处理不了异常情况。

4、 合并操作方法  thenCombine 和 thenAcceptBoth

我们常常需要合并两个任务的结果,在对其进行统一处理,简言之,这里的回调任务需要等待两个任务都完成后再会触发。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

这两者的区别 在于 前者是有返回值的,后者没有(就是个消耗工作)

示例

private static void thenCombine() throws Exception {

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            return "future2";
        });

        CompletableFuture<String> result = future1.thenCombine(future2, (r1, r2)->{
            return r1 + r2;
        });
        //这里的get是阻塞的,需要等上面两个任务都完成
        System.out.println(result.get());
    }
private static void thenAcceptBoth() throws Exception {

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            return "future2";
        });
        //值得注意的是,这里是不阻塞的
        future1.thenAcceptBoth(future2, (r1, r2)->{
            System.out.println(r1 + r2);
        });

        System.out.println("继续往下执行");
    }

这两个方法 都不会形成阻塞。就是个回调方法。只有get()才会阻塞。

4、 allOf (重点,个人觉得用的场景很多)

  很多时候,不止存在两个异步任务,可能有几十上百个。我们需要等这些任务都完成后,再来执行相应的操作。那怎么集中监听所有任务执行结束与否呢? allOf方法可以帮我们完成。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

它接收一个可变入参,既可以接收CompletableFuture单个对象,可以接收其数组对象。

结合例子说明其作用。

public static void main(String[] args) throws Exception{
        long start = System.currentTimeMillis();
        CompletableFutureTest test = new CompletableFutureTest();
        // 结果集
        List<String> list = new ArrayList<>();

        List<Integer> taskList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 全流式处理转换成CompletableFuture[]
        CompletableFuture[] cfs = taskList.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> test.calc(integer))
                        .thenApply(h->Integer.toString(h))
                        .whenComplete((s, e) -> {
                            System.out.println("任务"+s+"完成!result="+s+",异常 e="+e+","+new Date());
                            list.add(s);
                        })
                ).toArray(CompletableFuture[]::new);

        CompletableFuture.allOf(cfs).join();

        System.out.println("list="+list+",耗时="+(System.currentTimeMillis()-start));
    }

    public int calc(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(3000);//任务1耗时3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任务5耗时5秒
            } else {
                Thread.sleep(1000);//其它任务耗时1秒
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }

全流式写法,综合了以上的一些方法,使用allOf集中阻塞,等待所有任务执行完成,取得结果集list。   这里有些CountDownLatch的感觉。

CompletableFuture 总结

图片出自

https://www.cnblogs.com/dennyzhangdd/p/7010972.html

本文只是简述了CompletableFuture的常用用法。日常开发基本够用,但是针对一些特殊场景,例如异常场景,取消场景,仍待研究。

原文地址:https://www.cnblogs.com/xinde123/p/10928091.html

时间: 2024-10-31 00:20:12

java8 之CompletableFuture -- 如何构建异步应用的相关文章

使用kendynet构建异步redis访问服务

最近开始在kendynet上开发手游服务端,游戏类型是生存挑战类的,要存储的数据结构和类型都比较简单,于是选择了用redis做存储,数据类型使用string基本就足够了.于是在kendynet上写了一个简单的redis异步访问接口. 设计理念 1.项目时间紧迫,不打算提供一个大而全的访问接口,只提供一个request接口用以发出redis请求. 2.数据在redis中key和value都存储为string,由使用者负责将数据序列化成string,从string反序列化回数据. 3.服务支持本地访

kendynet构建异步redis访问服务

最近开始在kendynet上开发手游服务端,游戏类型是生存挑战类的,要存储的数据结构和类型都比较简单,于是选择了用redis做存储,数据类型使用string基本就足够了.于是在kendynet上写了一个简单的redis异步访问接口. 设计理念 1.项目时间紧迫,不打算提供一个大而全的访问接口,只提供一个request接口用以发出redis请求. 2.数据在redis中key和value都存储为string,由使用者负责将数据序列化成string,从string反序列化回数据. 3.服务支持本地访

CompletableFuture基本用法

首先是https://www.jianshu.com/p/4897ccdcb278这个帖子,代码差不多贴了跑了,主要是completableFuture的一些方法的用法 然后是来自https://www.cnblogs.com/cjsblog/p/9267163.html的帖子 https://www.cnblogs.com/cjsblog/p/9267163.html 异步计算 所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法.在 Java 语言中,简单的讲就是另

[转载] 构建微服务:使用API Gateway

原文: http://mp.weixin.qq.com/s?__biz=MzA5OTAyNzQ2OA==&mid=206889381&idx=1&sn=478ccb35294c58d25d2df2d9ced65cf7&scene=1&key=c76941211a49ab586d79043cb87ac0dfeede574a20b2208ce76058b151624e4273182de582a786668ea347c6f317b389&ascene=0&

Java编程的逻辑 (94) - 组合式异步编程

前面两节讨论了Java 8中的函数式数据处理,那是对38节到55节介绍的容器类的增强,它可以将对集合数据的多个操作以流水线的方式组合在一起.本节继续讨论Java 8的新功能,主要是一个新的类CompletableFuture,它是对65节到82节介绍的并发编程的增强,它可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发. 之前介绍了那么多并发编程的内容,还有什么问题不能解决?CompletableFuture到底能解决什么问题?与之前介绍的内容有什么关系?

构建高可扩Web架构和分布式系统(2)

构建快速可伸缩的数据访问块 在讨论完设计分布式系统的核心考虑因素后,下面让我们再一起讨论难点部分:可扩展的数据访问. 大多数简单的Web应用程序,例如LAMP堆栈应用程序,看起来如图5所示: 图5:简单的Web应用程序 随着系统渐渐扩大,他们将面临两大主要挑战:构建可扩展的应用程序服务器和数据访问机制.在一个高可扩的应用程序设计中,通常最小化的应用程序(或Web)服务往往能体现一种无共享的架构.这就使得应用程序服务器层要进行横向扩展,这种设计的结果就是把繁重的工作转移到堆栈下层的数据库服务和配置

C#秘密武器之异步编程

一.概述 1.什么是异步? 异步操作通常用于执行完成时间可能较长的任务,如打开大文件.连接远程计算机或查询数据库.异步操作在主应用程序线程以外的线程中执行.应用程序调用方法异步执行某个操作时,应用程序可在异步方法执行其任务时继续执行其他的任务. 2.同步与异步的区别 同步(Synchronous):在执行某个操作时,应用程序必须等待该操作执行完成后才能继续执行. 异步(Asynchronous):在执行某个操作时,应用程序可在异步操作执行时继续执行.实质:异步操作,启动了新的线程,主线程与异步线

异步请求

1.什么是? 原理性. $.ajax({               ----jQuery中封装好的异步请求 type:"post", url:"add.do", cache:"false", dataType:"json", success:function(msg){ //请求成功后执行的语句. } }); 没有封装的情况下的异步请求. ajax=异步(asynchronours) 的  javaScript and  x

[Effective JavaScript 笔记]第68条:使用promise模式清洁异步逻辑

构建异步API的一种流行的替代方式是使用promise(有时也被称为deferred或future)模式.已经在本章讨论过的异步API使用回调函数作为参数. downloadAsync('file.txt',function(file){ console.log('file:'+file); }); 基于promise的API不接收回调函数作为参数.相反,它返回一个promise对象,该对象通过其自身的then方法接收回调函数. var p=downloadP('file.txt'); p.th