《从0到1学习Flink》—— Flink Data transformation(转换)

前言

在第一篇介绍 Flink 的文章 《《从0到1学习Flink》—— Apache Flink 介绍》 中就说过 Flink 程序的结构

Flink 应用程序结构就是如上图所示:

1、Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。

2、Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。

3、Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 Sink。

在上四篇文章介绍了 Source 和 Sink:

1、《从0到1学习Flink》—— Data Source 介绍

2、《从0到1学习Flink》—— 如何自定义 Data Source ?

3、《从0到1学习Flink》—— Data Sink 介绍

4、《从0到1学习Flink》—— 如何自定义 Data Sink ?

那么这篇文章我们就来看下 Flink Data Transformation 吧,数据转换操作还是蛮多的,需要好好讲讲!

Transformation

Map

这是最简单的转换之一,其中输入是一个数据流,输出的也是一个数据流:

还是拿上一篇文章的案例来将数据进行 map 转换操作:

SingleOutputStreamOperator<Student> map = student.map(new MapFunction<Student, Student>() {
    @Override
    public Student map(Student value) throws Exception {
        Student s1 = new Student();
        s1.id = value.id;
        s1.name = value.name;
        s1.password = value.password;
        s1.age = value.age + 5;
        return s1;
    }
});
map.print();

将每个人的年龄都增加 5 岁,其他不变。

FlatMap

FlatMap 采用一条记录并输出零个,一个或多个记录。

SingleOutputStreamOperator<Student> flatMap = student.flatMap(new FlatMapFunction<Student, Student>() {
    @Override
    public void flatMap(Student value, Collector<Student> out) throws Exception {
        if (value.id % 2 == 0) {
            out.collect(value);
        }
    }
});
flatMap.print();

这里将 id 为偶数的聚集出来。

Filter

Filter 函数根据条件判断出结果。

SingleOutputStreamOperator<Student> filter = student.filter(new FilterFunction<Student>() {
    @Override
    public boolean filter(Student value) throws Exception {
        if (value.id > 95) {
            return true;
        }
        return false;
    }
});
filter.print();

这里将 id 大于 95 的过滤出来,然后打印出来。

KeyBy

KeyBy 在逻辑上是基于 key 对流进行分区。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。

KeyedStream<Student, Integer> keyBy = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
});
keyBy.print();

上面对 student 的 age 做 KeyBy 操作分区

Reduce

Reduce 返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可实现。

SingleOutputStreamOperator<Student> reduce = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
}).reduce(new ReduceFunction<Student>() {
    @Override
    public Student reduce(Student value1, Student value2) throws Exception {
        Student student1 = new Student();
        student1.name = value1.name + value2.name;
        student1.id = (value1.id + value2.id) / 2;
        student1.password = value1.password + value2.password;
        student1.age = (value1.age + value2.age) / 2;
        return student1;
    }
});
reduce.print();

上面先将数据流进行 keyby 操作,因为执行 reduce 操作只能是 KeyedStream,然后将 student 对象的 age 做了一个求平均值的操作。

Fold

Fold 通过将最后一个文件夹流与当前记录组合来推出 KeyedStream。 它会发回数据流。

KeyedStream.fold("1", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String accumulator, Integer value) throws Exception {
        return accumulator + "=" + value;
    }
})

Aggregations

DataStream API 支持各种聚合,例如 min,max,sum 等。 这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。

KeyedStream.sum(0)
KeyedStream.sum("key")
KeyedStream.min(0)
KeyedStream.min("key")
KeyedStream.max(0)
KeyedStream.max("key")
KeyedStream.minBy(0)
KeyedStream.minBy("key")
KeyedStream.maxBy(0)
KeyedStream.maxBy("key")

max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。

Window

Window 函数允许按时间或其他条件对现有 KeyedStream 进行分组。 以下是以 10 秒的时间窗口聚合:

inputStream.keyBy(0).window(Time.seconds(10));

Flink 定义数据片段以便(可能)处理无限数据流。 这些切片称为窗口。 此切片有助于通过应用转换处理数据块。 要对流进行窗口化,我们需要分配一个可以进行分发的键和一个描述要对窗口化流执行哪些转换的函数

要将流切片到窗口,我们可以使用 Flink 自带的窗口分配器。 我们有选项,如 tumbling windows, sliding windows, global 和 session windows。 Flink 还允许您通过扩展 WindowAssginer 类来编写自定义窗口分配器。 这里先预留下篇文章来讲解这些不同的 windows 是如何工作的。

WindowAll

windowAll 函数允许对常规数据流进行分组。 通常,这是非并行数据转换,因为它在非分区数据流上运行。

与常规数据流功能类似,我们也有窗口数据流功能。 唯一的区别是它们处理窗口数据流。 所以窗口缩小就像 Reduce 函数一样,Window fold 就像 Fold 函数一样,并且还有聚合。

inputStream.keyBy(0).windowAll(Time.seconds(10));

Union

Union 函数将两个或多个数据流结合在一起。 这样就可以并行地组合数据流。 如果我们将一个流与自身组合,那么它会输出每个记录两次。

inputStream.union(inputStream1, inputStream2, ...);

Window join

我们可以通过一些 key 将同一个 window 的两个数据流 join 起来。

inputStream.join(inputStream1)
           .where(0).equalTo(1)
           .window(Time.seconds(5))
           .apply (new JoinFunction () {...});

以上示例是在 5 秒的窗口中连接两个流,其中第一个流的第一个属性的连接条件等于另一个流的第二个属性。

Split

此功能根据条件将流拆分为两个或多个流。 当您获得混合流并且您可能希望单独处理每个数据流时,可以使用此方法。

SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

Select

此功能允许您从拆分流中选择特定流。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

Project

Project 函数允许您从事件流中选择属性子集,并仅将所选元素发送到下一个处理流。

DataStream<Tuple4<Integer, Double, String, String>> in = // [...]
DataStream<Tuple2<String, String>> out = in.project(3,2);

上述函数从给定记录中选择属性号 2 和 3。 以下是示例输入和输出记录:

(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)

最后

本文主要介绍了 Flink Data 的常用转换方式:Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregations、Window、WindowAll、Union、Window Join、Split、Select、Project 等。并用了点简单的 demo 介绍了如何使用,具体在项目中该如何将数据流转换成我们想要的格式,还需要根据实际情况对待。

关注我

转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/11/04/Flink-Data-transformation/

微信公众号:zhisheng

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

8、《从0到1学习Flink》—— Flink Data transformation(转换)

9、《从0到1学习Flink》—— 介绍Flink中的Stream Windows

10、《从0到1学习Flink》—— Flink 中的几种 Time 详解

11、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch

12、《从0到1学习Flink》—— Flink 项目如何运行?

13、《从0到1学习Flink》—— Flink 写入数据到 Kafka

原文地址:https://www.cnblogs.com/zhisheng/p/10326617.html

时间: 2024-10-03 22:17:34

《从0到1学习Flink》—— Flink Data transformation(转换)的相关文章

《从0到1学习Flink》—— Flink 项目如何运行?

前言 之前写了不少 Flink 文章了,也有不少 demo,但是文章写的时候都是在本地直接运行 Main 类的 main 方法,其实 Flink 是支持在 UI 上上传 Flink Job 的 jar 包,然后运行得.最开始在第一篇 <从0到1学习Flink>-- Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门 中其实提到过了 Flink 自带的 UI 界面,今天我们就来看看如何将我们的项目打包在这里发布运行. 准备 编译打包 项目代码就拿我之前的文章 <从0到1学习

《从0到1学习Flink》—— Flink 中几种 Time 详解

前言 Flink 在流程序中支持不同的 Time 概念,就比如有 Processing Time.Event Time 和 Ingestion Time. 下面我们一起来看看这几个 Time: Processing Time Processing Time 是指事件被处理时机器的系统时间. 当流程序在 Processing Time 上运行时,所有基于时间的操作(如时间窗口)将使用当时机器的系统时间.每小时 Processing Time 窗口将包括在系统时钟指示整个小时之间到达特定操作的所有事

Flink 从0到1学习 —— Flink 中如何管理配置?

前言 如果你了解 Apache Flink 的话,那么你应该熟悉该如何像 Flink 发送数据或者如何从 Flink 获取数据.但是在某些情况下,我们需要将配置数据发送到 Flink 集群并从中接收一些额外的数据. 在本文的第一部分中,我将描述如何将配置数据发送到 Flink 集群.我们需要配置很多东西:方法参数.配置文件.机器学习模型.Flink 提供了几种不同的方法,我们将介绍如何使用它们以及何时使用它们.在本文的第二部分中,我将描述如何从 Flink 集群中获取数据. 如何发送数据给 Ta

大数据计算引擎之Flink Flink状态管理和容错

原文地址:大数据计算引擎之Flink Flink状态管理和容错 有状态计算 在Flink架构体系中,有状态计算可以说是Flink非常重要的特征之一.有状态计算是指在程序计算过程中,在Flink程序内部,存储计算产生的中间结果,并提供给Functions 或 孙子计算结果使用.如图所示: 状态数据可以维系在本地存储中,这里的存储可以是 Flink 的堆内存或者堆外内存,也可以借助第三方的存储介质,例如:Flink中已经实现的RocksDB,当然用户也可以自己实现相应的缓存系统去存储状态信息,以完成

Python pandas 0.19.1 Indexing and Selecting Data文档翻译

最近在写个性化推荐的论文,经常用到Python来处理数据,被pandas和numpy中的数据选取和索引问题绕的比较迷糊,索性把这篇官方文档翻译出来,方便自查和学习,翻译过程中难免很多不到位的地方,但大致能看懂,错误之处欢迎指正~ Python pandas 0.19.1 Indexing and Selecting Data 原文链接 http://pandas.pydata.org/pandas-docs/stable/indexing.html 数据索引和选取 pandas对象中的轴标签信息

Retrofit2.0通俗易懂的学习姿势,Retrofit2.0 + OkHttp3 + Gson + RxJava

Retrofit2.0通俗易懂的学习姿势,Retrofit2.0 + OkHttp3 + Gson + RxJava Retrofit,因为其简单与出色的性能,也是受到很多人的青睐,但是他和以往的通信框架还是有点区别,不过放心,因为他本身还是挺简单的,所有我相信你看完这篇文章,对基本的请求是没什么问题的,其实现在网上这样的文章也有很多了,好了,那我们直接开车吧! 一.相关资料 Github:https://github.com/square/retrofit 官网文档:http://square

新手入门指导:Vue 2.0 的建议学习顺序

起步 1. 扎实的 JavaScript / HTML / CSS 基本功.这是前置条件. 2. 通读官方教程 (guide) 的基础篇.不要用任何构建工具,就只用最简单的 <script>,把教程里的例子模仿一遍,理解用法.不推荐上来就直接用 vue-cli 构建项目,尤其是如果没有 Node/Webpack 基础. 3. 照着官网上的示例,自己想一些类似的例子,模仿着实现来练手,加深理解. 4. 阅读官方教程进阶篇的前半部分,到『自定义指令 (Custom Directive) 』为止.着

2017-2-15从0开始前端学习笔记-文本

2017-2-15从0开始前端学习笔记-文本 标签 文本 粗体和斜体 <b>bold粗体</b> <i>italic斜体</i> 上标和下标 <sup>上标</sup> <sub>下标</sub> 空白 换行符和水品线 <br/>换行符 <hr/>水平线 语义化标记 加粗和强调 <strong>加粗 加强语气</strong> <em>强调 斜体 能改

dict((v[0], Student(*v))for v in data)

1 # -*- coding: utf-8 -*- 2 data = [[12202710, 'linbin', 23], [12202711, 'yanyu', 24], 3 [12202712, 'qzc', 22], [12202715, 'sxy', 23]] 4 class Student(object): 5 def __init__(self, num, name, age): 6 self.num = num 7 self.name = name 8 self.age = age