「Flink」使用Managed Keyed State实现计数窗口功能

先上代码:

public class WordCountKeyedState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 初始化测试单词数据流
        DataStreamSource<String> lineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanaled = false;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while(!isCanaled) {
                    ctx.collect("hadoop flink spark");
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanaled = true;
            }
        });

        // 切割单词,并转换为元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordTupleDS = lineDS.flatMap((String line, Collector<Tuple2<String, Integer>> ctx) -> {
            Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(Tuple2.of(word, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // 按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, Integer> keyedWordTupleDS = wordTupleDS.keyBy(t -> t.f1);

        // 对单词进行计数
        keyedWordTupleDS.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ValueState<Tuple2<Integer, Integer>> countSumValueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 初始化ValueState
                ValueStateDescriptor<Tuple2<Integer, Integer>> countSumValueStateDesc = new ValueStateDescriptor("countSumValueState",
                        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})
                );
                countSumValueState = getRuntimeContext().getState(countSumValueStateDesc);
            }

            @Override
            public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
                if(countSumValueState.value() == null) {
                    countSumValueState.update(Tuple2.of(0, 0));
                }

                Integer count = countSumValueState.value().f0;
                count++;
                Integer valueSum = countSumValueState.value().f1;
                valueSum += value.f1;

                countSumValueState.update(Tuple2.of(count, valueSum));

                // 每当达到3次,发送到下游
                if(count > 3) {
                    out.collect(Tuple2.of(value.f0, valueSum));
                    // 清除计数
                    countSumValueState.update(Tuple2.of(0, valueSum));
                }
            }
        }).print();

        env.execute("KeyedState State");
    }
}

代码说明:

1、构建测试数据源,每秒钟发送一次文本,为了测试方便,这里就发一个包含三个单词的文本行

2、对句子按照空格切分,并将单词转换为元组,每个单词初始出现的次数为1

3、按照单词进行分组

4、自定义FlatMap

初始化ValueState,注意:ValueState只能在KeyedStream中使用,而且每一个ValueState都对一个一个key。每当一个并发处理ValueState,都会从上下文获取到Key的取值,所以每个处理逻辑拿到的ValueStated都是对应指定key的ValueState,这个部分是由Flink自动完成的。

注意:

带默认初始值的ValueStateDescriptor已经过期了,官方推荐让我们手动在处理时检查是否为空

instead and manually manage the default value by checking whether the contents of the state is null.

/** * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific * serializer. * * @deprecated Use {@link #ValueStateDescriptor(String, TypeSerializer)} instead and manually * manage the default value by checking whether the contents of the state is {@code null}. * * @param name The (unique) name for the state. * @param typeSerializer The type serializer of the values in the state. * @param defaultValue The default value that will be set when requesting state without setting *                     a value before. */@Deprecatedpublic ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue) {   super(name, typeSerializer, defaultValue);}

5、逻辑实现

在flatMap逻辑中判断ValueState是否已经初始化,如果没有手动给一个初始值。并进行累加后更新。每当count > 3发送计算结果到下游,并清空计数。

原文地址:https://www.cnblogs.com/ilovezihan/p/12247368.html

时间: 2024-08-01 12:30:03

「Flink」使用Managed Keyed State实现计数窗口功能的相关文章

「Flink」理解流式处理重要概念

什么是流式处理呢? 这个问题其实我们大部分时候是没有考虑过的,大多数,我们是把流式处理和实时计算放在一起来说的.我们先来了解下,什么是数据流. 数据流(事件流) 数据流是无边界数据集的抽象 我们之前接触的数据处理,大多都都是有界的.例如:处理某天的数据.某个季度的数据等 无界意味着数据是无限地.持续增长的 数据流会随着时间的推移,源源不断地加入进来 数据流无处不再 信息卡交易 电商购物 快递 网络交换机的流向数据 设备传感器发出的数据 - 这些数据都是无穷无尽的 每一件事情,都可以看成事件序列

「Flink」Flink中的时间类型

Flink中的时间类型和窗口是非常重要概念,是学习Flink必须要掌握的两个知识点. Flink中的时间类型 时间类型介绍 Flink流式处理中支持不同类型的时间.分为以下几种: 处理时间 Flink程序执行对应操作的系统时间.所有基于时间的操作(例如:时间窗口)都将使用运行相应operator的系统时间.例如:每个小时的处理时间窗口包括在系统时间范围内所有operator接收到的记录.例如:如果应用程序在09:15开始运行,则第一个滚动时间窗口将包括:09:15 – 10:00 之间的处理事件

「Flink」事件时间与水印

我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系. 获取窗口开始时间Flink源代码 获取窗口的开始时间为以下代码: org.apache.flink.streaming.api.windowing.windows.TimeWindow /** * Method to get the window start for a timestamp. * * @param timestamp epoch millisecond to get the window

「JSOI2019」神经网络(容斥+组合计数+背包dp)

Address luogu5333 loj3102 Solution 容易发现,一条哈密顿回路本质上就是:把每棵树都拆成若干条有向路径,再把所有的有向路径连接成环,环上的相邻两条有向路径不可以来自同一棵树. 先求出 \(g_{i,j}\) 表示把第 \(i\) 棵树拆成 \(j\) 条有向路径的方案数. 考虑 \(\text{dp}\),记 \(f_{u,i,0/1/2/3}\) 分别表示:\(u\) 的子树拆成 \(i\) 条路径,\(u\) 是路径起点,是路径终点,单点成路径,既不是路径起点

「一」创建一个带 ssh 服务的基础镜像(修订版)--使用「docker commit」创建

在介绍如何创建带 ssh 服务的基础镜像之前,我们想回顾一下之前介绍过的内容,其中提到有三种创建镜像的常用办法: 从文件系统导入 从现有容器使用「docker commit」提交 使用 dockerfile 文件 build 本章将主要介绍后面 2 种方法. 步骤如下: $ sudo docker run -ti ubuntu:14.04 /bin/bash #首先,使用我们最熟悉的 「-ti」参数来创建一个容器. [email protected]:/# sshd bash: sshd: co

零元学Expression Blend 4 - Chapter 15 用实例了解互动控制项「Button」I

原文:零元学Expression Blend 4 - Chapter 15 用实例了解互动控制项「Button」I 本章将教大家如何更改Button的预设Template,以及如何在Button内设置动画. ? 本章将教大家如何更改Button的预设Template,以及如何在Button内设置动画. ? ? ? 01 开启一个新专案,并且置入一个Button,调整到适当大小 ? 在Properties->可以调整Button的外观,基本设定都跟先前的教学雷同 不熟的人请看如何用Blend制作一

LibreOJ2044 - 「CQOI2016」手机号码

Portal Description 给出两个十一位数\(L,R\),求\([L,R]\)内所有满足以下两个条件的数的个数. 出现至少\(3\)个相邻的相同数字: 不能同时出现\(4\)和\(8\). Solution 数位DP. 首先将问题转换成\(solve(R)-solve(L)\)的形式,这样只需要求不超过\(n\)的满足条件的数的个数. 定义\(dp[k][x][f_1][f_2][f_3][f_4]\),其中\(k\)表示位数,\(x\)表示尾数,\(f_1\)表示第\(k\)位与第

过去这几十年,分布式系统的「数据一致性」精华都在这了!

阅读目录 为什么需要事务 事务的来源 分布式系统中的事务问题 分布式事务的解决方案 结语 暂时还未涉及的园友们,可以收藏防身哦~ 本文是本系列的第三篇.与前两篇<不知道是不是最通俗易懂的<数据一致性>剖析了>.<烦人的数据不一致到底怎么解决?——通过“共识”达成数据一致性>形成完整的「数据一致性」合集. 一.为什么需要事务 如果说「共识」解决的是「水平」问题,那么「事务」解决的是「垂直」问题.是如何让一条绳上的蚂蚱共同起舞? 事务只是一个计算机术语,而事务的体现形式其实

AC日记——「HNOI2017」单旋 LiBreOJ 2018

#2018. 「HNOI2017」单旋 思路: set+线段树: 代码: #include <bits/stdc++.h> using namespace std; #define maxn 100005 #define maxtree maxn<<2 int val[maxtree],tag[maxtree],L[maxtree],R[maxtree],mid[maxtree]; int op[maxn],ki[maxn],bi[maxn],cnt,size,n,ch[maxn]