疯狂Java学习(86)-----------使用CompletableFuture处理异步超时

一天,我在改进多线程代码时被Future.get()卡住了。

public void serve() throws InterruptedException, ExecutionException, TimeoutException {
  final Future<Response> responseFuture = asyncCode();
  final Response response = responseFuture.get(1, SECONDS);
  send(response);
}
private void send(Response response) {
  //...
}

这是用Java写的一个Akka应用程序,使用了一个包含1000个线程的线程池(原来如此!)——所有的线程都在阻塞在这个 get() 中。系统的处理速度跟不上并发请求的数量。重构以后,我们干掉了所有的这些线程仅保留了一个,极大的减少了内存的占用。我们简单一点,通过一个Java 8的例子来演示。第一步是使用CompletableFuture来替换简单的Future(见:Tip
9
)。

  • 通过控制任务提交到ExecutorService的方式:只需用 CompletableFuture.supplyAsync(…, executorService) 来代替 executorService.submit(…) 即可
  • 处理基于回调函数的API:使用promises

否则(如果你已经使用了阻塞式的API或 Future<T>)会导致很多线程被阻塞。这就是为什么现在这么多异步的API都让人很烦了。所以,让我们重写之前的代码来接收CompletableFuture:

public void serve() throws InterruptedException, ExecutionException, TimeoutException {
    final CompletableFuture<Response> responseFuture = asyncCode();
    final Response response = responseFuture.get(1, SECONDS);
    send(response);
}

很明显,这不能解决任何问题,我们还必须利用新的风格来编程:

public void serve() {
    final CompletableFuture<Response> responseFuture = asyncCode();
    responseFuture.thenAccept(this::send);
}

这个功能上是等同的,但是 serve() 只会运行一小段时间(不会阻塞或等待)。只需要记住:this::send 将会在完成 responseFuture 的同一个线程内执行。如果你不想花费太大的代价来重载已经存在的线程池或send()方法,可以考虑通过 thenAcceptAsync(this::send, sendPool) 好极了,但是我们失去了两个重要属性:异常传播与超时。异常传播很难实现,因为我们改变了API。当serve()存在的时候,异步操作可能还没有完成。如果你关心异常,可以考虑返回 responseFutureor
或者其他可选的机制。至少,应该有异常的日志,否则该异常就会被吞噬了。

final CompletableFuture<Response> responseFuture = asyncCode();
responseFuture.exceptionally(throwable -> {
    log.error("Unrecoverable error", throwable);
    return null;
});

请小心上面的代码:exceptionally() 试图从失败中恢复过来,返回一个可选的结果。这个地方虽可以正常的工作,但是如果对 exceptionally()和withthenAccept() 使用链式调用,即使失败了也还是会调用 send() 方法,返回一个null参数,或者任何其它从 exceptionally() 方法中返回的值。

responseFuture
    .exceptionally(throwable -> {
        log.error("Unrecoverable error", throwable);
        return null;
    })
    .thenAccept(this::send);  //probably not what you think

丢失一秒超时的问题非常巧妙。我们原始的代码在Future完成之前最多等待(阻塞)1秒,否则就会抛出 TimeoutException。我们丢失了这个功能,更糟糕的是,单元测试超时的不是很方便,经常会跳过这个环节。为了维持超时机制,而又不破坏事件驱动的原则,我们需要建立一个额外的模块:一个在给定时间后必定会失败的 Future。

public static <T> CompletableFuture<T> failAfter(Duration duration) {
    final CompletableFuture<T> promise = new CompletableFuture<>();
    scheduler.schedule(() -> {
        final TimeoutException ex = new TimeoutException("Timeout after " + duration);
        return promise.completeExceptionally(ex);
    }, duration.toMillis(), MILLISECONDS);
    return promise;
}

private static final ScheduledExecutorService scheduler =
        Executors.newScheduledThreadPool(
                1,
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("failAfter-%d")
                        .build());

这个很简单:我们创建一个promise(没有后台任务或线程池的 Future),然后在给定的 java.time.Duration 之后会抛出 TimeoutException 异常。如果在某个地方调用 get() 获取这个 Future,阻塞的时间到达这个指定的时间后会抛出 TimeoutException。

实际上,它是一个包装了 TimeoutException 的 ExecutionException,这个无需多说。注意,我使用了固定一个线程的线程池。这不仅仅是为了教学的目的:这是“1个线程应当能满足任何人的需求”的场景。failAfter() 本身没多大的用处,但是如果和 ourresponseFuture 一起使用,我们就能解决这个问题了。

final CompletableFuture<Response> responseFuture = asyncCode();
final CompletableFuture<Response> oneSecondTimeout = failAfter(Duration.ofSeconds(1));
responseFuture
        .acceptEither(oneSecondTimeout, this::send)
        .exceptionally(throwable -> {
            log.error("Problem", throwable);
            return null;
        });

这里还做了很多其他事情。在后台的任务接收 responseFuture 时,我们也创建了一个“合成”的 oneSecondTimeout future,这在成功的时候永远不会执行,但是在1秒后就会导致任务失败。现在我们联合这两个叫做 acceptEither,这个操作将执行先完成 Future 的代码块,而简单的忽略 responseFuture 或 oneSecondTimeout 中运行比较慢的那个。如果 asyncCode() 代码在1秒内执行完成,this::send 就会被调用,而 oneSecondTimeout
异常就不会抛出。但是,如果 asyncCode() 执行真的很慢,oneSecondTimeout 异常就先抛出。由于一个异常导致任务失败,exceptionallyerror 处理器就会被调用,而不是 this::send 方法。你可以选择执行 send() 或者 exceptionally,但是不能两个都执行。当如,如果我们有两个“普通”的 Future 正常执行完成了,则最先响应的那个将调用 send() 方法,后面的就会被丢弃。

这个不是最清晰的解决方案。更清晰的方案是包装原始的 Future,然后保证它能在给定的时间内执行。这种操作对 com.twitter.util.Future 是可行的(Scala叫做 within()),但是 scala.concurrent.Future 中没有这个功能(据推测是为了鼓励使用前面的方式)。我们暂时不讨论Scala背后如何执行的,先实现类似 CompletableFuture 的操作。它接受一个 Future 作为输入,然后返回一个 Future,这个 Future 在后台任务完成时候执行完成。但是,如果底层的
Future 执行的时间太长,就或抛出异常:

public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) {
    final CompletableFuture<T> timeout = failAfter(duration);
    return future.applyToEither(timeout, Function.identity());
}

这引导我们实现最终的、清晰的、灵活的方法:

final CompletableFuture<Response> responseFuture = within(
        asyncCode(), Duration.ofSeconds(1));
responseFuture
        .thenAccept(this::send)
        .exceptionally(throwable -> {
            log.error("Unrecoverable error", throwable);
            return null;
        });

希望你喜欢这篇文章,因为你已经知道在Java里,实现响应式编程不再是什么问题。

时间: 2024-12-11 08:09:06

疯狂Java学习(86)-----------使用CompletableFuture处理异步超时的相关文章

疯狂JAVA学习计划

? Java_StudyPlane ?2015/2/20 ????????大年初二 ????????雨天 1.5小时为1个课时. JAVA学习进度表

疯狂java学习笔记之面向对象(八) - static和final

一.static: 1.static是一个标识符: - 有static修饰的成员表明该成员是属于类的; - 没有static修饰的成员表明该成员是属于实例/对象的. 2.static修饰的成员(Field.方法.初始化块),与类共存亡:static修饰的成员建议总是通过类名来访问,虽然它也可以通过实例来访问(实质也是通过类来访问的),所以平时若在其他程序中见到通过实例/对象来访问static成员时,可以直接将实例/对象 替换成类名: 3.程序都是先有类再有对象的,有可能出现有类但没有实例/对象的

疯狂Java学习笔记(59)-----------50道Java线程面试题

50道Java线程面试题 下面是Java线程相关的热门面试题,你可以用它来好好准备面试. 1) 什么是线程? 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位.程序员可以通过它进行多处理器编程,你可以使用多线程对运算密集型任务提速.比如,如果一个线程完成一个任务要100毫秒,那么用十个线程完成改任务只需10毫秒.Java在语言层面对多线程提供了卓越的支持,它也是一个很好的卖点.欲了解更多详细信息请点击这里. 2) 线程和进程有什么区别? 线程是进程的子集,一个

疯狂Java学习笔记(34)----------Iterator、Collection接口以及foreach

Iterator.Collection接口: 如下图:Iterator.Collection同在一个包中: 红字部分使我们经常遇到的,但是遇到又不知道怎么去理解,去应用它! Collection是最基本集合接口,它定义了一组允许重复的对象.Collection接口派生了两个子接口Set和List,分别定义了两种不同的存储方式,如下: 2. Set接口 Set接口继承于Collection接口,它没有提供额外的方法,但实现了Set接口的集合类中的元素是无序且不可重复. 特征:无序且不可重复. 3.

疯狂Java学习笔记(51)-----------面试题

自己做了一点面试题,感觉很经典,分享给大家,发现还有很多东西需要学! 一.String,StringBuffer, StringBuilder 的区别是什么?String为什么是不可变的? 答:   1.String是字符串常量,StringBuffer和StringBuilder都是字符串变量.后两者的字符内容可变,而前者创建后内容不可变. 2.String不可变是因为在JDK中String类被声明为一个final类. 3.StringBuffer是线程安全的,而StringBuilder是非

【疯狂Java学习笔记】【理解面向对象】

[学习笔记]1.Java语言是纯粹的面向对象语言,这体现在Java完全支持面向对象的三大基本特征:封装.继承.多态.抽象也是面向对象的重要组成部分,不过它不是面向对象的特征之一,因为所有的编程语言都需要抽象. 2.面向对象开发方法比较结构化开发方法的优势在于可以提供更好的可重用性.可扩展性.可维护性. 3.基于对象和面向对象的区别:基于对象也使用了对象,但是无法通过现有的对象作为模板来生成新的对象类型,继而产生新的对象,也就是说,基于对象没有继承的特点.而面向对象有继承,而多态则是建立在继承的基

疯狂Java学习笔记(87)-----------十篇必读的Java文章

1.Brian Goetz:"管理工作:发人深省的部分" 这其实不是一篇博文,而是Brian Goetz对于甲骨文公司Java的管理的一个非常有趣的讨论的记录.在 以前我们将Java语言与Scala或者Ceylon相比较的时候,对其1-2个特性一直稍微有些意见. 对于为什么Java尽快变得和其他语言一样"时髦"不是一个好主意,Brian提出了很好的观点.每一个Java开发者都应有所了解(大约一个小时). Youtube链接. 2.Aleksey Shipilёv:(

疯狂Java学习笔记(88)-----------值得拥有的10本书

Java是时下最流行的编程语言之一.市面上也出现了适合初学者的大量书籍.但是对于那些在Java编程上淫浸多时的开发人员而言,这些书的内容未免显得过于简单和冗余了.那些适合初学者的书籍看着真想打瞌睡,有木有.想找高级点的Java书籍吧,又不知道哪些适合自己. 别急,雪中送炭的来了:下面我将分享的书单绝对值得拥有.ps,我也尽力避免列出为特定软件或框架或认证的Java书,因为我觉得那不是纯Java书. 1.<Java in a Nutshell>(Java技术手册) 与其说是必读书籍,还不说是参考

疯狂Java学习笔记(89)-----------Java习惯用法总结

在Java编程中,有些知识 并不能仅通过语言规范或者标准API文档就能学到的.在本文中,我会尽量收集一些最常用的习惯用法,特别是很难猜到的用法.(Joshua Bloch的<Effective Java>对这个话题给出了更详尽的论述,可以从这本书里学习更多的用法.) 我把本文的所有代码都放在公共场所里.你可以根据自己的喜好去复制和修改任意的代码片段,不需要任何的凭证. 目录 实现: equals() hashCode() compareTo() clone() 应用: StringBuilde