事件时间(event time)与水印(watermark)

  1. 事件时间和水印诞生的背景

    • 在实际的流式计算中数据到来的顺序对计算结果的正确性有至关重要的影响
    • 比如:某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有2秒的延时,也就是在实际时间的第1秒产生的数据有可能在第3秒中产生的数据之后到来。
    • 假设在一个5秒的滚动窗口中,有一个EventTime是 9秒的数据,在第11秒时候到来了。
    • 图示:

      • 那么对于一个Count聚合的Tumble(5s)的window,上面的情况如何处理才能window3=3,window2=3 呢?
  2. 时间类型

    • Flink支持不同的时间概念

    • Processing Time(处理时间)
      • 处理时间是指当前机器处理该条事件的时间。
      • 它是当数据流入到具体某个算子时候相应的系统。
      • 他提供了最小的延时和最佳的性能。
      • 但是在分布式和异步环境中, 处理时间不能提供确定性。
      • 因为其对时间到达 系统的速度和数据流在系统的各个operator 之间处理的速度很铭感。
    • Event Time(事件时间)
      • 事件时间是每个事件在其生产设备上发生的时间。
      • 此时间通常在进入Flink之前嵌入到记录中,并且可以从每个记录中提取该事件时间戳。
      • 事件时间对于乱序、延时、或者数据重放等情况,都能给出正确的结果。
      • 事件时间依赖于事件本身,而跟物理时钟没有关系。
      • 基于事件时间的程序必须指定如何生成事件时间水印(watermark),这是指示事件时间进度的机制。
      • 事件时间处理通常存在一定的延时,因此需要为延时和无序的事件等待一段时间。
      • 因此,使用事件时间编程通常需要与处理时间相结合。
    • Ingestion Time(摄入时间)
      • 摄入时间是数据进入Flink框架的时间,是在Source Operator中设置的
      • 与ProcessingTime相比可以提供更可预测的结果,因为摄入时间的时间戳比较稳定(在源处只记录一次)
      • 同一数据在流经不同窗口操作时将使用相同的时间戳
      • 而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳
    • Process time 与 Event time对比:

      • 如上图所示,在一个乱序的数据流里,使用event time类型的事件时间,可以保证数据流的顺序性。
    • 设置时间特行
      • Flink程序的第一部分工作通常是设置时间特性,该设置用于定义数据源使用什么时间,在时间窗口处理中使用什么时间。
      • 代码:
        // 设置执行环境, 类似spark中初始化SparkContext
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
                env.setParallelism(1);
        
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
                // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        
                // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  3. Watermark (水印)

    • WaterMark 产生背景

      • 流处理从事件产生,到数据流经source,再到operator,中间是有一个过程和时间的。
      • 虽然大部分情况下,数据流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
      • 但是对于late element(延迟数据),我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。
      • 这个特别的机制,就是watermark。
    • WaterMark 介绍
      • Watermark是Flink为了处理EventTime时间类型的窗口计算提出的一种机制, 本质上也是一种时间戳。
      • Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
      • 当operator通过基于Event Time的时间窗口来处理数据时,它必须在确定所有属于该时间窗口的消息全部流入此操作符后,才能开始处理数据。
      • 但是由于消息可能是乱序的,所以operator无法直接确认何时所有属于该时间窗口的消息全部流入此操作符。
      • WaterMark包含一个时间戳,Flink使用WaterMark标记所有小于该时间戳的消息都已流入
      • Flink的数据源在确认所有小于某个时间戳的消息都已输出到Flink流处理系统后,会生成一个包含该时间戳的WaterMark,插入到消息流中输出到Flink流处理系统中,Flink operator算子按照时间窗口缓存所有流入的消息。
      • 当操作符处理到WaterMark时,它对所有小于该WaterMark时间戳的时间窗口的数据进行处理并发送到下一个操作符节点,然后也将WaterMark发送到下一个操作符节点。

    • WaterMark 的产生方式
      • Punctuated

        • 数据流中每一个递增的EventTime都会产生一个Watermark。
        • 在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
      • Periodic
        • 周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。
        • 在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
    • 代码:
      package com.ronnie.flink.stream.test;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.TimeCharacteristic;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
      import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
      import org.apache.flink.streaming.api.watermark.Watermark;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
      import org.apache.flink.util.Collector;
      
      import javax.annotation.Nullable;
      import java.text.ParseException;
      import java.text.SimpleDateFormat;
      
      /**
       *
       hello,2019-09-17 11:34:05.890
       hello,2019-09-17 11:34:07.890
       hello,2019-09-17 11:34:13.890
       hello,2019-09-17 11:34:08.890
       hello,2019-09-17 11:34:16.890
       hello,2019-09-17 11:34:19.890
       hello,2019-09-17 11:34:21.890
       */
      public class WaterMarkTest {
          public static void main(String[] args) {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      
              env.setParallelism(1);
      
           // 设置多久查看一下当前的水位线... 默认200ms
              env.getConfig().setAutoWatermarkInterval(10000);
      
              System.err.println("interval : " + env.getConfig().getAutoWatermarkInterval());
      
              DataStreamSource<String> streamSource = env.socketTextStream("ronnie01", 9999);
      
              SingleOutputStreamOperator<String> watermarks = streamSource.assignTimestampsAndWatermarks(new MyWaterMark());
      
              watermarks.map(new MapFunction<String, Tuple2<String, Integer>>() {
                  @Override
                  public Tuple2<String, Integer> map(String value) throws Exception {
                      String[] split = value.split(",");
                      String key = split[0];
      
                      return new Tuple2<String, Integer>(key, 1);
                  }
              }).keyBy(0)
                      .timeWindow(Time.seconds(10))
                       // 自定义的一个计算规则......
                      .apply(new MyWindowFunction())
                      .printToErr();
      
              try {
                  env.execute();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      class MyWaterMark implements AssignerWithPeriodicWatermarks<String>{
      
          // 目前系统里所有数据的最大事件时间
          long currentMaxTimeStamp = 0;
          // 允许数据延迟5s
          long maxLateTime = 5000;
      
          Watermark wm = null;
      
          SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      
          @Nullable
          @Override
          // 周期性地获取目前的水位线时间, 默认200ms
          public Watermark getCurrentWatermark() {
              // 未处理的延迟/乱序问题
             // wm = new Watermark(currentMaxTimeStamp);
      
              // 处理数据的延迟/乱序问题
              wm = new Watermark(currentMaxTimeStamp - maxLateTime);
              System.out.println(format.format(System.currentTimeMillis()) + " 获取当前水位线: " + wm + ","+ format.format(wm.getTimestamp()));
              return wm;
          }
      
          @Override
          public long extractTimestamp(String element, long previousElementTimestamp) {
              String[] split = element.split(",");
      
              String key = split[0];
      
              long timestamp = 0;
      
              try {
                  //将2019-09-17 10:24:50.958 格式时间转成时间戳
                  timestamp = format.parse(split[1]).getTime();
              } catch (ParseException e) {
                  e.printStackTrace();
              }
      
              // 对比新数据的时间戳和目前最大的时间戳, 取大的值作为新的时间戳
              currentMaxTimeStamp= Math.max(timestamp, currentMaxTimeStamp);
      
              System.err.println(key +", 本条数据的时间戳: "+ timestamp + "," +format.format(timestamp)
                      + "|目前数据中的最大时间戳: "+  currentMaxTimeStamp + ","+ format.format(currentMaxTimeStamp)
                      + "|水位线时间戳: "+ wm + ","+ format.format(wm.getTimestamp()));
      
              return timestamp;
          }
      }
      
      class MyWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>{
      
          @Override
          public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
              SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      
              int sum = 0;
      
              for (Tuple2<String, Integer> tuple2:input){
               sum += tuple2.f1;
              }
              long start = window.getStart();
              long end = window.getEnd();
      
              out.collect("key:" + tuple.getField(0) + " value: " + sum + "| window_start :"
                      + format.format(start) + "  window_end :" + format.format(end)
              );
          }
      }
      

原文地址:https://www.cnblogs.com/ronnieyuan/p/11848725.html

时间: 2024-11-02 15:52:58

事件时间(event time)与水印(watermark)的相关文章

「Flink」事件时间与水印

我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系. 获取窗口开始时间Flink源代码 获取窗口的开始时间为以下代码: org.apache.flink.streaming.api.windowing.windows.TimeWindow /** * Method to get the window start for a timestamp. * * @param timestamp epoch millisecond to get the window

[DOM Event Learning] Section 2 概念梳理 什么是事件 DOM Event

[DOM Event Learning] Section 2 概念梳理 什么是事件 DOM Event 事件 事件(Event)是用来通知代码,一些有趣的事情发生了. 每一个Event都会被一个Event对象所表示,这个对象可能还会有一些自定义的字段或者方法,来获取发生什么事情的更多信息. Event对象实现了Event接口(https://developer.mozilla.org/en-US/docs/Web/API/Event). 事件可以是任何事情,从最基本的用户交互,到renderin

事件(event),正则

1.事件(event):事件是可以被 JavaScript 侦测到的行为.网页中的每个元素都可以产生某些可以触发 JavaScript 函数的事件.2.事件源: 触发事件的元素 事件: 被 JavaScript 侦测到的行为3.事件分类 a.鼠标事件 事件句柄 触发时机onclick 鼠标点击操作ondblclick 鼠标双击操作onmousedowm 按下鼠标按键onmouseup 抬起鼠标按键onmousemove 鼠标指针在元素上方移动onmouseover 鼠标指针进入元素onmouse

TI-RTOS 之 事件同步(Event, 类似semaphore)

TI-RTOS 之 事件同步(Event, 类似semaphore) Event 是类似Semaphore的存在,官方如下描述: SYS/BIOS events are a means of communication between Tasks and other threads such as Hwis, Swis, and other Tasks, or between Tasks and other SYS/BIOS objects. Other SYS/BIOS objects inc

node.js事件循环 event loop

Nodejs事件循环 (event loop) node.js 事件循环的概念 当node.js 启动的时候会初始化eventloop ,每一个evnet loop 都会包含如下6个循环阶段,node.js 事件循环和浏览器事件循环完全不一样. 官网文档:https://nodejs.org/zh-cn/docs/guides/event-loop-timers-and-nexttick/ timers pending callbacks (I/O callbakcs) idle, prepar

jacascript 事件对象event

前言:这是笔者学习之后自己的理解与整理.如果有错误或者疑问的地方,请大家指正,我会持续更新! 在触发DOM上的某个事件时,会产生一个事件对象 event,这个对象中包含着所有与事件有关的信息.所有浏览器都支持 event 对象,但有兼容性问题. 获取事件对象 一般地,event 对象是事件程序的第一个参数.IE8及以下浏览器不支持: 另一种方法是直接使用 event 变量,firefox 浏览器不支持: 获取事件对象的常见兼容写法: <div id="box" style=&qu

C#事件(Event)学习日记

event 关键字的来由,为了简化自定义方法的构建来为委托调用列表增加和删除方法. 在编译器处理 event 关键字的时候,它会自动提供注册和注销方法以及任何必要的委托类型成员变量. 这些委托成员变量总是声明为私有的,因此不能直接从触发事件对象访问它们. 温馨提示:如果您对于委托不是很了解,您可以先看 C#委托(Delegate) ,这对您理解本章会有所帮助. 定义一个事件的步骤: 需要定义一个委托,它包含事件触发时将要调用方法 通过 event 关键字用相关委托声明这个事件 话不多说,我们来看

Cocos2d-X3.0 刨根问底(七)----- 事件机制Event源码分析

这一章,我们来分析Cocos2d-x 事件机制相关的源码, 根据Cocos2d-x的工程目录,我们可以找到所有关于事件的源码都存在放在下图所示的目录中. 从这个event_dispatcher目录中的文件命名上分析 cocos2d-x与事件相关的类一共有四种, Event, EventListener,EventDispatcher, Touch分别为 事件,事件侦听器,事件分发器,触摸 我们先从Event类开始. 打开CCEvent.h文件 /** * Base class of all ki

javaScript中的事件对象event

事件对象event,每当一个事件被触发的时候,就会随之产恒一个事件对象event,该对象中主要包括了关于该事件的基本属性,事件类型type(click.dbclick等值).目标元素target(我的理解是事件源对象,即触发该事件的dom元素)等,以及一些与该事件相关的方法.取消事件默认行为preventDefault().组织事件继续冒泡或捕获stopPropagation()等等,这里我仅仅列举了,项目中我用到的属性和方法. 既然事件被触发.就随之产生了一个event对象.笔者在IE中測试了