【翻译】Flink Table Api & SQL —Streaming 概念 ——时间属性

本文翻译自官网: Time Attributes   https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html

Flink能够根据不同的时间概念处理流数据。

  • Process time 是指正在执行相应操作的机器的系统时间(也称为“挂钟时间”)。
  • Event time 是指基于附在每行上的时间戳对流数据进行处理。时间戳可以在事件发生时进行编码。
  • Ingestion time 是事件进入Flink的时间;在内部,它的处理类似于事件时间。

有关Flink中时间处理的更多信息,请参见有关事件时间和水印的介绍。

本页说明如何在Flink的Table API和SQL中为基于时间的操作定义时间属性。

时间属性简介

Table APISQL中的基于时间的操作(例如窗口)都需要有关时间概念及其起源的信息。因此,表可以提供逻辑时间属性,以指示时间并访问表程序中的相应时间戳。

时间属性可以是每个表结构的一部分。它们是从DataStream创建表时定义的,或者是在使用TableSource时预定义的。一旦在开始定义了时间属性,就可以将其作为字段引用,并可以在基于时间的操作中使用。

只要时间属性没有被修改并且只是从查询的一部分转发到另一部分,它仍然是有效的时间属性。时间属性的行为类似于常规时间戳,可以进行访问以进行计算。常规时间戳记不能与Flink的时间和水印系统配合使用,因此不能再用于基于时间的操作。

表程序要求已为流环境指定了相应的时间特征:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

处理时间

处理时间允许表程序根据本地计算机的时间产生结果。这是最简单的时间概念,但不提供确定性。它既不需要时间戳提取也不需要水印生成。

有两种定义处理时间属性的方法。

在数据流到表的转换期间

在结构定义期间,使用.proctime属性定义了处理时间属性。时间属性只能通过其他逻辑字段扩展物理结构。因此,只能在结构定义的末尾定义它。

val stream: DataStream[(String, String)] = ...

// declare an additional logical field as a processing time attribute
val table = tEnv.fromDataStream(stream, ‘UserActionTimestamp, ‘Username, ‘Data, ‘UserActionTime.proctime)

val windowedTable = table.window(Tumble over 10.minutes on ‘UserActionTime as ‘userActionWindow)

使用TableSource

处理时间属性由实现DefinedProctimeAttribute接口的TableSource定义。逻辑时间属性附加到由TableSource的返回类型定义的物理结构。

class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

    override def getReturnType = {
        val names = Array[String]("Username" , "Data")
        val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
        Types.ROW(names, types)
    }

    override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
        // create stream
        val stream = ...
        stream
    }

    override def getProctimeAttribute = {
        // field with this name will be appended as a third field
        "UserActionTime"
    }
}

// register table source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble over 10.minutes on ‘UserActionTime as ‘userActionWindow)

事件时间

事件时间允许表程序根据每个记录中包含的时间来产生结果。即使在无序事件或迟发事件的情况下,这也可以提供一致的结果。从持久性存储中读取记录时,还可以确保表程序的可重播结果。

此外,事件时间允许批处理和流环境中的表程序使用统一语法。流环境中的时间属性可以是批处理环境中记录的常规字段。

为了处理乱序事件并区分流中的按时事件和延迟事件,Flink需要从事件中提取时间戳并及时进行某种处理(就是水印)。

可以在DataStream到Table的转换期间或使用TableSource 定义事件时间属性。

在DataStream 到 Table 的转换期间

在结构定义期间,事件时间属性是使用.rowtime属性定义的。必须在转换的DataStream中分配时间戳和水印

将 DataStream 转换为 Table 时,有两种定义时间属性的方法。根据指定的.rowtime字段名称是否存在于DataStream的结构中,timestamp字段为

  • 作为新字段附加到结构
  • 替换现有字段。

无论哪种情况,事件时间时间戳字段都将保留DataStream事件时间 时间戳的值。

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// declare an additional logical field as an event time attribute
val table = tEnv.fromDataStream(stream, ‘Username, ‘Data, ‘UserActionTime.rowtime)

// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
val table = tEnv.fromDataStream(stream, ‘UserActionTime.rowtime, ‘Username, ‘Data)

// Usage:

val windowedTable = table.window(Tumble over 10.minutes on ‘UserActionTime as ‘userActionWindow)

使用TableSource

事件时间属性由实现了DefinedRowtimeAttributes接口的TableSource定义。getRowtimeAttributeDescriptors()方法返回用于描述时间属性最终名称的RowtimeAttributeDescriptor列表,用于导出属性值的时间戳提取器以及与该属性关联的水印策略。

请确保由getDataStream()方法返回的DataStream与定义的时间属性对齐。仅当定义了StreamRecordTimestamp时间戳提取器时,才考虑DataStream的时间戳(由TimestampAssigner分配的时间戳)。仅当定义了PreserveWatermarks水印策略时,才会保留DataStream的水印。 否则,仅TableSource的rowtime属性的值相关。

// define a table source with a rowtime attribute
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

    override def getReturnType = {
        val names = Array[String]("Username" , "Data", "UserActionTime")
        val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
        Types.ROW(names, types)
    }

    override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
        // create stream
        // ...
        // assign watermarks based on the "UserActionTime" attribute
        val stream = inputStream.assignTimestampsAndWatermarks(...)
        stream
    }

    override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
        // Mark the "UserActionTime" attribute as event-time attribute.
        // We create one attribute descriptor of "UserActionTime".
        val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
            "UserActionTime",
            new ExistingField("UserActionTime"),
            new AscendingTimestamps)
        val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
        listRowtimeAttrDescr
    }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble over 10.minutes on ‘UserActionTime as ‘userActionWindow)

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

原文地址:https://www.cnblogs.com/Springmoon-venn/p/11846931.html

时间: 2024-10-07 20:30:56

【翻译】Flink Table Api & SQL —Streaming 概念 ——时间属性的相关文章

【翻译】Flink Table Api & SQL — 配置

本文翻译自官网:Configuration https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html 默认情况下,Table&SQL API已预先配置为产生具有可接受性能的准确结果. 根据表程序的要求,可能需要调整某些参数以进行优化.例如,无界流程序可能需要确保所需的状态大小是有上限的(请参阅流概念). 总览 执行选项 优化器选项 总览 在每个表环境中,TableConfig提供了用于配置当

【翻译】Flink Table 和 SQL API 概念与通用API

本文翻译自官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html Table API和SQL集成在共同API中.该API的中心概念是Table,用作查询的输入和输出.本文档介绍了使用Table API和SQL查询的程序的通用结构,如何注册 Table,如何查询Table以及如何发出 Table(数据). 两个 planner 之间的主要区别 表API和SQL程序的结构 创建一个Tab

Flink 报错 "Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath"

转自: https://www.cnblogs.com/Springmoon-venn/p/10570056.html 先上代码: table = tablexx.select('*).tablexx.groupBy('x).select('x, xx.count ) tableEnvironment // declare the external system to connect to .connect( new Kafka() .version("0.10") .topic(&q

Flink DataStream API Programming Guide

Example Program The following program is a complete, working example of streaming window word count application, that counts the words coming from a web socket in 5 second windows.   public class WindowWordCount { public static void main(String[] arg

SQL导入txt以及SQL中的时间格式操作

原文:SQL导入txt以及SQL中的时间格式操作 MySQL中导入txt的指令为: load data local infile "路径名称" into table "表名" 比如我文件的具体位置为"f:\\dataset\\beijing\\xx.txt",创建的表名为"person",则上述指令的具体表达为: load data local infile "f:\\dataset\\beijing\\xx.txt

Flink DataStream API

5.Flink DataStream API 5.1 Flink 运行模型 以上为 Flink 的运行模型,Flink 的程序主要由三部分构成,分别为 Source. Transformation.Sink.DataSource 主要负责数据的读取,Transformation 主要负责对 属于的转换操作,Sink 负责最终数据的输出. 5.2 Flink 程序架构 每个 Flink 程序都包含以下的若干流程: ? 获得一个执行环境:(Execution Environment) ? 加载/创建

SQL语句关于时间的查询小心得,希望大家给点意见

查询本月信息:Select * FROM T_Users Where datediff(month,RegisterTime,getdate())=0 昨天的信息:SELECT * FROM T_Users where LastLoginTime>DATEADD(DAY,-2,GETDATE()) AND LastLoginTime<GETDATE() SELECT SUM([DetailMoney]) FROM [dbo].[T_BuyDetails] INNER JOIN [dbo].[T

SQL SERVER 查看SQL语句IO,时间,索引消耗

1.查看SQL语句IO消耗 set statistics io on     select * from dbo.jx_order where order_time>'2011-04-12 12:49:57.580' set statistics io off 2.查看SQL语句时间消耗 set statistics time on      select * from dbo.jx_order where order_time>'2011-04-12 12:49:57.580' set st

MS SQL Server带有时间的记录怎样查询

原文:MS SQL Server带有时间的记录怎样查询 比如某一张表[A]有一个保存日期包含时间字段[B],如果以这个段[B]作查询条件对数据记录进行查询.也我们得花些心思才能查询到我们想得到的记录. 现在我们需要查询这天2014-06-21的所有记录: SELECT * FROM [A] WHERE [B] = '2014-06-21' 上面的语法,将查询不到任何记录.也许会有网友想到使用BETWEEN: SELECT * FROM [A] WHERE [B] BETWEEN '2014-06