Flink中的Time与Window

一、Time

在Flink的流式处理中,会涉及到时间的不同概念

Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳

Ingestion Time:是数据进入Flink的时间

Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

例如一条日志进入Flink的时间为2017-11-12 10:00:00.123 到达window的系统时间为 2017-11-12 10:00:01.234,日志内容如下:

2017-11-02 18:37:15.624 INFO Fair over to rm2

对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的?----- eventTime,因为我们要根据日志的生成时间进行统计。

如果要想聚合,不可能对无解数据流进行聚合。

二、Window

1、streaming流式计算是一种被设计用于处理处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。

Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的"buckets"桶,我们可以在这些桶上做计算操作。

共有两类,五种时间窗口。

2、Window类型(两类)

2.1、CountWindow:按照指定的数据条数生成一个window,与时间无关

2.2、TimeWindow:按照时间生成window。(按照Processing Time来划分Window)

对于TimeWindow和CountWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

(1)滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切分。

特点:时间对齐,窗口长度固定,没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。

(2)滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

特点:时间对齐,窗口长度固定,有重叠。

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。

因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

使用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警。)

(3)会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的timeout间隙组成。类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无对齐。

session 窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的

时间周期内不再收到元素,即非活动间隔产生,那这个窗口就会关闭。一个Session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃

周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

三、Window API

3.1、CountWindow

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。

注意:CountWindow的window_size 指的是相同key的元素的个数,不是输入的所有元素的总数。

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

/**
  * CountWindow 中的滚动窗口(Tumbling Windows)
  * 将数据依据固定的窗口长度对数据进行切分。
  */
object TimeAndWindow {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("localhost",11111)
    val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0)
    //注意:CountWindow的window_size 指的是相同key的元素的个数,不是输入的所有元素的总数。
    val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5)
                .reduce((item1, item2)=>(item1._1,item1._2+item2._2))

    streamWindow.print()
    env.execute("TimeAndWindow")

  }
}

3.2

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

/**
  * CountWindow 中的滑动窗口(Sliding Windows)
  * 将数据依据固定的窗口长度对数据进行切分。
  */
object TimeAndWindow {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("localhost",11111)
    val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0)
    //注意:CountWindow的window_size 指的是相同key的元素的个数,不是输入的所有元素的总数。
    //满足步长,就执行一次,按第一个参数的长度
    val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5,2)
                .reduce((item1, item2)=>(item1._1,item1._2+item2._2))

    streamWindow.print()
    env.execute("TimeAndWindow")

  }
}

四、EventTime与Window

1、EventTime的引入

在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那么需要引入EventTime的时间戳,引入方式如下所示:

2、Watermark

  概念:我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的

事件戳顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的EventTime顺序排列的。

  Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的Watermark。

  Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。

  数据流中的Watermark用于表示eventTime小于Watermark的数量,都已经到达了,因此,window的执行也是由Watermark触发的。

  Watermark可以理解成一个延迟触发机制。我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime 小于

maxEventTime-t 的所有数据都已经到达。如果有窗口的停止时间等于maxEventTime-t,那么这个窗口被触发执行。

滚动窗口/滑动窗口/会话窗口


import org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**  * TimeWindow  */object EventTimeAndWindow {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    //开启watermark    //从调用时刻开始给env创建的每一个stream追加时间特征。    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream: KeyedStream[(String, Long), Tuple] = env.socketTextStream("192.168.218.130", 1111).assignTimestampsAndWatermarks(      new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {        override def extractTimestamp(element: String): Long = {          // event word  eventTime是日志生成时间,我们从日志中解析EventTime          val eventTime = element.split(" ")(0).toLong          println(eventTime)          eventTime        }      }    ).map(item => (item.split(" ")(1),1L)).keyBy(0)    //加上滚动窗口,窗口大小是5s,调用window的api//    val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))    //滑动窗口//    val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))    //会话窗口    val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))    val streamReduce = streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))    streamReduce.print()

    env.execute("EventTimeAndWindow")  }}

原文地址:https://www.cnblogs.com/ssqq5200936/p/11014296.html

时间: 2024-11-08 23:49:40

Flink中的Time与Window的相关文章

「Flink」Flink中的时间类型

Flink中的时间类型和窗口是非常重要概念,是学习Flink必须要掌握的两个知识点. Flink中的时间类型 时间类型介绍 Flink流式处理中支持不同类型的时间.分为以下几种: 处理时间 Flink程序执行对应操作的系统时间.所有基于时间的操作(例如:时间窗口)都将使用运行相应operator的系统时间.例如:每个小时的处理时间窗口包括在系统时间范围内所有operator接收到的记录.例如:如果应用程序在09:15开始运行,则第一个滚动时间窗口将包括:09:15 – 10:00 之间的处理事件

jQuery中$(document).ready()和window.onload的区别

 $(document) ready()和window onload在表面上看都是页面加载时我们就去执行一个函数或动作,但是在具体的细节上$(document) ready()和window onload还是有区别的. 最基本的区别 1.执行时间 window.onload必须等到页面内包括图片.flash等的所有元素加载完毕后才能执行.$(document).ready()是DOM结构绘制完毕后就执行,不必等到加载完毕. 如<p>图片视频等</p>(假设页面只有这一个标签),wi

Flink中task之间的数据交换机制

Flink中的数据交换构建在如下两条设计原则之上: 数据交换的控制流(例如,为实例化交换而进行的消息传输)是接收端初始化的,这非常像最初的MapReduce. 数据交换的数据流(例如,在网络上最终传输的数据)被抽象成一个叫做IntermediateResult的概念,它是可插拔的.这意味着系统基于相同的实现逻辑可以既支持流数据,又支持批处理数据的传输. 数据传输包含多个对象,它们是: JobManager master节点,用于响应任务调度.恢复.协作,以及通过ExecutionGraph数据结

Android 中的Activity、Window、View之间的关系

一.概述   Activity 可以说是应用程序的载体(也可以理解为界面的载体,但是不界面),用户能够在上面绘制界面(Activity本身不绘制界面),并提供用户处理事件的API,维护应用程序的生命周期(Android应用程序是由多个 Activity 堆积而成,而各个 Activity 又有其独立的生命周期). Activity内部组合了一个Window(这是一个抽象类,具体是PhoneWindow)对象.我们自己写的扩展一个Activity时,在onCreate 方法中调用 setConte

《从0到1学习Flink》—— Flink 中几种 Time 详解

前言 Flink 在流程序中支持不同的 Time 概念,就比如有 Processing Time.Event Time 和 Ingestion Time. 下面我们一起来看看这几个 Time: Processing Time Processing Time 是指事件被处理时机器的系统时间. 当流程序在 Processing Time 上运行时,所有基于时间的操作(如时间窗口)将使用当时机器的系统时间.每小时 Processing Time 窗口将包括在系统时钟指示整个小时之间到达特定操作的所有事

Flink 从0到1学习 —— Flink 中如何管理配置?

前言 如果你了解 Apache Flink 的话,那么你应该熟悉该如何像 Flink 发送数据或者如何从 Flink 获取数据.但是在某些情况下,我们需要将配置数据发送到 Flink 集群并从中接收一些额外的数据. 在本文的第一部分中,我将描述如何将配置数据发送到 Flink 集群.我们需要配置很多东西:方法参数.配置文件.机器学习模型.Flink 提供了几种不同的方法,我们将介绍如何使用它们以及何时使用它们.在本文的第二部分中,我将描述如何从 Flink 集群中获取数据. 如何发送数据给 Ta

Flink中TaskManager端执行用户逻辑过程(源码分析)

TaskManager接收到来自JobManager的jobGraph转换得到的TDD对象,启动了任务,在StreamInputProcessor类的processInput()方法中 通过一个while(true)中不停的拉取上游的数据,然后调用streamOperator.processElement(record)调用用户实现的方法去处理数据拉取的数据 首先先来看下这个operator对象 然后看看OneInputStreamOperator类的UML 这里所有的实现类没有全部列出,只列了

Flink中案例学习--State与CheckPoint

一.State 在Flink中,按照基本类型,对State做了以下两类的划分: Keyed State,和Key有关的状态类型,它只能被基于KeyedStream之上的操作,方法所使用.我们可以从逻辑上理解这种状态是一个并行度操作实例和一种Key的对应, <parallel-operator-instance, key>.Operator State(或者non-keyed state),它是和Key无关的一种状态类型.相应地我们从逻辑上去理解这个概念,它相当于一个并行度实例,对应一份状态数据

Flink中的一些核心概念

在源码解读前我们有必要先了解一下Flink的一些基本的但却很关键的概念.这有助于帮助我们理解整个架构.在翻译文档的同时,对于有争议的或者不是非常适合用中文表达的地方,我尽量保留原始英文单词. 程序和数据流 Flink程序的基本构建块是streams和transformations(注意,DataSet在内部也是一个stream).一个stream可以看成一个中间结果,而一个transformations是以一个或多个stream作为输入的某种operation,该operation利用这些str