有状态计算
在Flink架构体系中,有状态计算可以说是Flink非常重要的特征之一。有状态计算是指在程序计算过程中,在Flink程序内部,存储计算产生的中间结果,并提供给Functions 或 孙子计算结果使用。如图所示:
状态数据可以维系在本地存储中,这里的存储可以是 Flink 的堆内存或者堆外内存,也可以借助第三方的存储介质,例如:Flink中已经实现的RocksDB,当然用户也可以自己实现相应的缓存系统去存储状态信息,以完成更加复杂的计算逻辑。和状态计算不同的是,无状态计算不会存储计算过程中产生的结果,也不会将结果用于下一步计算过程中,程序只会在当前的计算流程中实行计算,计算完成就输出结果,然后下一条数据接入,然后处理。
无状态计算实现的复杂度相对较低,实现起来比较容易,但是无法完成提到的比较复杂的业务场景,例如:
- [ ] 用户想实现CEP(复杂事件处理),获取符合某一特定时间规则的事件,状态计算就可以将接入的事件进行存储,然后等待符合规则的事件触发;
- [ ] 用户想要按照 minutes / hour / day 等进行聚合计算,求取当前最大值、均值等聚合指标,这就需要利用状态来维护当前计算过程中产生的结果,例如事件的总数、总和以及最大,最小值等;
- [ ] 用户想在 Srteam 上实现机器学习的模型训练,状态计算可以帮助用户维护当前版本模型使用的参数;
- [ ] 用户想使用历史的数据进行计算,状态计算可以帮助用户对数据进行缓存,使用户可以直接从状态中获取相应的历史数据。
Flink 状态及应用
状态类型
在 Flink 中根据数据集是否根据 Key 进行分区,将状态分为 Keyed State 和 Operator State(Non-Keyed State) 两种类型。
Keyed State
表示和key相关的一种state ,只能用于 KeyedStream 类型数据集对应的Functions和Operators之上。Keyed State 是 Operator State 的特例,区别在于 Keyed State 事先按照 key 对数据集进行了分区,每个 Key State 仅对应一个 Operator 和 Key 的组合。 Keyed State 可以通过 Key Group 进行管理,主要用于当算子并行度发生变化时,自动重新分布 Keyed State 数据。
Operator State
与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的 Key 无关,每个算子实例中持有所有数据元素中的一部分状态数据。 Operator State 支持当算子实例并行度发生变化时自动重新分配状态数据。
同时在Flink中 Keyed State 和 Operator State 均具有两种形式,其中一种为托管状态(Managered State)形式,由Flink Runtime 中控制和管理状态数据,并将状态数据转换称为内存Hash tables 或 Recks DB 的对象存储,然后将这些状态数据通过内部接口持久化到 Checkpoints 中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发 Checkpoints 过程中,Flink并不知道状态数据内部的数据结构,只是将数据转换成 bytes 数据存储在 Checkpoints 中,当从 Checkpoints 恢复任务时,算子自己在反序列化出状态的数据结构。
Notes: Flink中推荐用户使用 Managered State 管理状态数据,主要原因是:Manager State 能够更好的支持状态数据的重平衡以及更加完善的内存管理。
Managered Keyed State
Flink 有以下Managered Keyed State 类型可以使用,每种状态都有相应的的使用场景,用户可以根据实际需求选择使用。
- [ ]
ValueState[T]
: 与 Key 对应单个值的状态,例如统计 user_id 对应的交易次数,每次用户交易都会在 count 状态值上进行更新。 ValueState 对应的更新方法是update(T)
, 取值是T value()
; - [ ]
ListState[T]
: 与 Key 对应元素列表的状态,状态中存放元素的 List 列表。例如定义 ListValue存储用户经常访问的 IP 地址。在 ListState 中添加元素使用add(T) , addAll(List[T])
两个方法。获取元素使用Iterable<T> get()
方法,更新元素使用update(List[T])
方法; - [ ]
ReducingState[T]
: 定义与 Key 相关的数据元素单个聚合值的状态,用户存储经过指定 ReduceFunction 计算之后的指标,因此,ReduceState 需要指定ReduceFunction 完成状态数据的聚合。ReducingState 添加元素使用add(T)
方法,获取元素使用T get()
; - [ ]
AggregeateState[IN,OUT]
: 定义 与key相关的数据元素单个聚合值的状态,用于维护数据经过指定 AggregateFunction 计算之后的指标。和ReducingState相比,AggregeateState 的输入输出类型不一定相同,但ReducingState 输入/出 类型必须保持一致。和ListState相似,AggregatingState 需要指定AggregateFunction完成状态数据的聚合操作。AggregatringState添加元素使用add(IN)
方法, 获取元素使用OUT get()
方法; - [ ]
MapState<UK, UV>
:这会保留一个映射列表。您可以将键值对放入状态并检索Iterable所有当前存储的映射。使用put(UK, UV)
或 添加映射putAll(Map[UK,UV])
(Map<UK, UV>)。可以使用来检索与用户键关联的值get(UK)
。对于映射,键和值可迭代视图可以使用被检索entries()
,keys()
并values()
分别。
Stateful Function定义
示例:
在RichFlatMapFunction 中定义 ValueState,已完成最小值的获取:
inputStream.keyBy(_._1).flatMap(
// (String,Long,Int) 输入类型
// (String,Long,Long) 输出类型
new RichFlatMapFunction[(Int,Long) , (Int,Long,Long)] {
private var leastValueState:ValueState[Long] = _
// 定义状态名称
private var leastValueStateDesc:ValueStateDescriptor[Long] = _
override def open(parameters: Configuration): Unit = {
// 指定状态类型
leastValueStateDesc = new ValueStateDescriptor[Long]("leastValueState" , classOf[Long])
// 通过 getRuntimeContext.getState 拿到状态
leastValueState = getRuntimeContext.getState(leastValueStateDesc)
}
override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = {
// 通过 value 拿到最小值
val leastValue: Long = leastValueState.value()
// 如果前一个指标大于最小值,则直接输出数据元素和最小值
if ( leastValue != 0L && value._2 > leastValue){
out.collect((value._1 , value._2 , leastValue))
}else{
// 如果当前指标小于最小值,则更新状态中的最小值
leastValueState.update(value._2)
// 将当前数据中的指标作为最小值输出
out.collect(value._1 , value._2 , value._2)
}
}
}).print()
State生命周期
对于任何类型 Keyed State 都可以设定状态生命周期(TTL),以确保能够在规定时间内即时清理状态数据。状态生命周期功能可通过 StateTtlConfig 配置然后将 StateTtlConfig 配置传入StateDescriptor 中的 enableTimeToLive 方法中即可。Keyed State 配置实例如下所示:
val config: StateTtlConfig = StateTtlConfig
// 指定TTL时长为 5s
.newBuilder(Time.seconds(5))
// 指定TTL 刷新只对创建和写入操作有效
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 指定状态可见性不返回过期数据
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build()
leastValueStateDesc.enableTimeToLive(config)
在StateTtlConfig中除了通过 newBuilder() 方法中设定过期时间的参数是必须的之外,其他的参数都是可选的或使用默认值。其中 setUpdateType方法中传入的类型有两种:
- StateTtlConfig.UpdateType.onCreateAndWrite 仅在创建和写入时更新 TTL ;
- StateTtlConfig.UpdateType.OnReadAndWriter 仅在读与写操作都更新 TTL ;
需要注意的是,过期的状态数据根据UpdateType参数进行配置,只有被写入或者读取的是时间才会更新TTL,也就是说如果某个状态指标一直不被使用活着更新,则永远不会触发对该状态数据的清理操作,这种情况可能会导致系统中的状态数据越来越大。
另外,可以通过 setStateVisibility 方法设定状态的可见性,根据过期数据是否被清理来确定是否返回状态数据:
- StateTtlConfig.StateVisibility.NeverReturnExpired: 状态数据过期就不会返回(默认)
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp: 状态数据即使过期但没有被清理依然返回
Scala DataStream API中使用状态
直接上代码片段:
inputStream.keyBy(_._1)
// 指定输入参数类型和状态参数类型
.mapWithState((in:(Int,Long) , count : Option[Int]) =>
// 判断count 类型是否非空
count match {
// 输出 key , count 并在原来 count 数据上累加
case Some(c) => ((in._1 , c) , Some(c + in._2))
// 如果状态为空,则将指标填入
case None => ((in._1 , 0) , Some(in._2))
}
)
Manager Operator State
Operator State 是一种 non-keyed-state ,与并行的操作算子实例相关联,例如在 Kafka Connector 中,每个 Kafka 消费端算子实例都对应到 Kafka 的一个分区中,维护Topic分区和 Offsets 偏移量作为算子的 Operator State. 在Flink中可以实现 CheckpointedFunction
或者 ListCheckpoint<T extends Serializable>
两个接口来定义操作 Managered Operator State 的函数。
通过 CheckpointedFunction 接口操作Operator State
CheckpointedFunction 接口定义如图:
@PublicEvolving
@SuppressWarnings("deprecation")
public interface CheckpointedFunction {
/**
* This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
* ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
* the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
*
* @param context the context for drawing a snapshot of the operator
* @throws Exception
*/
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
* This method is called when the parallel function instance is created during distributed
* execution. Functions typically set up their state storing data structures in this method.
*
* @param context the context for initializing the operator
* @throws Exception
*/
void initializeState(FunctionInitializationContext context) throws Exception;
}
在每个独立的算子中,Managered Operator State 都是以 List 形式存储的,算子和算子之间的状态数据相互独立,List存储比较适合于状态数据的重新分布,Flink目前支持Manager Operator State 两种重要分布策略,分别是 Event-split Redistribution 和 Union Redistribution。
- [ ] Event-split Redistribution: 每个算子实例中含有部分元素的List列表,整个状态数据是所有List列表,整个状态数据是所有List列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度相同数量的List列表,每个 task 实例中有一个 List,其可以为空或者含有多个元素。
- [ ] Union Redistribution: 每个算子实例中含有所有状态元素的List 列表,当触发 restore/redistribution 动作时,每个算子可以获取到完整的状态元素列表。
Checkpoints 和 Savepoints
状态管理器
Querable State
原文地址:https://www.cnblogs.com/sun-iot/p/12089562.html