「Flink」事件时间与水印

我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系。

获取窗口开始时间Flink源代码

获取窗口的开始时间为以下代码:

org.apache.flink.streaming.api.windowing.windows.TimeWindow

/** * Method to get the window start for a timestamp. * * @param timestamp epoch millisecond to get the window start. * @param offset The offset which window start would be shifted by. * @param windowSize The size of the generated windows. * @return window start */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

这一段代码,我们可以认为Flink并不是把时间戳直接作为窗口的开始时间,而是做了一些“对齐”操作,确保时间能够整除8。

不同时间类型的窗口时间计算

1、当TimeCharacteristic为ProcessingTime时

窗口的开始时间:与窗口接收到的第一条消息的处理时间有关。例如:window operator是2020-02-06 22:02:33接收到的第一条消息,那么窗口的开始时间就是2020-02-06 22:02:33。

窗口的结束时间:一旦窗口的开始时间确定了,因为窗口的长度是固定的。那么窗口的结束时间就确定下来了,例如:假设这里的时间窗口是3秒,那么窗口的结束时间就是2020-02-06 22:02:36。

窗口的触发计算时间:假设有一条新的消息到达window operator,此时如果对应operator的系统时间,大于结束时间,就会触发计算。

一旦窗口的开始时间确定了,那么后续窗口的开始时间,也就都确定下来了。

问题:

假设某个时间窗口,2020-2-6 22:12:20 - 2020-2-6 22:12:23,之间没有任何一条数据进来。Flink会如何处理?

Flink会直接抛弃掉这个时间窗口,新来的事件消息会到其他的时间窗口中计算。

2、当TimeCharacteristic为IngestionTime时

窗口的开始时间:与source operator接收到的第一条消息有关。例如:source接收到这条消息的时间是2020-2-6 22:14:50,那么窗口的开始时间就是2020-2-6 22:14:50

窗口的结束时间:与ProcessTime一致

窗口的触发计算时间:假设有一条新的消息到达source operator,那么此时的时间如果大于结束时间,就会触发计算。

除了窗口的开始时间、触发时间都是与source operator算子有关,其他与Processing Time是类似的。

3、但TimeCharacteristic为EventTime时

窗口的开始时间:与window operator接收到的第一条消息的事件时间有关,例如:如果这条消息的水印时间是2020-2-6 22:17:50,那么窗口的的开始时间就是2020-2-6 22:17:50

窗口的结束时间:与ProcessTime一致

窗口的触发计算时间:假设有一条新的消息到达window operator,如果该事件的水印时间大于窗口的结束时间,就会触发计算。

通常,我们会让水印时间比事件时间允许延迟几秒钟。这样,如果是因为网络延迟消息晚到了几秒,也不会影响到统计结果了。

public class WordCountWindow {
    public static void main(String[] args) throws Exception {
        // 1. 初始化流式运行环境
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // 2. 设置时间处理类型,这里设置的方式处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 3. 定义数据源,每秒发送一个hadoop单词
        SingleOutputStreamOperator<Tuple2<String, Long>> wordDSWithWaterMark = env.addSource(new RichSourceFunction<Tuple2<String, Long>>() {

            private boolean isCanaled = false;
            private int TOTAL_NUM = 20;

            @Override
            public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
                while (!isCanaled) {
                    ctx.collect(Tuple2.of("hadooop", System.currentTimeMillis()));

                    // 打印窗口开始、结束时间
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    System.out.println("事件发送时间:" + sdf.format(System.currentTimeMillis()));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanaled = true;
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element) {
                return element.f1;
            }
        });

        // 4. 每5秒进行一次,分组统计
        // 4.1 转换为元组
        wordDSWithWaterMark.map(word -> {
            return Tuple2.of(word.f0, 1);

        })
                // 指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 按照单词进行分组
                .keyBy(t -> t.f0)
                // 滚动窗口,3秒计算一次
                .timeWindow(Time.seconds(3))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }, new RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(String word, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {

                        // 打印窗口开始、结束时间
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.out.println("窗口开始时间:" + sdf.format(window.getStart())
                                + " 窗口结束时间:" + sdf.format(window.getEnd())
                                + " 窗口计算时间:" + sdf.format(System.currentTimeMillis()));

                        int sum = 0;
                        Iterator<Tuple2<String, Integer>> iterator = input.iterator();
                        while(iterator.hasNext()) {
                            Integer count = iterator.next().f1;
                            sum += count;
                        }
                        out.collect(Tuple2.of(word, sum));
                    }
                }).print();

        env.execute("app");
    }
}

输出结果如下:

事件发送时间:2020-02-06 22:35:08

事件发送时间:2020-02-06 22:35:09

事件发送时间:2020-02-06 22:35:10

事件发送时间:2020-02-06 22:35:11

事件发送时间:2020-02-06 22:35:12

事件发送时间:2020-02-06 22:35:13

事件发送时间:2020-02-06 22:35:14

窗口开始时间:2020-02-06 22:35:06 窗口结束时间:2020-02-06 22:35:09 窗口计算时间:2020-02-06 22:35:14

4> (hadooop,1)

事件发送时间:2020-02-06 22:35:15

事件发送时间:2020-02-06 22:35:16

事件发送时间:2020-02-06 22:35:17

窗口开始时间:2020-02-06 22:35:09 窗口结束时间:2020-02-06 22:35:12 窗口计算时间:2020-02-06 22:35:17

4> (hadooop,3)

参考文件:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/event_time.html

原文地址:https://www.cnblogs.com/ilovezihan/p/12271433.html

时间: 2024-08-01 12:29:55

「Flink」事件时间与水印的相关文章

「Flink」Flink中的时间类型

Flink中的时间类型和窗口是非常重要概念,是学习Flink必须要掌握的两个知识点. Flink中的时间类型 时间类型介绍 Flink流式处理中支持不同类型的时间.分为以下几种: 处理时间 Flink程序执行对应操作的系统时间.所有基于时间的操作(例如:时间窗口)都将使用运行相应operator的系统时间.例如:每个小时的处理时间窗口包括在系统时间范围内所有operator接收到的记录.例如:如果应用程序在09:15开始运行,则第一个滚动时间窗口将包括:09:15 – 10:00 之间的处理事件

「Flink」理解流式处理重要概念

什么是流式处理呢? 这个问题其实我们大部分时候是没有考虑过的,大多数,我们是把流式处理和实时计算放在一起来说的.我们先来了解下,什么是数据流. 数据流(事件流) 数据流是无边界数据集的抽象 我们之前接触的数据处理,大多都都是有界的.例如:处理某天的数据.某个季度的数据等 无界意味着数据是无限地.持续增长的 数据流会随着时间的推移,源源不断地加入进来 数据流无处不再 信息卡交易 电商购物 快递 网络交换机的流向数据 设备传感器发出的数据 - 这些数据都是无穷无尽的 每一件事情,都可以看成事件序列

「Flink」使用Managed Keyed State实现计数窗口功能

先上代码: public class WordCountKeyedState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 初始化测试单词数据流 DataStreamSource<String> lineDS = env.addSource(n

事件时间(event time)与水印(watermark)

事件时间和水印诞生的背景 在实际的流式计算中数据到来的顺序对计算结果的正确性有至关重要的影响 比如:某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有2秒的延时,也就是在实际时间的第1秒产生的数据有可能在第3秒中产生的数据之后到来. 假设在一个5秒的滚动窗口中,有一个EventTime是 9秒的数据,在第11秒时候到来了. 图示: 那么对于一个Count聚合的Tumble(5s)的window,上面的情况如何处理才能window3=3,window2=3 呢? 时间类型 Fl

Android逆向之旅---静态方式分析破解视频编辑应用「Vue」水印问题

一.故事背景 现在很多人都喜欢玩文艺,特别是我身边的UI们,拍照一分钟修图半小时.就是为了能够在朋友圈显得逼格高,不过的确是挺好看的,修图的软件太多了就不多说了,而且一般都没有水印啥的.相比较短视频有一个比较有逼格的编辑工具「Vue」个人已经用了很长时间了,拍出来的视频借助强大滤镜真的很好看,显得逼格也高,更重要的是他有我最喜欢的功能就是可以添加视频背景音乐,选择自己喜欢的音乐,然后还可以编辑这段背景音乐,反正我个人觉的这个是我最喜欢用的产品了.但是好用的东西必定有它不好的地方,因为他真的很强大

Firemonkey ListView 获取项目右方「>」(Accessory) 事件

适用:XE6 或更高版本 说明:ListView 在基本的项目里提供了 Accessory(项目右方「>」符号),但要如何分辨是否按下>或者项目本身呢?在 XE6 提供了 OnItemClickEx 事件可以辨识,如下例: procedure TForm1.ListView1ItemClickEx(const Sender: TObject; ItemIndex: Integer; const LocalClickPos: TPointF; const ItemObject: TListIte

开放的智力8:实用「成功学」

可实现的「成功学」 现在我想为这里的年轻人介绍一种可实现的「成功学」.希望这个我自创的理论,可以改变很多人的一生. 当我们评价一个事情值不值得去做.应该花多少精力去做的时候,应该抛弃单一的视角,而是分两个不同的维度来看,一是该事件将给我带来的收益大小(认知.情感.物质.身体方面的收益皆可计入),即「收益值」:二是该收益随时间衰减的速度,我称为「收益半衰期」,半衰期长的事件,对我们的影响会持续得较久较长. 这两个维度正交以后就形成了一个四象限图.我们生活.学习和工作中的所有事情都可以放进这个图里面

技术人员应对「考核」的一些思考

来这个公司实习已经半年多了,在年前经历了一次年终考核,最终对我的工作的评级是 C(及格-符合当前职位的工作),让我不禁思考自己在项目中的一些工作的问题,为什么我是C?是我做的不够好吗?或者说在哪里做的不够好? 从考核流程来看,基本上是 CTO 与 Team Leader 对团队成员的「年终总结与次年工作计划」进行Rank,个人狭义的认为「考核」的主要支持材料就是这个总结了. 他山之石 其他公司是怎么考核的呢?说实话我也不太清楚,刚入行,只能通过搜索了解,在网上了解到有以下几种:发精品博客.发论文

微信小程序开发基础(一)「配置」与「逻辑层」

微信小程序作为微信生态重要的一环,在实际生活.工作.商业中的应用越来越广泛.想学习微信小程序开发的朋友也越来越多,本文将在小程序框架的基础上就微信小程序项目开发所必需的基础知识及语法特点进行了详细总结和阐述,包括配置.函数.语法.事件及其处理.数据绑定.模块.样式等.想开发小程序,这些内容是必须掌握的. 全文知识结构预览: 一.程序配置: 1.全局配置:2.页面配置 二.逻辑层: 1.程序注册:App()方法:2.页面注册:Page()方法:3.模块与调用:4.微信原生API 三.视图层(将在单