Java8函数式编程(二):类比Spark RDD算子的Stream流操作

1 Stream流

对集合进行迭代时,可调用其iterator方法,返回一个iterator对象,之后便可以通过该iterator对象遍历集合中的元素,这被称为外部迭代(for循环本身正是封装了其的语法糖),其示意图如下:

除此之外,还有内部迭代方法,这正是这里要说明的集合的stream()方法返回的Stream对象的一系列操作,比如,要统计一个数字列表的偶数元素个数,当使用Stream对象的操作时,如下:

List<Integer> list = new ArrayList<Integer>(){{
    add(1);
    add(2);
    add(3);
}};

long count = list.stream().filter(num -> num % 2 == 0).count();
System.out.println(count);  // 1

其示意图如下:

上面提供的例子,比如filter,其参数为一个lambda表达式,所以Stream其实是用函数式编程方式在集合类上进行复杂操作的工具。

2 Stream流操作与Spark RDD算子

其实有Spark经验的人开始使用Stream流操作时,会有似曾相识的感觉,好像一切都那么熟悉。

参考Spark RDD算子介绍的文章:《Spark RDD算子实战》https://blog.51cto.com/xpleaf/2108481

下面从操作对象(名词)和对象操作(动词)两个角度来简单对比一下。

2.1 操作对象

Spark RDD算子的操作对象是RDD,中文意思是弹性分布式数据集,对用户而言,它就是类似集合一样的对象,里面存的是数据,只是底层它的数据可能分布于各个节点的各个partition,但不管怎样,其本质还是数据集。

Stream流操作的操作对象是集合,集合本质也是一种数据集,只是相比RDD,它是单机的。

2.2 对象操作

Spark RDD算子有两种类型,分别是Transformation算子和Action算子,前者是延迟计算的,它仅仅记住了数据的逻辑操作,并没有真正执行,后者是真正触发Transformation算子的计算。

Stream流操作也有两种类型,分别是惰性求值和及早求值(个人觉得这翻译不好),前者也只是记录了惰性求值的逻辑操作,后者才是真正触发操作。

可以看到其两者是非常相似的,一个是对分布式数据进行的各种操作,一个是单机数据进行的各种操作,把计算分为延迟计算和触发计算两种,好处是显而易见的:当对数据集进行多次逻辑操作时,有可能迭代只需要一次就可能完成,这样真正触发计算时,一次迭代带来的性能提升是显著的,比如对于过滤和计算这两个操作(前面计算偶数的操作),在一次迭代中就能够完成。

当然,不仅类型相似,其本身提供的操作的名称而言,都是相似的,有些东西真的是通用的。

3 常用Stream流操作

每个操作都用一个通俗易懂的例子来进行说明。

3.1 及早求值操作

3.1.1 collect(toList())

其作用是将Stream流中的元素收集起来,形成List、Set或Map等。

List<Integer> list = Stream.of(1, 2, 3).collect(Collectors.toList());

System.out.println(list);   // [1, 2, 3]

1.Stream.of()方法用于方便地生成Stream流;

2.Collectors还有toSet()、toMap()等方法,详见其API。

3.1.2 forEach(Consumer)

对集合中的每个元素进行操作,其参数是Consumer<T>函数接口。

Consumer<Integer> printNum = System.out::print;
Stream.of(1, 2, 3).forEach(printNum);   // 123

System.out::print表示使用System.out类中的print方法,相当于lambda表达式:element -> System.out.print(element);

上面的例子也可以一步到位:

Stream.of(1, 2, 3).forEach(System.out::print);  // 123

3.1.3 max和min

其参数为Comparator对象,返回一个Optional对象,Optional说明其结果可能有,也可能没有(比如对空值的Stream流操作时)。

// 计算数值流中的最大值
Optional<Integer> maxOptional = Stream.of(1, 2, 3).max(Comparator.comparing(num -> num));
System.out.println(maxOptional.get());  // 3

// 找出字符串流中长度最小的字符串
Optional<String> minOptional = Stream.of("a", "ab", "abc").min(Comparator.comparing(String::length));
System.out.println(minOptional.get());  // a

另外,其确实是及早求值操作,可以验证一下:

Stream.of(1, 2, 3).max(Comparator.comparing(num -> {
    System.out.println(num);
    return num;
}));

输出:

1
2
2
3

3.2 惰性求值操作

3.2.1 map

其参数为Function&lt;T,R&gt;,用于将Stream流中的值转换为另外一种流。

// 将字母转换为大写
Stream.of("a", "b", "hello")
    .map(String::toUpperCase)
    .forEach(element -> System.out.print(element + " "));  // A B HELLO 

3.2.2 filter

其参数为Predicate&lt;T&gt;,过滤Stream流中的元素。

// 找出偶数
List<Integer> list = Stream.of(1, 2, 3).filter(num -> num % 2 == 0).collect(Collectors.toList());

System.out.println(list);   // [2]

3.2.3 flatMap

其参数为Function&lt;T,R&gt;,只是此时R限定为Stream,将Stream流中的值转换为更多的流。

// 找出字符串中的单词
List<String> list = Stream.of("hello you", "hello me")
    .flatMap(line -> Arrays.stream(line.split(" "))).collect(Collectors.toList());

System.out.println(list);   // [hello, you, hello, me]

是不是感觉跟Spark的wordcount例子有点像。

3.2.4 reduce

其参数为BinaryOperator&lt;T&gt;,返回一个Optional对象,Optional说明其结果可能有,也可能没有(比如对空值的Stream流操作时,并且没有指定初始值),用于归约操作。

// 求和
Integer res = Stream.of(1, 2, 3).reduce((acc, element) -> acc + element).get();

// 指定初始值6后,Stream的reduce操作结果肯定有值的,因此其返回的不是Optional,而直接是6所属的类型,即Integer
Integer res2 = Stream.of(1, 2, 3).reduce(6, (acc, element) -> acc + element);

System.out.println(String.format("res: %s, res2: %s", res, res2));  // res: 6, res2: 12

4 参考

Java 8 Lambdas,Richard Warburton著(O’Reilly,2014)》

原文地址:https://blog.51cto.com/xpleaf/2372042

时间: 2024-10-05 13:59:50

Java8函数式编程(二):类比Spark RDD算子的Stream流操作的相关文章

Spark RDD算子实战

[TOC] Spark算子概述 RDD:弹性分布式数据集,是一种特殊集合.支持多种来源.有容错机制.可以被缓存.支持并行操作,一个RDD代表多个分区里的数据集. RDD有两种操作算子: Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作 Action(执行):触发Spark作业的运行,真正触发转换算子的计算 需要说明的是,下面写的scala代码,其实都是可以简写的,但是为了方便理解,我都

Java8函数式编程-包教包会系列(一)

Oracle 公司于 2014 年 3 月 18 日发布 Java 8 ,距离现在已经有很长一段时间了,Java10也在2018年 3月 21日正式发布,并且Oracle宣布Java 语言将每隔 6 个月提供一次更新.Java语言不断更新迭代,然而很多Java程序员缺没有跟上技术更新的步伐!所以写下这篇系列文章,希望可以帮助大家入门Java8!话不多说,开始发车! 前言 宝刀未老的Java 1995年Sun公司推出的Java语言,从第一个版本诞生到现在已经有二十多年的了.时间若白驹过隙,转瞬即逝

漫漫人生路,学点Jakarta基础-Java8函数式编程

接口默认方法 Java8版本以后新增了接口的默认方法,不仅仅只能包含抽象方法,接口也可以包含若干个实例方法.在接口内定义实例方法(但是注意需要使用default关键字) 在此定义的方法并非抽象方法,而是具有特定逻辑的实例方法. 举例说明:定义接口Animal,其中包含默认方法eat(). /** * Created by zjc on 2018/4/9. */ public interface Animal { void call(); default void eat() { System.o

java8函数式编程实例

什么是函数式编程 函数式编程是java8的一大特色,也就是将函数作为一个参数传递给指定方法.别人传的要么是基本数据类型,要么就是地址引用 ,我们要穿一个“动作”. Stream 说到函数式编程,就不得不提及Stream,Stream跟我们熟知的io流可不是同一个东西,泛指可以顺序执行或者并行执行的元素序列,主要是针对集合,可以将多个函数通过“.”串起来执行,其特点如下: stream不会存储数据,只是将集合流化,比如说 声明一个stream之后,往集合里面扔东西,stream可以取到新扔到集合里

Java8函数式编程的宏观总结

1.java8优势通过将行为进行抽象,java8提供了批量处理数据的并行类库,使得代码可以在多核CPU上高效运行. 2.函数式编程的核心使用不可变值和函数,函数对一个值进行处理,映射成另一个值. 3.Lambda表达式一种紧凑的.传递行为的方式. 4. 静态类型语言java8依旧是静态类型语言,javac依旧会在编译时,对参数类型进行检查. 5.函数接口只有一个抽象方法的接口,用作lamdba表达式的类型. 6.对核心类库的改进-Stream实现机制 整个过程:一系列惰性求值方法+最后一个及早求

Java8函数式编程 (一) 数据流和lambda表达式

JDK 1.8中引入了函数式编程(functional programming,FP),如果您已习惯OOP,一定会感到困惑:什么是函数式编程?这样的编程模式有什么好处? 本文将通过简单的实例令读者对函数式编程有一个大体的了解. 我们知道OOP是以类为基础的,程序中必须首先抽象和定义class.那么FP创建的基础是什么?或者说在Java 8中,至少需要了解什么知识点才能实现基本的函数式编程呢? 本文将首先介绍在Java 8中使用FP所需的基本知识点: Lambda表达式 数据流 基本实例 Map<

swift之函数式编程(二)------- Thinking Functionally

本文的主要内容来自<Functional Programming in Swift>这本书,有点所谓的观后总结 在本书的Introduction章中: we will try to focus on some of the qualities that we believe well-designed functional programs in Swift should exhibit: 1. Modulatity[模块化] 2. A Careful Treatment of Mutable

[三]java8 函数式编程Stream 概念深入理解 Stream 运行原理 Stream设计思路

Stream的概念定义   官方文档是永远的圣经~ 表格内容来自https://docs.oracle.com/javase/8/docs/api/   Package java.util.stream  一节部分原文内容的翻译 int sum = widgets.stream() .filter(b -> b.getColor() == RED) .mapToInt(b -> b.getWeight()) .sum(); 流操作被划分为中间和终端操作,并组合成流管道. 一条Stream管道由

Java8函数式编程(一):Lambda表达式类型与常用函数接口

[TOC] 1 前言 最近在看一些开源项目的源码,函数式编程风格的代码无处不在,所以得要好好学一下了. 2 Lambda表达式类型 无参数: Runnable noArguments = () -> System.out.println("Hello World!"); noArguments.run(); 一个参数: UnaryOperator<Boolean> oneArgument = x -> !x; System.out.println(oneArgu