大数据计算引擎之Flink Flink状态管理和容错

原文地址:大数据计算引擎之Flink Flink状态管理和容错

有状态计算

在Flink架构体系中,有状态计算可以说是Flink非常重要的特征之一。有状态计算是指在程序计算过程中,在Flink程序内部,存储计算产生的中间结果,并提供给Functions 或 孙子计算结果使用。如图所示:

状态数据可以维系在本地存储中,这里的存储可以是 Flink 的堆内存或者堆外内存,也可以借助第三方的存储介质,例如:Flink中已经实现的RocksDB,当然用户也可以自己实现相应的缓存系统去存储状态信息,以完成更加复杂的计算逻辑。和状态计算不同的是,无状态计算不会存储计算过程中产生的结果,也不会将结果用于下一步计算过程中,程序只会在当前的计算流程中实行计算,计算完成就输出结果,然后下一条数据接入,然后处理。
无状态计算实现的复杂度相对较低,实现起来比较容易,但是无法完成提到的比较复杂的业务场景,例如:

  • [ ] 用户想实现CEP(复杂事件处理),获取符合某一特定时间规则的事件,状态计算就可以将接入的事件进行存储,然后等待符合规则的事件触发;
  • [ ] 用户想要按照 minutes / hour / day 等进行聚合计算,求取当前最大值、均值等聚合指标,这就需要利用状态来维护当前计算过程中产生的结果,例如事件的总数、总和以及最大,最小值等;
  • [ ] 用户想在 Srteam 上实现机器学习的模型训练,状态计算可以帮助用户维护当前版本模型使用的参数;
  • [ ] 用户想使用历史的数据进行计算,状态计算可以帮助用户对数据进行缓存,使用户可以直接从状态中获取相应的历史数据。

Flink 状态及应用

状态类型

在 Flink 中根据数据集是否根据 Key 进行分区,将状态分为 Keyed State 和 Operator State(Non-Keyed State) 两种类型。

Keyed State

表示和key相关的一种state ,只能用于 KeyedStream 类型数据集对应的Functions和Operators之上。Keyed State 是 Operator State 的特例,区别在于 Keyed State 事先按照 key 对数据集进行了分区,每个 Key State 仅对应一个 Operator 和 Key 的组合。 Keyed State 可以通过 Key Group 进行管理,主要用于当算子并行度发生变化时,自动重新分布 Keyed State 数据。

Operator State

与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的 Key 无关,每个算子实例中持有所有数据元素中的一部分状态数据。 Operator State 支持当算子实例并行度发生变化时自动重新分配状态数据。

同时在Flink中 Keyed State 和 Operator State 均具有两种形式,其中一种为托管状态(Managered State)形式,由Flink Runtime 中控制和管理状态数据,并将状态数据转换称为内存Hash tables 或 Recks DB 的对象存储,然后将这些状态数据通过内部接口持久化到 Checkpoints 中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发 Checkpoints 过程中,Flink并不知道状态数据内部的数据结构,只是将数据转换成 bytes 数据存储在 Checkpoints 中,当从 Checkpoints 恢复任务时,算子自己在反序列化出状态的数据结构。

Notes: Flink中推荐用户使用 Managered State 管理状态数据,主要原因是:Manager State 能够更好的支持状态数据的重平衡以及更加完善的内存管理。

Managered Keyed State

Flink 有以下Managered Keyed State 类型可以使用,每种状态都有相应的的使用场景,用户可以根据实际需求选择使用。

  • [ ] ValueState[T]: 与 Key 对应单个值的状态,例如统计 user_id 对应的交易次数,每次用户交易都会在 count 状态值上进行更新。 ValueState 对应的更新方法是 update(T) , 取值是 T value() ;
  • [ ] ListState[T]: 与 Key 对应元素列表的状态,状态中存放元素的 List 列表。例如定义 ListValue存储用户经常访问的 IP 地址。在 ListState 中添加元素使用 add(T) , addAll(List[T]) 两个方法。获取元素使用 Iterable<T> get() 方法,更新元素使用 update(List[T])方法;
  • [ ] ReducingState[T]: 定义与 Key 相关的数据元素单个聚合值的状态,用户存储经过指定 ReduceFunction 计算之后的指标,因此,ReduceState 需要指定ReduceFunction 完成状态数据的聚合。ReducingState 添加元素使用 add(T)方法,获取元素使用 T get() ;
  • [ ] AggregeateState[IN,OUT]: 定义 与key相关的数据元素单个聚合值的状态,用于维护数据经过指定 AggregateFunction 计算之后的指标。和ReducingState相比,AggregeateState 的输入输出类型不一定相同,但ReducingState 输入/出 类型必须保持一致。和ListState相似,AggregatingState 需要指定AggregateFunction完成状态数据的聚合操作。AggregatringState添加元素使用 add(IN) 方法, 获取元素使用 OUT get() 方法;
  • [ ] MapState<UK, UV>:这会保留一个映射列表。您可以将键值对放入状态并检索Iterable所有当前存储的映射。使用put(UK, UV)或 添加映射putAll(Map[UK,UV])(Map<UK, UV>)。可以使用来检索与用户键关联的值get(UK)。对于映射,键和值可迭代视图可以使用被检索entries()keys()values()分别。

Stateful Function定义
示例:
在RichFlatMapFunction 中定义 ValueState,已完成最小值的获取:

    inputStream.keyBy(_._1).flatMap(
      // (String,Long,Int) 输入类型
      // (String,Long,Long) 输出类型
      new RichFlatMapFunction[(Int,Long) , (Int,Long,Long)] {
        private var leastValueState:ValueState[Long] = _
        // 定义状态名称
        private var leastValueStateDesc:ValueStateDescriptor[Long] = _
        override def open(parameters: Configuration): Unit = {
          // 指定状态类型
          leastValueStateDesc = new ValueStateDescriptor[Long]("leastValueState" , classOf[Long])
          // 通过 getRuntimeContext.getState 拿到状态
          leastValueState = getRuntimeContext.getState(leastValueStateDesc)
        }
        override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = {
          // 通过 value 拿到最小值
          val leastValue: Long = leastValueState.value()

          // 如果前一个指标大于最小值,则直接输出数据元素和最小值
          if ( leastValue != 0L && value._2 > leastValue){
            out.collect((value._1 , value._2 , leastValue))
          }else{
            // 如果当前指标小于最小值,则更新状态中的最小值
            leastValueState.update(value._2)
            // 将当前数据中的指标作为最小值输出
            out.collect(value._1 , value._2 , value._2)
          }
        }
      }).print()

State生命周期

对于任何类型 Keyed State 都可以设定状态生命周期(TTL),以确保能够在规定时间内即时清理状态数据。状态生命周期功能可通过 StateTtlConfig 配置然后将 StateTtlConfig 配置传入StateDescriptor 中的 enableTimeToLive 方法中即可。Keyed State 配置实例如下所示:

          val config: StateTtlConfig = StateTtlConfig
            // 指定TTL时长为 5s
            .newBuilder(Time.seconds(5))
            // 指定TTL 刷新只对创建和写入操作有效
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            // 指定状态可见性不返回过期数据
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
          leastValueStateDesc.enableTimeToLive(config)

在StateTtlConfig中除了通过 newBuilder() 方法中设定过期时间的参数是必须的之外,其他的参数都是可选的或使用默认值。其中 setUpdateType方法中传入的类型有两种:

  1. StateTtlConfig.UpdateType.onCreateAndWrite 仅在创建和写入时更新 TTL ;
  2. StateTtlConfig.UpdateType.OnReadAndWriter 仅在读与写操作都更新 TTL ;
    需要注意的是,过期的状态数据根据UpdateType参数进行配置,只有被写入或者读取的是时间才会更新TTL,也就是说如果某个状态指标一直不被使用活着更新,则永远不会触发对该状态数据的清理操作,这种情况可能会导致系统中的状态数据越来越大。

另外,可以通过 setStateVisibility 方法设定状态的可见性,根据过期数据是否被清理来确定是否返回状态数据:

  1. StateTtlConfig.StateVisibility.NeverReturnExpired: 状态数据过期就不会返回(默认)
  2. StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp: 状态数据即使过期但没有被清理依然返回

Scala DataStream API中使用状态

直接上代码片段:

    inputStream.keyBy(_._1)
      // 指定输入参数类型和状态参数类型
      .mapWithState((in:(Int,Long) , count : Option[Int]) =>
        // 判断count 类型是否非空
        count match {
          // 输出 key , count 并在原来 count 数据上累加
          case Some(c) => ((in._1 , c) , Some(c + in._2))
            // 如果状态为空,则将指标填入
          case None => ((in._1 , 0) , Some(in._2))
        }
      )

Manager Operator State

Operator State 是一种 non-keyed-state ,与并行的操作算子实例相关联,例如在 Kafka Connector 中,每个 Kafka 消费端算子实例都对应到 Kafka 的一个分区中,维护Topic分区和 Offsets 偏移量作为算子的 Operator State. 在Flink中可以实现 CheckpointedFunction 或者 ListCheckpoint<T extends Serializable>两个接口来定义操作 Managered Operator State 的函数。

通过 CheckpointedFunction 接口操作Operator State

CheckpointedFunction 接口定义如图:

@PublicEvolving
@SuppressWarnings("deprecation")
public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}

在每个独立的算子中,Managered Operator State 都是以 List 形式存储的,算子和算子之间的状态数据相互独立,List存储比较适合于状态数据的重新分布,Flink目前支持Manager Operator State 两种重要分布策略,分别是 Event-split Redistribution 和 Union Redistribution。

  • [ ] Event-split Redistribution: 每个算子实例中含有部分元素的List列表,整个状态数据是所有List列表,整个状态数据是所有List列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度相同数量的List列表,每个 task 实例中有一个 List,其可以为空或者含有多个元素。
  • [ ] Union Redistribution: 每个算子实例中含有所有状态元素的List 列表,当触发 restore/redistribution 动作时,每个算子可以获取到完整的状态元素列表。

Checkpoints 和 Savepoints

状态管理器

Querable State

原文地址:https://www.cnblogs.com/sun-iot/p/12089562.html

时间: 2024-11-07 10:38:15

大数据计算引擎之Flink Flink状态管理和容错的相关文章

上:Spark VS Flink – 下一代大数据计算引擎之争,谁主沉浮?

作者简介 王海涛,曾经在微软的 SQL Server和大数据平台组工作多年.带领团队建立了微软对内的 Spark 服务,主打 Spark Streaming.去年加入阿里实时计算部门,参与改进阿里基于 Apache Flink 的Blink 平台. 导读: 做大数据绝对躲不过的一个热门话题就是实时流计算,而提到实时流计算,就不得不提 Spark 和 Flink.Spark 从 2014 年左右开始迅速流行,刚推出时除了在某些场景比 Hadoop MapReduce 带来几十到上百倍的性能提升外,

Cubert:LinkedIn开源的大数据计算引擎

近日, Linkedin 宣布开源其正在使用的大数据计算引擎 Cubert ,该框架提供了一种新的数据模型来组织数据,并使用诸如MeshJoin 和Cube算法等算法来对组织后的数据进行计算,从而减轻了系统负荷和节省了CPU资源,最终提供给用户一个简单.高效的查询.Cubert比较适合的计 算领域包括统计计算.聚合.时间距离计算.增量计算.图形计算等. Cubert整个架构可分为三层,第一层是数据流语言层,主要用来实现执行计划,包括 Apache Pig . Apache Hive 以及Cube

揭秘阿里云EB级大数据计算引擎MaxCompute

日前,全球权威咨询与服务机构Forrester发布了<The Forrester WaveTM: Cloud Data Warehouse, Q4 2018>报告.这是Forrester Wave首次发布关于云数仓解决方案(Cloud Data Warehouse,简称CDW)的测评.报告对云数仓的当前产品功能.产品路线和发展策略.市场表现等几个方面进行全面的评估,在产品能力排行榜中,阿里云力压微软排行第7. Forrester测评报告对CDW核心功能的评估主要从解决方案的多样性.数据集成.性

大数据计算引擎之Flink Flink CEP复杂事件编程

基础概念 FlinkCEP 说明 一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件.具备如下的特征: 目标:从有序的简单事件流中发现一些高阶特征 输入:一个或多个由简单事件构成的事件流 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件 输出:满足规则的复杂事件 CEP用于分析低延迟.频繁产生的不同来源的事件流. CEP 可以帮助在复杂的.不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为

新一代大数据计算引擎 Flink从入门到实战

Flink第一章 [录播]01.体验flink-安装配置-启动(16分钟) 免费试学 [录播]02.启动flink-scala-shell读取文件实现打印(10分钟) [录播]03.使用flink scala shell实现word count(16分钟) [录播]04.使用静态数据模拟流操作fromElements(3分钟) [录播]05.运行word count套接字流计算程序(11分钟) 02 Flink第二章 [录播]06.idea flink api编程(8分钟) [录播]07.git

大数据计算引擎发展的四个阶段

根据一些公开资料整理,也许有失偏颇,仅供参考: 1.第一代 Hadoop 承载的 MapReduce 2.第二代 支持 DAG(有向无环图) 的框架: Tez . Oozie,主要还是还是批处理任务 3.第三代 Job 内部的 DAG(有向无环图) 支持(不跨越 Job),以及强调的实时计算:Spark 4.第四代 对流计算的支持,以及更一步的实时性:Flink

通过 GOOGLE 大数据计算平台演进理解 APACHE FLINK 前世今生

一.背景 2019年1月,伴随 APACHE FLINK 母公司 Data Artisans 被 收购 ,FLINK 毫无争议成为继 SPARK 之后的新一代大数据计算平台,本文希望通过 GOOGLE 计算平台演进来更好的理解 FLINK. 二.GOOGLE 大数据计算平台演进 GOOGLE 作为搜索引擎的顶级公司,需要处理海量数据,其大数据计算平台的演进是行业的风向标:本文通过 GOOGLE 在该领域发表的论文进行剖析,希望从中提取一些演进的主线. 2.1 分布式的三篇经典 2003年,[Th

一文读懂大数据计算框架与平台

1.前言 计算机的基本工作就是处理数据,包括磁盘文件中的数据,通过网络传输的数据流或数据包,数据库中的结构化数据等.随着互联网.物联网等技术得到越来越广泛的应用,数据规模不断增加,TB.PB量级成为常态,对数据的处理已无法由单台计算机完成,而只能由多台机器共同承担计算任务.而在分布式环境中进行大数据处理,除了与存储系统打交道外,还涉及计算任务的分工,计算负荷的分配,计算机之间的数据迁移等工作,并且要考虑计算机或网络发生故障时的数据安全,情况要复杂得多. 举一个简单的例子,假设我们要从销售记录中统

Flink状态管理和容错机制介绍

本文主要内容如下: 有状态的流数据处理: Flink中的状态接口: 状态管理和容错机制实现: 阿里相关工作介绍: 一.有状态的流数据处理# 1.1.什么是有状态的计算# 计算任务的结果不仅仅依赖于输入,还依赖于它的当前状态,其实大多数的计算都是有状态的计算. 比如wordcount,给一些word,其计算它的count,这是一个很常见的业务场景.count做为输出,在计算的过程中要不断的把输入累加到count上去,那么count就是一个state. 1.2.传统的流计算系统缺少对于程序状态的有效