Demo:基于 Flink SQL 构建流式应用

Flink 1.10.0 于近期刚发布,释放了许多令人激动的新特性。尤其是 Flink SQL 模块,发展速度非常快,因此本文特意从实践的角度出发,带领大家一起探索使用 Flink SQL 如何快速构建流式应用。

本文将基于 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 构建一个电商用户行为的实时分析应用。本文所有的实战演练都将在 Flink SQL CLI 上执行,全程只涉及 SQL 纯文本,无需一行 Java/Scala 代码,无需安装 IDE。本实战演练的最终效果图:

准备

一台装有 Docker 和 Java8 的 Linux 或 MacOS 计算机。

使用 Docker Compose 启动容器

本实战演示所依赖的组件全都编排到了容器中,因此可以通过 docker-compose 一键启动。你可以通过 wget 命令自动下载该 docker-compose.yml 文件,也可以手动下载。

mkdir flink-demo; cd flink-demo;
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml

该 Docker Compose 中包含的容器有:

  • DataGen: 数据生成器。容器启动后会自动开始生成用户行为数据,并发送到 Kafka 集群中。默认每秒生成 1000 条数据,持续生成约 3 小时。也可以更改 docker-compose.yml 中 datagen 的 speedup 参数来调整生成速率(重启 docker compose 才能生效)。
  • MySQL: 集成了 MySQL 5.7 ,以及预先创建好了类目表(category),预先填入了子类目与顶级类目的映射关系,后续作为维表使用。
  • Kafka: 主要用作数据源。DataGen 组件会自动将数据灌入这个容器中。
  • Zookeeper: Kafka 容器依赖。
  • Elasticsearch: 主要存储 Flink SQL 产出的数据。
  • Kibana: 可视化 Elasticsearch 中的数据。

在启动容器前,建议修改 Docker 的配置,将资源调整到 4GB 以及 4核。启动所有的容器,只需要在 docker-compose.yml 所在目录下运行如下命令。

docker-compose up -d

该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的五个容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

另外可以通过如下命令停止所有的容器:

docker-compose down

下载安装 Flink 本地集群

我们推荐用户手动下载安装 Flink,而不是通过 Docker 自动启动 Flink。因为这样可以更直观地理解 Flink 的各个组件、依赖、和脚本。

  1. 下载 Flink 1.10.0 安装包并解压(解压目录 flink-1.10.0):https://www.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
  2. 进入 flink-1.10.0 目录:cd flink-1.10.0
  3. 通过如下命令下载依赖 jar 包,并拷贝到 lib/ 目录下,也可手动下载和拷贝。因为我们运行时需要依赖各个 connector 实现。
  4. -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.0/flink-sql-connector-elasticsearch7_2.11-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
  5. 将 conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因为我们会同时运行多个任务。
  6. 执行 ./bin/start-cluster.sh,启动集群。
    运行成功的话,可以在 http://localhost:8081 访问到 Flink Web UI。并且可以看到可用 Slots 数为 10 个。

  1. 执行 bin/sql-client.sh embedded 启动 SQL CLI。便会看到如下的松鼠欢迎界面。

使用 DDL 创建 Kafka 表

Datagen 容器在启动后会往 Kafka 的 user_behavior topic 中持续不断地写入数据。数据包含了2017年11月27日一天的用户行为(行为包括点击、购买、加购、喜欢),每一行表示一条用户行为,以 JSON 的格式由用户ID、商品ID、商品类目ID、行为类型和时间组成。该原始数据集来自阿里云天池公开数据集,特此鸣谢。

我们可以在 docker-compose.yml 所在目录下运行如下命令,查看 Kafka 集群中生成的前10条数据。

docker-compose exec kafka bash -c ‘kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10‘

{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
...

有了数据源后,我们就可以用 DDL 去创建并连接这个 Kafka 中的 topic 了。在 Flink SQL CLI 中执行该 DDL。

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
    WATERMARK FOR ts as ts - INTERVAL ‘5‘ SECOND  -- 在ts上定义watermark,ts成为事件时间列
) WITH (
    ‘connector.type‘ = ‘kafka‘,  -- 使用 kafka connector
    ‘connector.version‘ = ‘universal‘,  -- kafka 版本,universal 支持 0.11 以上的版本
    ‘connector.topic‘ = ‘user_behavior‘,  -- kafka topic
    ‘connector.startup-mode‘ = ‘earliest-offset‘,  -- 从起始 offset 开始读取
    ‘connector.properties.zookeeper.connect‘ = ‘localhost:2181‘,  -- zookeeper 地址
    ‘connector.properties.bootstrap.servers‘ = ‘localhost:9092‘,  -- kafka broker 地址
    ‘format.type‘ = ‘json‘  -- 数据源格式为 json
);

如上我们按照数据的格式声明了 5 个字段,除此之外,我们还通过计算列语法和 PROCTIME() 内置函数声明了一个产生处理时间的虚拟列。我们还通过 WATERMARK 语法,在 ts 字段上声明了 watermark 策略(容忍5秒乱序), ts 字段因此也成了事件时间列。关于时间属性以及 DDL 语法可以阅读官方文档了解更多:

在 SQL CLI 中成功创建 Kafka 表后,可以通过 show tables; 和 describe user_behavior; 来查看目前已注册的表,以及表的详细信息。我们也可以直接在 SQL CLI 中运行 SELECT * FROM user_behavior; 预览下数据(按q退出)。

接下来,我们会通过三个实战场景来更深入地了解 Flink SQL 。

统计每小时的成交量

使用 DDL 创建 Elasticsearch 表

我们先在 SQL CLI 中创建一个 ES 结果表,根据场景需求主要需要保存两个数据:小时、成交量。

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    ‘connector.type‘ = ‘elasticsearch‘, -- 使用 elasticsearch connector
    ‘connector.version‘ = ‘6‘,  -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本
    ‘connector.hosts‘ = ‘http://localhost:9200‘,  -- elasticsearch 地址
    ‘connector.index‘ = ‘buy_cnt_per_hour‘,  -- elasticsearch 索引名,相当于数据库的表名
    ‘connector.document-type‘ = ‘user_behavior‘, -- elasticsearch 的 type,相当于数据库的库名
    ‘connector.bulk-flush.max-actions‘ = ‘1‘,  -- 每条数据都刷新
    ‘format.type‘ = ‘json‘,  -- 输出数据格式 json
    ‘update-mode‘ = ‘append‘
);

我们不需要在 Elasticsearch 中事先创建 buy_cnt_per_hour 索引,Flink Job 会自动创建该索引。

提交 Query

统计每小时的成交量就是每小时共有多少 "buy" 的用户行为。因此会需要用到 TUMBLE 窗口函数,按照一小时切窗。然后每个窗口分别统计 "buy" 的个数,这可以通过先过滤出 "buy" 的数据,然后 COUNT(*) 实现。

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL ‘1‘ HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = ‘buy‘
GROUP BY TUMBLE(ts, INTERVAL ‘1‘ HOUR);

这里我们使用 HOUR 内置函数,从一个 TIMESTAMP 列中提取出一天中第几个小时的值。使用了 INSERT INTO将 query 的结果持续不断地插入到上文定义的 es 结果表中(可以将 es 结果表理解成 query 的物化视图)。另外可以阅读该文档了解更多关于窗口聚合的内容:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows

在 Flink SQL CLI 中运行上述查询后,在 Flink Web UI 中就能看到提交的任务,该任务是一个流式任务,因此会一直运行。

使用 Kibana 可视化结果

我们已经通过 Docker Compose 启动了 Kibana 容器,可以通过 http://localhost:5601 访问 Kibana。首先我们需要先配置一个 index pattern。点击左侧工具栏的 "Management",就能找到 "Index Patterns"。点击 "Create Index Pattern",然后通过输入完整的索引名 "buy_cnt_per_hour" 创建 index pattern。创建完成后, Kibana 就知道了我们的索引,我们就可以开始探索数据了。

先点击左侧工具栏的"Discovery"按钮,Kibana 就会列出刚刚创建的索引中的内容。

接下来,我们先创建一个 Dashboard 用来展示各个可视化的视图。点击页面左侧的"Dashboard",创建一个名为 ”用户行为日志分析“ 的Dashboard。然后点击 "Create New" 创建一个新的视图,选择 "Area" 面积图,选择 "buy_cnt_per_hour" 索引,按照如下截图中的配置(左侧)画出成交量面积图,并保存为”每小时成交量“。

可以看到凌晨是一天中成交量的低谷。

统计一天每10分钟累计独立用户数

另一个有意思的可视化是统计一天中每一刻的累计独立用户数(uv),也就是每一刻的 uv 数都代表从0点到当前时刻为止的总计 uv 数,因此该曲线肯定是单调递增的。

我们仍然先在 SQL CLI 中创建一个 Elasticsearch 表,用于存储结果汇总数据。主要有两个字段:时间和累积 uv 数。

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    ‘connector.type‘ = ‘elasticsearch‘,
    ‘connector.version‘ = ‘6‘,
    ‘connector.hosts‘ = ‘http://localhost:9200‘,
    ‘connector.index‘ = ‘cumulative_uv‘,
    ‘connector.document-type‘ = ‘user_behavior‘,
    ‘format.type‘ = ‘json‘,
    ‘update-mode‘ = ‘upsert‘
);

为了实现该曲线,我们可以先通过 OVER WINDOW 计算出每条数据的当前分钟,以及当前累计 uv(从0点开始到当前行为止的独立用户数)。 uv 的统计我们通过内置的 COUNT(DISTINCT user_id)来完成,Flink SQL 内部对 COUNT DISTINCT 做了非常多的优化,因此可以放心使用。

CREATE VIEW uv_per_10min AS
SELECT
  MAX(SUBSTR(DATE_FORMAT(ts, ‘HH:mm‘),1,4) || ‘0‘) OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

这里我们使用 SUBSTR 和 DATE_FORMAT 还有 || 内置函数,将一个 TIMESTAMP 字段转换成了 10分钟单位的时间字符串,如: 12:1012:20。关于 OVER WINDOW 的更多内容可以参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#aggregations

我们还使用了 CREATE VIEW 语法将 query 注册成了一个逻辑视图,可以方便地在后续查询中对该 query 进行引用,这有利于拆解复杂 query。注意,创建逻辑视图不会触发作业的执行,视图的结果也不会落地,因此使用起来非常轻量,没有额外开销。由于 uv_per_10min 每条输入数据都产生一条输出数据,因此对于存储压力较大。我们可以基于 uv_per_10min 再根据分钟时间进行一次聚合,这样每10分钟只有一个点会存储在 Elasticsearch 中,对于 Elasticsearch 和 Kibana 可视化渲染的压力会小很多。

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

提交上述查询后,在 Kibana 中创建 cumulative_uv 的 index pattern,然后在 Dashboard 中创建一个"Line"折线图,选择 cumulative_uv 索引,按照如下截图中的配置(左侧)画出累计独立用户数曲线,并保存。

顶级类目排行榜

最后一个有意思的可视化是类目排行榜,从而了解哪些类目是支柱类目。不过由于源数据中的类目分类太细(约5000个类目),对于排行榜意义不大,因此我们希望能将其归约到顶级类目。所以笔者在 mysql 容器中预先准备了子类目与顶级类目的映射数据,用作维表。

在 SQL CLI 中创建 MySQL 表,后续用作维表查询。

CREATE TABLE category_dim (
    sub_category_id BIGINT,  -- 子类目
    parent_category_id BIGINT -- 顶级类目
) WITH (
    ‘connector.type‘ = ‘jdbc‘,
    ‘connector.url‘ = ‘jdbc:mysql://localhost:3306/flink‘,
    ‘connector.table‘ = ‘category‘,
    ‘connector.driver‘ = ‘com.mysql.jdbc.Driver‘,
    ‘connector.username‘ = ‘root‘,
    ‘connector.password‘ = ‘123456‘,
    ‘connector.lookup.cache.max-rows‘ = ‘5000‘,
    ‘connector.lookup.cache.ttl‘ = ‘10min‘
);

同时我们再创建一个 Elasticsearch 表,用于存储类目统计结果。

CREATE TABLE top_category (
    category_name STRING,  -- 类目名称
    buy_cnt BIGINT  -- 销量
) WITH (
    ‘connector.type‘ = ‘elasticsearch‘,
    ‘connector.version‘ = ‘6‘,
    ‘connector.hosts‘ = ‘http://localhost:9200‘,
    ‘connector.index‘ = ‘top_category‘,
    ‘connector.document-type‘ = ‘user_behavior‘,
    ‘format.type‘ = ‘json‘,
    ‘update-mode‘ = ‘upsert‘
);

第一步我们通过维表关联,补全类目名称。我们仍然使用 CREATE VIEW 将该查询注册成一个视图,简化逻辑。维表关联使用 temporal join 语法,可以查看文档了解更多:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior,
  CASE C.parent_category_id
    WHEN 1 THEN ‘服饰鞋包‘
    WHEN 2 THEN ‘家装家饰‘
    WHEN 3 THEN ‘家电‘
    WHEN 4 THEN ‘美妆‘
    WHEN 5 THEN ‘母婴‘
    WHEN 6 THEN ‘3C数码‘
    WHEN 7 THEN ‘运动户外‘
    WHEN 8 THEN ‘食品‘
    ELSE ‘其他‘
  END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

最后根据 类目名称分组,统计出 buy 的事件数,并写入 Elasticsearch 中。

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = ‘buy‘
GROUP BY category_name;

提交上述查询后,在 Kibana 中创建 top_category 的 index pattern,然后在 Dashboard 中创建一个"Horizontal Bar"条形图,选择 top_category 索引,按照如下截图中的配置(左侧)画出类目排行榜,并保存。

可以看到服饰鞋包的成交量远远领先其他类目。

到目前为止,我们已经完成了三个实战案例及其可视化视图。现在可以回到 Dashboard 页面,对各个视图进行拖拽编排,让我们的 Dashboard 看上去更加正式、直观(如本文开篇效果图)。当然,Kibana 还提供了非常丰富的图形和可视化选项,而用户行为数据中也有很多有意思的信息值得挖掘,感兴趣的读者可以用 Flink SQL 对数据进行更多维度的分析,并使用 Kibana 展示更多可视化图,并观测图形数据的实时变化。

结尾

在本文中,我们展示了如何使用 Flink SQL 集成 Kafka, MySQL, Elasticsearch 以及 Kibana 来快速搭建一个实时分析应用。整个过程无需一行 Java/Scala 代码,使用 SQL 纯文本即可完成。期望通过本文,可以让读者了解到 Flink SQL 的易用和强大,包括轻松连接各种外部系统、对事件时间和乱序数据处理的原生支持、维表关联、丰富的内置函数等等。希望你能喜欢我们的实战演练,并从中获得乐趣和知识!

查看更多:https://yqh.aliyun.com/detail/6404?utm_content=g_1000105579

上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/

原文地址:https://www.cnblogs.com/yunqishequ/p/12377370.html

时间: 2024-10-07 10:46:45

Demo:基于 Flink SQL 构建流式应用的相关文章

「Flink」理解流式处理重要概念

什么是流式处理呢? 这个问题其实我们大部分时候是没有考虑过的,大多数,我们是把流式处理和实时计算放在一起来说的.我们先来了解下,什么是数据流. 数据流(事件流) 数据流是无边界数据集的抽象 我们之前接触的数据处理,大多都都是有界的.例如:处理某天的数据.某个季度的数据等 无界意味着数据是无限地.持续增长的 数据流会随着时间的推移,源源不断地加入进来 数据流无处不再 信息卡交易 电商购物 快递 网络交换机的流向数据 设备传感器发出的数据 - 这些数据都是无穷无尽的 每一件事情,都可以看成事件序列

构建流式应用—RxJS详解

讲之前先说一点哈,插入的图片,不能正常显示,图片一半被遮盖了,如果想看,鼠标右击然后图片地址,图片另存为去看.如果有知道怎么解决的,可以在下面给我评论,我会及时的修改. 好啦,开始: 目录 常规方式实现搜索功能 RxJS · 流 Stream RxJS 实现原理简析 观察者模式 迭代器模式 RxJS 的观察者 + 迭代器模式 RxJS 基础实现 Observable Observer RxJS · Operators Operators ·入门 一系列的 Operators 操作 使用 RxJS

Flink系列之流式

本文仅是自己看书.学习过程中的个人总结,刚接触流式,视野面比较窄,不喜勿喷,欢迎评论交流. 1.为什么是流式? 为什么是流式而不是流式系统这样的词语?流式系统在我的印象中是相对批处理系统而言的,用来处理流数据,实现数据处理功能的一个系统,而流式一词提醒我要以数据产生的方式去看待数据和以及处理过程,即在现实生活中,数据是以流的形式不断产生的,处理的过程应贴近数据产生的方式. 2.流与批 在处理数据时,对数据而言有:无界和有界之分.无界可以理解为不知道数据产生的停止时间,在数学上可以用前闭后开( [

翻译-In-Stream Big Data Processing 流式大数据处理

相当长一段时间以来,大数据社区已经普遍认识到了批量数据处理的不足.很多应用都对实时查询和流式处理产生了迫切需求.最近几年,在这个理念的推动下,催生出了一系列解决方案,Twitter Storm,Yahoo S4,Cloudera Impala,Apache Spark和Apache Tez纷纷加入大数据和NoSQL阵营.本文尝试探讨流式处理系统用到的技术,分析它们与大规模批量处理和OLTP/OLAP数据库的关系,并探索一个统一的查询引擎如何才能同时支持流式.批量和OLAP处理. 在Grid Dy

流式数据中的数学统计量计算

在科技飞速发展的今天,每天都会产生大量新数据,例如银行交易记录,卫星飞行记录,网页点击信息,用户日志等.为了充分利用这些数据,我们需要对数据进行分析.在数据分析领域,很重要的一块内容是流式数据分析.流式数据,也即数据是实时到达的,无法一次性获得所有数据.通常情况下我们需要对其进行分批处理或者以滑动窗口的形式进行处理.分批处理也即每次处理的数据之间没有交集,此时需要考虑的问题是吞吐量和批处理的大小.滑动窗口计算表示处理的数据每次向前移N个单位,N小于要处理数据的长度.例如,在语音识别中,每个包处理

Calcite中的流式SQL

Calcite中的流式SQL Calcite中的流式SQL总体设计思路 总体语法应该兼容SQL,这个是和目前流处理SQL的发展趋势是一致的. 如果部分功能标准SQL中没有包含,则尽量采用业界标杆(Oracle).比如模式匹配的功能,目前流处理中还没有针对语法达成共识,那么在设计上,就采用Oracle data warehouse的Match Recognize的方式.还有滑窗功能. 如果还有功能目前业界标杆都没有,那么就通过函数的方式拓展,翻滚窗口和跳动窗口,这两个窗口在标准SQL中都是不包含的

“流式”前端构建工具——gulp.js 简介

Grunt 一直是前端领域构建工具(任务运行器或许更准确一些,因为前端构建只是此类工具的一部分用途)的王者,然而它也不是毫无缺陷的,近期风头正劲的 gulp.js 隐隐有取而代之的态势.那么,究竟是什么使得 gulp.js 备受关注呢? Grunt 之殇 gulp.js 的作者 Eric Schoffstall 在他介绍 gulp.js 的 presentation 中总结了 Grunt 的几点不足之处: 插件很难遵守单一责任原则.因为 Grunt 的 API 设计缺憾,使得许多插件不得不负责一

流式计算(五)-Flink核心概念

一手资料,完全来自官网,直接参考英文过来的,并加了一些自己的理解,希望能让看官君了解点什么,足矣. 环境:Flink1.9.1 难度:新手--战士--老兵--大师 目标: 理解Flink的计算模型 认识各重要组件 说明: 本篇作为前两篇的补充内容,算是理论篇 步骤: 01-Flink编程模型 Flink的流计算整体来看都是按照Source -> Transformation -> Sink三步走,即获取流源 -> 进行转换 -> 汇聚(Sink),但“转换 (Transformat

如何构建一个flink sql平台

我们都知道,离线计算有Hive,使用过的知道,需要先定义一个schema,比如针对HDFS这种存储对标mysql定义一个schema,schema的本质是什么?主要描述下面这些信息 1)当前存储的物理位置的描述 2)数据格式的组成形式 然后Hive可以让用户定义一段sql,针对上面定义的schema进行,sql的本质是什么,是业务逻辑的描述.然后Hive内部会将这段sql进行编译转化为原生的底层MapReduce操作,通过这种方式,屏蔽底层技术原理,让业务开发人员集中精力在schema和sql业