使用Java lambda表达式实现Flink WordCount

本篇我们将使用Java语言来实现Flink的单词统计。

代码开发

环境准备

导入Flink 1.9 pom依赖

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>
    </dependencies>

构建Flink流处理环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定义source

每秒生成一行文本

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒发送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

单词计算

// 3. 单词统计
        // 3.1 将文本行切分成一个个的单词
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分单词
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 将单词转换为一个个的元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 对每组单词数量进行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

参考代码

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. 构建Flink流式初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 自定义source - 每秒发送一行文本
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒发送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

        // 3. 单词统计
        // 3.1 将文本行切分成一个个的单词
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分单词
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 将单词转换为一个个的元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 对每组单词数量进行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}

Flink对Java Lambda表达式支持情况

Flink支持Java API所有操作符使用Lambda表达式。但是,但Lambda表达式使用Java泛型时,就需要声明类型信息。

我们来看下上述的这段代码:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分单词
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

之所以这里将所有的类型信息,因为Flink无法正确自动推断出来Collector中带的泛型。我们来看一下FlatMapFuntion的源代码

@Public@FunctionalInterfacepublic interface FlatMapFunction<T, O> extends Function, Serializable {

/**    * The core method of the FlatMapFunction. Takes an element from the input data set and transforms    * it into zero, one, or more elements.    *    * @param value The input value.    * @param out The collector for returning result values.    *    * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation    *                   to fail and may trigger recovery.    */   void flatMap(T value, Collector<O> out) throws Exception;}

我们发现 flatMap的第二个参数是Collector<O>,是一个带参数的泛型。Java编译器编译该代码时会进行参数类型擦除,所以Java编译器会变成成:

void flatMap(T value, Collector out)

这种情况,Flink将无法自动推断类型信息。如果我们没有显示地提供类型信息,将会出现以下错误:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of ‘Collector‘ are missing.
    In many cases lambda methods don‘t provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the ‘org.apache.flink.api.common.functions.FlatMapFunction‘ interface.
    Otherwise the type has to be specified explicitly using type information.

这种情况下,必须要显示指定类型信息,否则输出将返回值视为Object类型,这将导致Flink无法正确序列化。

所以,我们需要显示地指定Lambda表达式的参数类型信息,并通过returns方法显示指定输出的类型信息

我们再看一段代码:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

为什么map后面也需要指定类型呢?

因为此处map返回的是Tuple2类型,Tuple2是带有泛型参数,在编译的时候同样会被查出泛型参数信息,导致Flink无法正确推断。

更多关于对Java Lambda表达式的支持请参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

原文地址:https://www.cnblogs.com/ilovezihan/p/12245067.html

时间: 2024-08-28 23:58:18

使用Java lambda表达式实现Flink WordCount的相关文章

Java Lambda表达式入门[转]

原文链接: Start Using Java Lambda Expressions http://blog.csdn.net/renfufei/article/details/24600507 下载示例程序 Examples.zip .原文日期: 2014年4月16日 翻译日期: 2014年4月27日翻译人员: 铁锚简介(译者注:虽然看着很先进,其实Lambda表达式的本质只是一个"语法糖",由编译器推断并帮你转换包装为常规的代码,因此你可以使用更少的代码来实现同样的功能.本人建议不要

Java Lambda表达式入门

原文链接: Start Using Java Lambda Expressions 下载示例程序 Examples.zip . 原文日期: 2014年4月26日 翻译日期: 2014年4月27日 翻译人员: 铁锚 简介 (译者注:虽然看着很先进,其实Lambda表达式的本质只是一个"语法糖",由编译器推断并帮你转换包装为常规的代码,因此你可以使用更少的代码来实现同样的功能.本人建议不要乱用,因为这就和某些很高级的黑客写的代码一样,简洁,难懂,难以调试,维护人员想骂娘.) Lambda表

Java Lambda表达式forEach无法跳出循环的解决思路

Java Lambda表达式forEach无法跳出循环的解决思路 如果你使用过forEach方法来遍历集合,你会发现在lambda表达式中的return并不会终止循环,这是由于lambda的底层实现导致的,看下面的例子: public static void main(String[] args) { List<String> list = Lists.newArrayList(); list.add("a"); list.add("b"); list.

Java Lambda 表达式的奇幻之旅

JDK 8 对 Lambda 函数编程的支持,浅的来说无非是引入了一些新的语法结构,是继JDK5 引入的Generics后又一项对大家编码方式的一种革新,如果你不跟上的话,恐怕过段时间,你会认为Java代码成了火星语.深的来说,Java是在语言级进一步支持多核CPU的环境下的并行处理,这在Stream API 中可见一斑,在Java之前,已经有很多主流语言,像 C#和C++,支持Lambda 函数编程,此次Java引入Lambda支持也算后知后觉了. 想在Java中书写Lambda的代码,有两个

java lambda表达式学习笔记

lambda是函数式编程(FP,functional program),在java8中引入,而C#很早之前就有了.在java中lambda表达式是'->',在C#中是‘=>’. 杜甫说:射人先射马,擒贼先擒王.学习一个库要学习它的入口类.lambda的入口类是Stream,一看Stream中的函数就会发现Function,Predicate等lambda元素. 一.几个概念     函数式接口 Functional Interface,除了static和default类型的方法外,只有一个函数

Java Lambda 表达式的几各形式

一.Lambda表达式的组成主要有传入参数列表(参数1,参数2,...),箭头 ->,及返回值,可以是某个值,或者结构体. 如下举一个小小的例子: 在没有使用Lambda表达式时: import java.util.stream.Stream; /** * * @author Kangjun */ public class Demo { public static void main(String[] args){ Dog dog1 = new Dog("dog1",2); Do

Java Lambda表达式 实现原理分析

https://blog.csdn.net/qq_37960603/article/details/85028867 在类编译时,会生成一个私有静态方法+一个内部类. 在内部类中实现了函数式接口,在实现接口的方法中,会调用编译器生成的静态方法. 在使用lambda表达式的地方,通过传递内部类实例,来调用函数式接口方法. 原文地址:https://www.cnblogs.com/eryun/p/12149841.html

java lambda表达式

ArrayList<String> list = new ArrayList<String>(); list.add(0, "b"); list.add(1, "a"); list.add(0, "c"); list.add(1, "d"); ITopable<String> sortDesc = ((strList) -> { String tmp = null; for (Stri

【Java学习笔记之三十一】详解Java8 lambda表达式

Java 8 发布日期是2014年3月18日,这次开创性的发布在Java社区引发了不少讨论,并让大家感到激动.特性之一便是随同发布的lambda表达式,它将允许我们将行为传到函数里.在Java 8之前,如果想将行为传入函数,仅有的选择就是匿名类,需要6行代码.而定义行为最重要的那行代码,却混在中间不够突出.Lambda表达式取代了匿名类,取消了模板,允许用函数式风格编写代码.这样有时可读性更好,表达更清晰.在Java生态系统中,函数式表达与对面向对象的全面支持是个激动人心的进步.将进一步促进并行