Future和CompletableFuture

Future

从JDK1.5开始,提供了Future来表示异步计算的结果,一般它需要结合ExecutorService(执行者)和Callable(任务)来使用。

示例

import java.util.*;
import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        Future<Integer> future = executor.submit(() -> {
            // 故意耗时
            Thread.sleep(5000);
            return new Random().nextInt(100);
        });

        System.out.println(future.get());
        System.out.println("如果get是阻塞的,则此消息在数据之后输出");
        executor.shutdown();
    }

}

输出

即使异步任务等待了5秒,也依然先于消息输出,由此证明get方法是阻塞的。

Future只是个接口,实际上返回的类是FutureTask:

/**
 * 表示此任务的运行状态。最初是NEW == 0。运行状态仅在set、setException和cancel方法中转换为终端状态。
 *
 * 可能的状态转换:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果当前状态是COMPLETING及其之下的状态,则需要进入awaitDone方法阻塞等待任务完成。
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

CompletableFuture

JDk1.8引入了CompletableFuture,它实际上也是Future的实现类。这里可以得出:

1. 面试问Future和CompletableFuture的区别实际上是不严谨的,因为一个是接口一个是其实现类。

2. 问区别实际上是问FutureTask和CompletableFuture的区别,或者说CompletableFuture有哪些新特性,能完成Future不能完成的工作。

首先看类定义,可以看到,实现了CompletionStage接口,这个接口是所有的新特性了

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

对于CompletableFuture有四个执行异步任务的方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

1. 如果我们指定线程池,则会使用我么指定的线程池;如果没有指定线程池,默认使用ForkJoinPool.commonPool()作为线程池。

2. supply开头的带有返回值,run开头的无返回值。

1. 执行异步任务(supplyAsync / runAsync)

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return new Random().nextInt(100);
        }, executor);

        System.out.println(future.get());
        executor.shutdown();
    }

}

以上仅仅返回个随机数,如果我们要利用计算结果进一步处理呢?

2. 结果转换(thenApply / thenApplyAsync)

// 同步转换
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
// 异步转换,使用默认线程池
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
// 异步转换,使用指定线程池
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<Integer> future = CompletableFuture
                // 执行异步任务
                .supplyAsync(() -> {
                    return new Random().nextInt(100);
                }, executor)
                // 对上一步的结果进行处理
                .thenApply(n -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    int res = new Random().nextInt(100);
                    System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s", n, res));
                    return n + res;
                });

        System.out.println("我等了你2秒");
        System.out.println(future.get());

        executor.shutdown();
    }

}

输出:

如果把thenApply换成thenApplyAsync,则会输出:

处理完任务以及结果,该去消费了

3. 消费而不影响最终结果(thenAccept / thenRun / thenAcceptBoth)

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

这三种的区别是:

thenAccept:能够拿到并利用执行结果

thenRun:不能够拿到并利用执行结果,只是单纯的执行其它任务

thenAcceptBoth:能传入另一个stage,然后把另一个stage的结果和当前stage的结果作为参数去消费。

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<Integer> future = CompletableFuture
                // 执行异步任务
                .supplyAsync(() -> {
                    return new Random().nextInt(100);
                }, executor)
                // 对上一步的结果进行处理
                .thenApplyAsync(n -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    int res = new Random().nextInt(100);
                    System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s", n, res));
                    return n + res;
                });
        // 单纯的消费执行结果,注意这个方法是不会返回计算结果的——CompletableFuture<Void>
        CompletableFuture<Void> voidCompletableFuture = future.thenAcceptAsync(n -> {
            System.out.println("单纯消费任务执行结果:" + n);
        });
        // 这个无法消费执行结果,没有传入的入口,只是在当前任务执行完毕后执行其它不相干的任务
        future.thenRunAsync(() -> {
            System.out.println("我只能执行其它工作,我得不到任务执行结果");
        }, executor);

        // 这个方法会接受其它CompletableFuture返回值和当前返回值
        future.thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> {
            return "I‘m Other Result";
        }), (current, other) -> {
            System.out.println(String.format("Current:%s,Other:%s", current, other));
        });

        System.out.println("我等了你2秒");
        System.out.println(future.get());

        executor.shutdown();
    }

}

结果:

如果我要组合两个任务呢?

4. 组合任务(thenCombine / thenCompose)

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

这两种区别:主要是返回类型不一样。

thenCombine:至少两个方法参数,一个为其它stage,一个为用户自定义的处理函数,函数返回值为结果类型。

thenCompose:至少一个方法参数即处理函数,函数返回值为stage类型。

先看thenCombine

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<Integer> otherFuture = CompletableFuture
                // 执行异步任务
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("任务A:" + result);
                    return result;
                }, executor);

        CompletableFuture<Integer> future = CompletableFuture
                // 执行异步任务
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("任务B:" + result);
                    return result;
                }, executor)
                .thenCombineAsync(otherFuture, (current, other) -> {
                    int result = other + current;
                    System.out.println("组合两个任务的结果:" + result);
                    return result;
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

执行结果:

再来看thenCompose

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<Integer> future = CompletableFuture
                // 执行异步任务
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("任务A:" + result);
                    return result;
                }, executor)
                .thenComposeAsync((current) -> {
                    return CompletableFuture.supplyAsync(() -> {
                        int b = new Random().nextInt(100);
                        System.out.println("任务B:" + b);
                        int result = b + current;
                        System.out.println("组合两个任务的结果:" + result);
                        return result;
                    }, executor);
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

输出:

注意这两个输出虽然一样,但是用法不一样。

5. 快者优先(applyToEither / acceptEither)

有个场景,如果我们有多条渠道去完成同一种任务,那么我们肯定选择最快的那个。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

这两种区别:仅仅是一个有返回值,一个没有(Void)

先看applyToEither

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> otherFuture = CompletableFuture
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("执行者A:" + result);
                    try {
                        // 故意A慢了一些
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "执行者A【" + result + "】";
                }, executor);

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("执行者B:" + result);
                    return "执行者B【" + result + "】";
                }, executor)
                .applyToEither(otherFuture, (faster) -> {
                    System.out.println("谁最快:" + faster);
                    return faster;
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

输出:

再看acceptEither

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> otherFuture = CompletableFuture
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("执行者A:" + result);
                    try {
                        // 故意A慢了一些
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "执行者A【" + result + "】";
                }, executor);

        CompletableFuture<Void> future = CompletableFuture
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("执行者B:" + result);
                    return "执行者B【" + result + "】";
                }, executor)
                .acceptEither(otherFuture, (faster) -> {
                    System.out.println("谁最快:" + faster);
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

输出:

6. 异常处理(exceptionally / whenComplete / handle)

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

exceptionally

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    if (true){
                        throw new RuntimeException("Error!!!");
                    }
                    return "Hello";
                }, executor)
                // 处理上一步发生的异常
                .exceptionally(e -> {
                    System.out.println("处理异常:" + e.getMessage());
                    return "处理完毕!";
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

输出:

whenComplete

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    if (true){
                        throw new RuntimeException("Error!!!");
                    }
                    return "Hello";
                }, executor)
                // 处理上一步发生的异常
                .whenComplete((result,ex) -> {
                    // 这里等待为了上一步的异常输出完毕
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("上一步结果:" + result);
                    System.out.println("处理异常:" + ex.getMessage());
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

输出结果:

可以看见,用whenComplete对异常情况不是特别友好。

handle

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    if (true){
                        throw new RuntimeException("Error!!!");
                    }
                    return "Hello";
                }, executor)
                // 处理上一步发生的异常
                .handle((result,ex) -> {
                    System.out.println("上一步结果:" + result);
                    System.out.println("处理异常:" + ex.getMessage());
                    return "Value When Exception Occurs";
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

输出:

综上,如果单纯要处理异常,那就用exceptionally;如果还想处理结果(没有异常的情况),那就用handle,比whenComplete友好一些,handle不仅能处理异常还能返回一个异常情况的默认值。

对比

Future:我们的目的都是获取异步任务的结果,但是对于Future来说,只能通过get方法或者死循环判断isDone来获取。异常情况就更是难办。

CompletableFuture:只要我们设置好回调函数即可实现:

1. 只要任务完成,即执行我们设置的函数(不用再去考虑什么时候任务完成)

2. 如果发生异常,同样会执行我们处理异常的函数,甚至连默认返回值都有(异常情况处理更加省力)

3. 如果有复杂任务,比如依赖问题,组合问题等,同样可以写好处理函数来处理(能应付复杂任务的处理)

原文地址:https://www.cnblogs.com/LUA123/p/12050255.html

时间: 2024-08-26 00:46:01

Future和CompletableFuture的相关文章

Java CompletableFuture 详解

Future是Java 5添加的类,用来描述一个异步计算的结果.你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行. public class BasicFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.

CompletableFuture CompletableFuture.supplyAsync 异常处理

CompletableFuture 异常处理completeExceptionally可以把异常抛到主线程 /** * User: laizhenwei * Date: 2018-01-30 Time: 22:26 * Description: */ @RunWith(SpringRunner.class) //@SpringBootTest public class CompletableFutureTests { @Test public void testMethod() { String

Copycat - MemberShip

https://github.com/atomix/copycat   http://atomix.io/copycat/docs/membership/   为了便于实现,Copycat把member分成3种, active, passive, and reserve members - each of which play some role in supporting rapid replacement of failed servers.   Active members are ful

Copycat - command

client.submit(new PutCommand("foo", "Hello world!")); ServerContext connection.handler(CommandRequest.class, request -> state.command(request)); State.command ReserveState开始,会把command forward到leader,只有leader可以处理command @Override pub

JVM 并发性: 阻塞还是不阻塞?

在任何并发性应用程序中,异步事件处理都至关重要.事件来源可能是不同的计算任务.I/O 操作或与外部系统的交互.无论来源是什么,应用程序代码都必须跟踪事件,协调为响应事件而采取的操作.Java 应用程序可采用两种基本的异步事件处理方法:该应用程序有一个协调线程等待事件,然后采取操作,或者事件可在完成时直接执行某项操作(通常采取执行应用程序所提供的代码的方式).让线程等待事件的方法被称为阻塞 方法.让事件执行操作.线程无需显式等待事件的方法被称为非阻塞 方法. 旧的 java.util.concur

Java线程池介绍

Java线程池介绍 2015-10-24 ImportNew (点击上方公号,可快速关注) 原文:allegro 译文:ImportNew - paddx 链接:http://www.importnew.com/16845.html 根据摩尔定律(Moore’s law),集成电路晶体管的数量差不多每两年就会翻一倍.但是晶体管数量指数级的增长不一定会导致 CPU 性能的指数级增长.处理器制造商花了很多年来提高时钟频率和指令并行.在新一代的处理器上,单线程程序的执行速率确实有所提高.但是,时钟频率

spring @Async异步方法使用及原理说明

异步类: package com.example.spring.async; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import com.example.spring.MyLog; /** * 将一个类声明为异步类,那么这个类对外暴露的方法全部成为异步方法. * 与异步方法的区别是这里的注解是加到类上,异步方法的注解是加到方法上.

一文带你实现RPC框架

想要获取更多文章可以访问我的博客?-?代码无止境. 现在大部分的互联网公司都会采用微服务架构,但具体实现微服务架构的方式有所不同,主流上分为两种,一种是基于Http协议的远程调用,另外一种是基于RPC方式的调用.两种方式都有自己的代表框架,前者是著名的Spring Cloud,后者则是有阿里巴巴开源的Dubbo,二者都被广泛的采用.今天这篇文章,我们就一起来了解一下RPC,并且和大家一起动手实现一个简单的RPC框架的Demo. 什么是RPC RPC是一种远程调用过程,是一种通过网络远程调用其他服

从零开始开发IM(即时通讯)服务端(二)

好消息:IM1.0.0版本已经上线啦,支持特性: 私聊发送文本/文件 已发送/已送达/已读回执 支持使用ldap登录 支持接入外部的登录认证系统 提供客户端jar包,方便客户端开发 github链接: https://github.com/yuanrw/IM 本篇将带大家从零开始搭建一个轻量级的IM服务端,IM的整体设计思路和架构在我的上篇博客中已经讲过了,没看过的同学请点击从零开始开发IM(即时通讯)服务端 . 这篇将给大家带来更多的细节实现.我将从三个方面来阐述如何构建一个完整可靠的IM系统