jdk8之CompletableFuture与CompletionService

  JDK 8的CompletionService相对于之前版本的Future而言,其优势是能够尽可能快的得到执行完成的任务。例如有4个并发任务要执行,正常情况下通过Future.get()获取,通常只能按照提交的顺序获得结果,如果最后提交的最先完成的话,总执行时间会长很多。而通过CompletionService能够降低总执行时间,如下所示:

package com.hundsun.ta.base.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author zjhua
 * @description
 * @date 2020/1/28 21:07
 */
public class CompletionServiceTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        testFuture();
        testCompletionService();
    }

    //结果的输出和线程的放入顺序 有关(如果前面的没完成,就算后面的哪个完成了也得等到你的牌号才能输出!),so阻塞耗时
    public static void testFuture() throws InterruptedException, ExecutionException {
        long beg = System.currentTimeMillis();
        System.out.println("testFuture()开始执行:" + beg);
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<String>> result = new ArrayList<Future<String>>();
        for (int i = 5; i > 0; i--) {
            Future<String> submit = executor.submit(new Task(i));
            result.add(submit);
        }
        executor.shutdown();
        for (int i = 0; i < 5; i++) {//一个一个等待返回结果
            Thread.sleep(500);
            System.out.println("线程" + i + "执行完成:" + result.get(i).get());
        }
        System.out.println("testFuture()执行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis()-beg));
    }

    //结果的输出和线程的放入顺序 无关(谁完成了谁就先输出!主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序),so很大大缩短等待时间
    private static void testCompletionService() throws InterruptedException, ExecutionException {
        long beg = System.currentTimeMillis();
        System.out.println("testFuture()开始执行:" + beg);
        ExecutorService executor = Executors.newCachedThreadPool();
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);
        for (int i = 5; i > 0; i--) {
            completionService.submit(new Task(i));
        }
        executor.shutdown();
        for (int i = 0; i < 5; i++) {
            // 检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
            Future<String> future = completionService.take(); //这一行没有完成的任务就阻塞
            Thread.sleep(500);
            System.out.println("线程" + i + "执行完成:" + future.get());   // 这一行在这里不会阻塞,引入放入队列中的都是已经完成的任务
        }
        System.out.println("testFuture()执行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis() - beg));
    }

    private static class Task implements Callable<String> {

        private volatile int i;

        public Task(int i) {
            this.i = i;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(i*500);
            return "任务 : " + i;
        }

    }
}
// 执行结果
testFuture()开始执行:1580217876088
线程0执行完成:任务 : 5
线程1执行完成:任务 : 4
线程2执行完成:任务 : 3
线程3执行完成:任务 : 2
线程4执行完成:任务 : 1
testFuture()执行完成:1580217880596,4508
testFuture()开始执行:1580217880596
线程0执行完成:任务 : 1
线程1执行完成:任务 : 2
线程2执行完成:任务 : 3
线程3执行完成:任务 : 4
线程4执行完成:任务 : 5
testFuture()执行完成:1580217883605,3009

使用传统的Future,需要执行4.5秒,使用CompleteService,则只需要3秒。但是如果子线程执行完成后不需要执行其他任务,则意义不是很大。

除了上述场景外,CompleteService还适合于N选1的场景,例如同时从两个渠道查询数据,返回任何一个可用的即可,从Future就实现不了。

CompletionService的定义如下:

其实现也比较简单,利用了ThreadPoolExecutor。

看完CompleteService,再来看CompleteFuture。它实现了Future接口和CompletionStage接口(他代表某个异步或同步计算的阶段,也就是计算流水线的一个节点,这样多个CompletionStage可以作为和过滤器一样链式执行,一个计算单元完成后出发下一个计算单元),和CompleteService的区别在于CompleteFuture知道当前完成的是谁,并采用编程式回调提高代码可读性,CompleteService只知道哪个最快完成了,具体是谁需要应用自己去关联上下文。同时在编程模式上,很大程度上利用了JDK 8的Lambda表达式,这样一个完整服务的多个步骤就能够和同步的的写法一样自然,不用为了实现异步处理而将逻辑合并为一个超大的方法。在并行处理中,如果每个分片的处理时间相差比较大,例如有些1分钟,有些3分钟,有些10秒钟,这样将每个服务的粒度细分为很多个子步骤,每个服务的子步骤通过CompleteFuture串联起来,整体的完成时间就能够下降,每个分片的处理完成时间也将趋于接近。同时在异常的处理上,CompleteFuture也要友好的多。

下面来看一个例子:

static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
    int count = 1;
    @Override
    public Thread newThread(Runnable runnable) {
        return new Thread(runnable, "custom-executor-" + count++);
    }
});
static void thenApplyAsyncWithExecutorExample() {    // 简单的异步执行
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
        assertFalse(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    }, executor);
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}

异常处理:

static void completeExceptionallyExample() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));  // 模拟抛出异常
assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch(CompletionException ex) { // just for testing
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    assertEquals("message upon cancel", exceptionHandler.join());
}

链式调用:

public void completableFutureApplyAsync() {
 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
 ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
 CompletableFuture<Integer> completableFuture =
 CompletableFuture
      .supplyAsync(this::findAccountNumber,newFixedThreadPool)//will run on thread obtain from newFixedThreadPool
      .thenApplyAsync(this::calculateBalance,newSingleThreadScheduledExecutor) //will run on thread obtain from newSingleThreadScheduledExecutor
      .thenApplyAsync(this::notifyBalance);//will run on thread obtain from common pool
   Integer balance = completableFuture.join();
    assertEquals(Integer.valueOf(balance), Integer.valueOf(100));
    }

  就实际应用而言,CompletableFuture的作用更加有价值的地方在于其他的一些方法,比如allOf、anyOf、xxxToEither等需要多对一的场景,他们可以大大简化代码。

参考:

https://dzone.com/articles/20-examples-of-using-javas-completablefuture

原文地址:https://www.cnblogs.com/zhjh256/p/11829397.html

时间: 2024-11-08 19:58:04

jdk8之CompletableFuture与CompletionService的相关文章

浅谈Java Future接口

Java项目编程中,为了充分利用计算机CPU资源,一般开启多个线程来执行异步任务.但不管是继承Thread类还是实现Runnable接口,都无法获取任务执行的结果.JDK 5中引入了Callable和Future,通过它们执行异步任务可以获取执行结果.FutureTask分析 JDK 5中获取任务执行的结果主要是通过FutureTask类实现的.FutureTask实现了RunnableFuture的接口,它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值

jdk8新特性

jAVA8 十大新特性 投稿:junjie 字体:[增加 减小] 类型:转载 本教程将Java8的新特新逐一列出,并将使用简单的代码示例来指导你如何使用默认接口方法,lambda表达式,方法引用以及多重Annotation,之后你将会学到最新的API上的改进,比如流,函数式接口,Map以及全新的日期API "Java is still not dead-and people are starting to figure that out." 本教程将用带注释的简单代码来描述新特性,你将

JDK8对并发的新支持

原文:JDK8对并发的新支持 1. LongAdder 和AtomicLong类似的使用方式,但是性能比AtomicLong更好. LongAdder与AtomicLong都是使用了原子操作来提高性能.但是LongAdder在AtomicLong的基础上进行了热点分离,热点分离类似于有锁操作中的减小锁粒度,将一个锁分离成若干个锁来提高性能.在无锁中,也可以用类似的方式来增加CAS的成功率,从而提高性能. LongAdder原理图: AtomicLong的实现方式是内部有个value 变量,当多线

高并发下的批量处理与单个处理(利用jdk8新特性处理,提高性能)

1.技术选型: SpringBoot 2.案例: 实体类: package com.zhangwl.complicatedemo.pojo; import java.sql.Timestamp; /** * @ClassName Goods * @Description * @Author zhangwl * @Date 2019/10/3 0:54 * @Version 1.0 **/public class Goods { private String goodsName; private

【转】ConcurrentHashMap完全解析(JDK6/7、JDK8)

转自http://my.oschina.net/hosee/blog/675884 并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力,但同时降低了对读一致性的要求(这点好像CAP理论啊 O(∩_∩)O).ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatile,final,CAS

解决win7 安装完jdk7后,再安装jdk8出现的问题 has value &#39;1.8&#39;, but &#39;1.7&#39; is required.

http://blog.csdn.net/qiyueqinglian/article/details/46605759 电脑装了jdk8,JAVA_HOME也是设置的8. 不删除8变回7. 改了JAVA_HOME,并且path值里的C:\ProgramData\Oracle\Java\javapath也删了 运行java -version,报错 Error: Registry key ‘Software\JavaSoft\Java Runtime Environment’\CurrentVers

CompletableFuture

若你的意图是并发,而非并行,或者你的主要目标是在同一个CPU上执行几个松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么其实真正想做的避免因为等待远程服务的返回,或对数据库的查询而阻塞线程的执行,浪费计算资源. Future接口在Java 5中引入,设计初衷是对将来某个时刻会发生的结果进行建模.它建模了一种异步计算,返回一个执行计算结果的引用.使用Future只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService.可以调用get方

JDK8新特性 Lambda表达式

一.接口的默认方法二.Lambda 表达式三.函数式接口四.方法与构造函数引用五.Lambda 作用域六.访问局部变量七.访问对象字段与静态变量八.访问接口的默认方法九.Date API十.Annotation 注解:支持多重注解 一.接口的默认方法 Java8允许我们给接口添加一个非抽象的方法实现,只需要使用 default关键字即可,这个特征又叫做扩展方法,示例如下: [java] view plain copy public interface Formula { double calcu

Linux下安装jdk8步骤详述

作为Java开发人员,在Linux下安装一些开发工具是必备技能,本文以安装jdk为例,详细记录了每一步的操作命令,以供参考. 0.下载jdk8 登录网址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html选择对应jdk版本下载.(可在Windows下下载完成后,通过文件夹共享到Linux上) 1. 登录Linux,切换到root用户 su root 获取root用户权限,当前工作目