流式计算(一)-Java8Stream

大约各位看官君多少也听说了Storm/Spark/Flink,这些都是大数据流式处理框架。如果一条手机组装流水线上不同的人做不同的事,有的装电池,有的装屏幕,直到最后完成,这就是典型的流式处理。如果手机组装是先全部装完电池,再交给装屏幕的组,直到完成,这就是旧式的集合式处理。今天,就来先说说JDK8中的流,虽然不是很个特新鲜的话题,但是一个很好的开始,因为——思想往往比细节重要!

准备

Idea2019.03/Gradle5.6.2/JDK11.0.4/Lambda

难度:新手--战士--老兵--大师

目标

1.Lambda表达式使用

2.流的筛选/切片/查找/匹配/映射/归约操作

步骤

为了遇见各种问题,同时保持时效性,我尽量使用最新的软件版本。代码地址:其中的day22,https://github.com/xiexiaobiao/dubbo-project.git

1.先来两个概念:

  • 流(Stream):一个元素序列。位于包java.util.stream.Stream,注意这个序列是可以串行或并行处理的。有多种方式建立流,最常见的是从集合(Collection)对象获取,有序集合如List的流有序,Set的流则无序。
  • Lambda表达式:流式处理的绝佳搭档!什么是Lambda表达式?略。哪里可以用Lambda表达式?需要实现一个函数式接口(只定义了一个抽象函数的接口)的地方就可以使用Lambda表达式,代替匿名类方式。源代码中com.biao.lambda包里,我写了一个简单的Lambda实例,供参考。

2.流式处理特点:

  • 流水线:流操作可返回一个流,多个操作从而可形成一个链,
  • 内部迭代:使用Iterator/forEach显式迭代器为外部迭代,流的迭代是流内部完成的,只需声明,是内部迭代,
  • 一次使用:每个流只能消费一次,不能结束后又从头开始!

3.流的一般使用:

  • 建立流:创建一个Stream对象,如从一个数据源来执行一个查询;
  • 操作流:一个包含了各种操作的操作链;
  • 结束流:一个终端操作,形成一个结果集或值

4.来个例子,假设这里有个com.biao.Fruit类:

@Data
public class Fruit {
    private String name;
    private String origin;
    private Integer price;
}

我们要从一堆水果里,找前4种产自中国的名称是字母A开头的水果。这还不是小菜?几次使用Iterator或者forEach循环就实现了!

如果使用流处理,大致处理流程图示则如下,代码后面再聊:

5.流的主要操作:筛选/切片/查找/匹配/映射/归约

下面将一一道来:

筛选/切片:使用filter/skip/limit/distinct方法过滤。filter接收一个Predicate函数表达式,方法签名是T --> boolean,我们来实现上面的图示逻辑,至于JDK7的实现,看官君可以想一想,对比一下,com.biao.Application1代码实现片段:

// 创建流
        Stream<Fruit> fruitStream = fruitList.stream();
        // 过滤
        Stream<Fruit> filteredStream = fruitStream.filter(d -> "China".equals(d.getOrigin()));
        // 去掉重复元素
        Stream<Fruit> distinctStream = filteredStream.distinct();
        // 打印流中元素,forEach是终端操作,如果这里使用了,则collect方法无法使用,即一个流只能消费一次
        // distinctStream.forEach(System.out::println);
        // 跳过1个元素,
        Stream<Fruit> skippedStream = distinctStream.skip(1);
        // 切片,参数为maxSize
        Stream<Fruit> limitStream = skippedStream.limit(4);
        // 结束,collect方法是收集器,如果这里使用了,则forEach无法使用,即一个流只能有一个终端操作
        List<Fruit> newList = limitStream.collect(Collectors.toList());
        // 打印结果,lambda方式
        newList.forEach(System.out::println);

        // 链式操作,和上面效果一样,一气呵成,真爽!
        List<Fruit> newList2 = fruitList.stream()
                .filter(d -> "China".equals(d.getOrigin()))
                .distinct()
                .skip(1)
                .limit(4)
                .collect(Collectors.toList());
        // 打印结果集
        newList2.forEach(System.out::println);

以上代码核心点:

  • 尽量使用链式语法配合Lambda,简洁至极!
  • 一个流只能有一个终端操作!即一个流只能被消费一次!
  • filter方法的参数表达式可以进行逻辑复合运算,如and/not/or,

映射:对流中的每个元素应用映射函数,变换成新的对象。使用map方法,接受一个Function类型,函数签名是 T—> R,比如对以上Fruit流提取水果的名称,并过滤字母A开头的水果,com.biao.Application2代码实现片段:

// 创建流
Stream<Fruit> fruitStream = fruitList.stream();
//转换,变为String流
Stream<String> stringStream = fruitStream.map(Fruit::getName);
//过滤,名称以A开头
Stream<String> filteredStream = stringStream.filter(str -> "A".equals(String.valueOf(str.charAt(0))));
//终端操作,set自动去重复
Set<String> stringSet = filteredStream.collect(Collectors.toSet());
//打印结果集
stringSet.forEach(System.out::println);

//链式语法实现,请君想象下JDK7的实现,
fruitList.stream()
      .map(Fruit::getName)
      .filter(str -> "A".equals(str.substring(0,1)))
      .collect(Collectors.toSet())
      .forEach(System.out::println);

我还写了个map映射+flatMap扁平化流例子,com.biao.Application3代码片段:

 /**映射示例2:map映射+flatMap扁平化流*/
String[] arraysOfWords = {"Apple","Banana","Nuts","Olive"};
// 使用Arrays的静态方法创建流
Stream<String> stringStream = Arrays.stream(arraysOfWords);
// 对每个word映射为String[]
stringStream.map(word -> word.split(""))
      // flatMap扁平化流,将生成的流组合成一个流
      // 如果使用map(Arrays::stream),则生成由流元素组成的流
      .flatMap(Arrays::stream)
      // 去掉重复
      .distinct()
      // 终端操作,collect方法是收集器
      .collect(Collectors.toList())
      .forEach(System.out::println);

流的扁平化,一言以蔽之,flatmap方法让你把一个流中的每个值都换成一个的流(即流中的元素也是流),然后把所有的流连接起来成为一个流。

查找/匹配:StreamAPI通过allMatch,anyMatch,noneMatch,findFirst,findAny方法找到符合的元素,com.biao.Application4代码实现片段:

   // 注意这里每个都要重建一个流
        // 是否全部价格大于50
        boolean almach = fruitList.stream().allMatch(fruit -> fruit.getPrice() > 50);
        System.out.println(almach);
        // 是否至少有一种产自America
        boolean anyMatch = fruitList.stream().anyMatch(fruit -> "America".equals(fruit.getOrigin()));
        System.out.println(anyMatch);
        // 找出流中第3个,
        Optional<Fruit> thirdOne = fruitList.stream().skip(2).findFirst();
        // 存在则打印,防止NPE
        thirdOne.ifPresent(System.out::println);
        // 找出流中任意一个,,
        Optional<Fruit> anyOne = fruitList.stream().findAny();
        // ifPresent,值存在则执行操作,否则 do nothing!
        anyOne.ifPresent(System.out::println);

以上代码核心点:

  • 这里每个查找/匹配都要重建一个流,
  • 找可能的第3个元素,skip(2).findFirst(),返回Optional T 类,解决返回null的NPE问题,这样即使不存在第3个元素,返回对象仍然可以继续做计算,
  • findAny时,流水线将在后台进行优化使其只需走一遍,并在利用短路找到结果时立即结束

归约:使用reduce对流中元素累积计算,最后得到一个值。比如找到上面水果中价格最高的,计算出产自Japan的水果的总价格,com.biao.Application5代码实现片段:

// 注意这里每个都要重建一个流
        int totalPrice = fruitList.stream()
                .filter(fruit -> "Japan".equals(fruit.getOrigin()))
                //映射转换为Integer流
                .map(Fruit::getPrice)
                //reduce归约计算
                // 也可使用reduce(0,(a,b) -> a+b);
                .reduce(0,Integer::sum);
        System.out.println(totalPrice);

        /** reduce无初始值的归约计算 */
        Optional<Integer> totalPrice2 = fruitList.stream()
                .map(Fruit::getPrice)
                .reduce((a,b) -> a+b);
        // ifPresent,值存在则执行操作,否则 do nothing!
        totalPrice2.ifPresent(System.out::println);

        /** reduce计算最大*/
        Optional<Integer> maxPrice = fruitList.stream()
                .map(Fruit::getPrice)
                // 归约计算最大值:
                // 这里也可以使用reduce((x,y) -> x>y?x:y)
                .reduce(Integer::max);
        // ifPresent,值存在则执行操作,否则 do nothing!
        maxPrice.ifPresent(System.out::println);

        /** reduce计算最小值*/
        Optional<Integer> minPrice = fruitList.stream()
                .map(Fruit::getPrice)
                // 归约计算最小值:也可以使用reduce((x,y) -> x<y?x:y)
                .reduce(Integer::min);
        // ifPresent,值存在则执行操作,否则 do nothing!
        minPrice.ifPresent(System.out::println);

以上代码核心点:

  • reduce的函数参数有几种重载,返回的值不一样,无初始值的返回Optional对象,
  • Optional. ifPresent方法,Optional对象值存在则执行操作,否则 do nothing,
  • map和reduce的连接通常称为map-reduce模式,源于google的搜索模式,

6.看完了上面的各种流操作,看官君也许会说,似乎也没啥大不了的啊,顶多是少写了几行代码,那请用集合式实现下相同逻辑!另请君回忆下,是否有过多层嵌套或者N多分支的if-elseif-else/switch-case场景?那现在就请试试流式写法!事实上我这里只是举了几个常规的应用例子而已,抓住下看官君的兴趣,StreamAPI还有其他强大的功能:

  • 无限流、范围流:能直接创建无限流,再局部处理,
  • Collect收集器的分区分组:将最终结果集按条件分区分组,类比SQL的groupBy,
  • 流的parallel/sequential并行计算和顺序计算:声明式并行计算,无需为锁烦恼,
  • 分支/合并ForkJoin框架的递归计算:多线程方式处理,还可自定义线程池参数,
  • 同步/异步执行:使用CompletableFuture类实现更高效的异步处理,

总结:

1.流不仅仅是将外部迭代变为内部迭代,更是一种编程思想的转变,结合函数抽象,行为参数化,将函数作为参数,提升为与值一样的地位,威力巨大,这就是生产力。

2.流计算能大大简化编程,使用声明式语法,配合Lambda,写起代码根本停不下来!

3.后期会在看看其他Storm/Spark/Flink流式计算框架,发现点新鲜货。

全文结束!



推荐阅读:

原文地址:https://www.cnblogs.com/xxbiao/p/12048946.html

时间: 2024-10-27 18:53:16

流式计算(一)-Java8Stream的相关文章

流式计算(二)-Kafka Stream

前面说了Java8的流,这里还说流处理,既然是流,比如水流车流,肯定得有流的源头,源可以有多种,可以自建,也可以从应用端获取,今天就拿非常经典的Kafka做源头来说事,比如要来一套应用日志实时分析框架,或者是高并发实时流处理框架,正是Kafka的拿手好戏. 环境:Idea2019.03/Gradle6.0.1/JDK11.0.4/Lambda/RHEL8.0/VMWare15.5/Springboot2.2.1.RELEASE/Zookeeper3.5.5/Kafka2.3.1 难度:新手--战

流式计算(五)-Flink核心概念

一手资料,完全来自官网,直接参考英文过来的,并加了一些自己的理解,希望能让看官君了解点什么,足矣. 环境:Flink1.9.1 难度:新手--战士--老兵--大师 目标: 理解Flink的计算模型 认识各重要组件 说明: 本篇作为前两篇的补充内容,算是理论篇 步骤: 01-Flink编程模型 Flink的流计算整体来看都是按照Source -> Transformation -> Sink三步走,即获取流源 -> 进行转换 -> 汇聚(Sink),但“转换 (Transformat

流式计算-Jstorm提交Topology过程(下)

紧接上篇流式计算-Jstorm提交Topology过程(上), 5.上篇任务已经ServiceHandler.submitTopologyWithOpts()方法,在该方法中,会实例化一个TopologyAssignEvent,相当于创建了一个topology级别的作业,然后将其保存到TopologyAssign的任务队列中,具体代码如下: TopologyAssignEvent assignEvent = new TopologyAssignEvent(); assignEvent.setTo

大数据技术(1)流式计算与Storm

2011年在海量数据处理领域,Hadoop是人们津津乐道的技术,Hadoop不仅可以用来存储海量数据,还以用来计算海量数据.因为其高吞吐.高可靠等特点,很多互联网公司都已经使用Hadoop来构建数据仓库,高频使用并促进了Hadoop生态圈的各项技术的发展.一般来讲,根据业务需求,数据的处理可以分为离线处理和实时处理,在离线处理方面Hadoop提供了很好的解决方案,但是针对海量数据的实时处理却一直没有比较好的解决方案. 就在人们翘首以待的时间节点,storm横空出世,与生俱来的分布式.高可靠.高吞

流式计算框架-STORM简介

在当前的数据分析领域,对实时数据的计算需求越来越强烈,在此领域,出现了各类计算框架,如:Storm.S4等.目前本土公司对这些流式计算框架的应用也比较广泛,但苦于相关文档英文居多,缺少成系列且与官方相对应的中文手册.本系列试图从官方文档翻译入手,给大家呈现较为完备的中文资料,同时也是对自身知识的总结沉淀. 在这个系列博客中,我们选择了twitter的Storm框架,原因很简单,因为本人长期使用的就是该框架,咱们先从简介开始. Apache Storm是一个免费.开源.分布式的实时计算系统.相对于

大数据读书笔记(2)-流式计算

早期和当前的"流式计算"系统分别称为"连续查询处理类"和"可扩展数据流平台类"计算系统. 流式计算系统的特点: 1)低延迟 2)极佳的系统容错性 3)极强的系统扩展能力 4)灵活强大的应用逻辑表达能力 目前典型的流式计算系统: S4,storm,millwheel,samza,d-stream,hadoop online,mupd8等. 其中storm和millwheel是各方面比较突出的. 流式计算系统架构: 常见的流式计算系统架构分为两种:主

流式计算形态下的大数据分析

1 介 绍 1.1 流式计算介绍 流式大数据计算主要有以下特征: 1)实时性.流式大数据不仅是实时产生的,也是要求实时给出反馈结果.系统要有快速响应能力,在短时间内体现出数据的价值,超过有效时间后数据的价值就会迅速降低. 2)突发性.数据的流入速率和顺序并不确定,甚至会有较大的差异.这要求系统要有较高的吞吐量,能快速处理大数据流量. 3)易失性.由于数据量的巨大和其价值随时间推移的降低,大部分数据并不会持久保存下来,而是在到达后就立刻被使用并丢弃.系统对这些数据有且仅有一次计算机会. 4)无限性

什么是流式计算?

一.流式计算的背景 在日常生活中,我们通常会先把数据存储在一张表中,然后再进行加工.分析,这里就涉及到一个时效性的问题.如果我们处理以年.月为单位的级别的数据,那么多数据的实时性要求并不高:但如果我们处理的是以天.小时,甚至分钟为单位的数据,那么对数据的时效性要求就比较高.在第二种场景下,如果我们仍旧采用传统的数据处理方式,统一收集数据,存储到数据库中,之后在进行分析,就可能无法满足时效性的要求. 二.流式计算与批量计算 大数据的计算模式主要分为批量计算(batch computing).流式计

Strom流式计算

序言 主要学习方向 Kafka 分布式消息系统 Redis 缓存数据库 Storm 流式计算 1.Storm 的基本概念 2.Storm 的应用场景 3.Storm 和Hadoop的对比 4.Storm 集群的安装的linux环境准备 5.zookeeper集群搭建 6.Storm 集群搭建 7.Storm 配置文件配置项讲解 8.集群搭建常见问题解决 9.Storm 常用组件和编程 API:Topology. Spout.Bolt 10.Storm分组策略(stream groupings)