因某些童鞋的建议,从这篇文章开始结合源码谈谈Flink Fault Tolerance相关的话题。上篇官方介绍的翻译是理解这个话题的前提,所以如果你想更深入得了解Flink Fault Tolerance的机制,推荐先读一下前篇文章理解它的实现原理。当然原理归原理,原理体现在代码实现里并不是想象中的那么直观。这里的源码剖析也是我学习以及理解的过程。
作为源码解析Flink Fault Tolerance的首篇文章,我们先暂且不谈太有深度的东西,先来了解一下:Flink哪里涉及到检查点/快照机制来保存/恢复状态?这就是本篇要谈的主要内容。
跟Flink官方文档的说明一样,在文章中会混杂着检查点和快照这两个术语,不要太过于纠结它们,某种程度它们是一致的。
从这篇文章开始,当我谈及一个类,我会给出它的全限定名,以方便大家对照。
在Flink中,需要具备Fault Tolerance能力的通常是两类对象:function
以及operator
。
其中function
通常通过实现Checkpointed
来达到这个目的,而operator
通过实现StreamOpeator
(该接口中包含了快照、恢复状态的接口方法)。
我们会分别来分析这两个接口,然后列举一些典型的需要具备Fault Tolerance功能的对象,并分析它们的实现。
Checkpointed
org.apache.flink.streaming.api.checkpoint.Checkpointed
该接口提供给那些需要持久化状态的function
或operator
使用。该接口是同步模式的快照机制。
两个接口方法:
- snapshotState:获得
function
或者operator
的当前状态(快照),这个状态必须反映该function
之前的变更所产生的最终结果
该方法接收两个参数,第一个参数是checkpointId
,表示该检查点的ID,第二个参数checkpointTimestamp
,检查点的时间戳,被JobManager
的System.currentTimeMillis()
驱动
- restoreState:用于从之前的检查点中恢复
function
或operator
的状态,需要注意的是该方法的调用会早于open
方法
CheckpointedAsynchronously
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously
该接口继承自Checkpointed
,属于标记型接口,用于跟Checkpointed
的同步快照机制进行区分。
MessageAcknowledgingSourceBase
org.apache.flink.streaming.api.functions.source
该类是Flink内置的众多SouceFunction
之一,也是基于具备ack
(确认机制)的Message Queue
的实现模板之一。
该类的完整签名:
public abstract class MessageAcknowledgingSourceBase<Type, UId>
extends RichSourceFunction<Type>
implements Checkpointed<SerializedCheckpointData[]>, CheckpointListener
首先我们从该类的实现接口中可以看到它希望保存的状态是:SerializedCheckpointData[]
:
SerializedCheckpointData
表示作为快照数据的被序列化元素的集合
有两个非常重要的属性:
/** The list gathering the IDs of messages emitted during the current checkpoint */
private transient List<UId> idsForCurrentCheckpoint;
/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
private transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;
/**
* Set which contain all processed ids. Ids are acknowledged after checkpoints. When restoring
* a checkpoint, ids may be processed again. This happens when the checkpoint completed but the
* ids for a checkpoint haven‘t been acknowledged yet.
*/
private transient Set<UId> idsProcessedButNotAcknowledged;
- idsForCurrentCheckpoint:该集合存储着当前检查点覆盖范围内,消费掉的消息的ID集合
- pendingCheckpoints:该集合收集的是:检查点以及该检查点中那些已经被触发处理的但没有完成的(或没有收到完成通知的)消息的ID集合对(Tuple2)
- idsProcessedButNotAcknowledged:它用于存储那些已经被处理过的消息的ID,这些消息的ack在检查点完成之后。也就是说,如果从该检查点开始恢复,那么这些id的消息可能会被重放。
看到上面的定义,虽然都是用来存储跟消息ID相关的“集合”,但却是三种不同的数据结构,而且前两个是有序的,最后一个是无序的
后面的文章我们会谈到检查点分:PendingCheckpoint(未完成的)和CompletedCheckpoint(已完成的)
来看Checkpointed
的两个接口方法的实现:
public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);
pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));
idsForCurrentCheckpoint = new ArrayList<>(64);
return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
}
首先是构建快照的方法,它将当前检查点checkpointId
及其相关的消费走的消息集合idsForCurrentCheckpoint
构建成一个元组Tuple2
加入待处理的检查点中pendingCheckpoints
。接着重新初始化了idsForCurrentCheckpoint
(因为当前这个检查点的快照已经生成了,所以跟当前检查点相关的元素也需要清空掉)。
public void restoreState(SerializedCheckpointData[] state) throws Exception {
pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
// build a set which contains all processed ids. It may be used to check if we have
// already processed an incoming message.
for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
}
}
接下来是恢复状态的逻辑,它的过程也很简单。首先反序列化状态对象到pendingCheckpoints
,然后遍历整个元组集合,针对每个没有完成的检查点元组checkpoint
,提取出每个这些checkpoint对应的消息ID集合,将他们全部加入idsProcessedButNotAcknowledged
集合中去。
从构建快照的snapshotState
方法中,我们看到针对每个checkpointId
都将其涵盖范围内的所有的ID集合拼装成一个二元组加入到pendingCheckpoints
。而随着消息被消费,如果到最后该checkpointId
对应的所有消息ID都被完全处理也就是说该检查点变成了CompletedCheckpoint
,那么如何将该二元组从pendingCheckpoints
移除?Flink提供了一个CheckpointListener
它会在某个检查点完成之后给出通知,客户程序可以订阅它然后进行相应的回调处理notifyCheckpointComplete
,该类的回调实现如下:
public void notifyCheckpointComplete(long checkpointId) throws Exception {
LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
Tuple2<Long, List<UId>> checkpoint = iter.next();
long id = checkpoint.f0;
if (id <= checkpointId) {
LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
acknowledgeIDs(checkpointId, checkpoint.f1);
// remove deduplication data
idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
// remove checkpoint data
iter.remove();
}
else {
break;
}
}
}
获取到已完成的checkpointId
,然后遍历整个pendingCheckpoints
集合,找到所有checkpointId
小于当前已完成的checkpointId
,然后完成三个动作:
- ack 该
checkpointId
对应的所有这些消息的ID - 将这些消息的ID从idsProcessedButNotAcknowledged中移除
- 将该二元组从
pendingCheckpoints
中移除
为什么这里判断条件是<=
呢,因为checkpointId
是时序递增的,而且Flink保证如果某个检查点完成,那么比该检查点小的检查点肯定也完成了。因为,检查点越小与其有关的消息集合越早被处理。
另外一个需要注意的是,该方法中的acknowledgeIDs
是抽象方法,待具体类根据自己的ack机制实现。
RabbitMQ 对接Flink的
Source
——RMQSource
就是通过继承MessageAcknowledgingSourceBase
实现的
FlinkKafkaConsumerBase
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
FlinkKafkaConsumerBase
是Flink实现基于Kafka的Source
的基类,Kafka提供基于offset
并且可重复消费的机制,使其非常容易实现Fault Tolerance机制,只不过这里实现的是异步模式的检查点机制CheckpointedAsynchronously
。
该类的完整签名如下:
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>
从签名来看,它快照的数据类型是:
HashMap<KafkaTopicPartition, Long>
该类型描述了kafka消息消费的具体的信息(包含topic,partition,offset)。
然后继续看snapshotState
方法:
// the use of clone() is okay here is okay, we just need a new map, the keys are not changed
//noinspection unchecked
HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) offsetsState.clone();
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingCheckpoints.put(checkpointId, currentOffsets);
while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingCheckpoints.remove(0);
}
这是其核心代码。它先将offsetsState克隆一份,作为当前检查点的快照,然后放入pendingCheckpoints
作为待处理的检查点集合。
这里有一个安全性检查:如果待处理的安全点集合大于默认设定的阈值(100),则移除集合中第一个检查点,这么做的目的是为了防止集合太大导致内存泄漏。
而restoreState
方法的实现,只是将待恢复的偏移量快照对象赋予当前对象的偏移量而已。
同MessageAcknowledgingSourceBase
,为了得到检查点完成的通知,FlinkKafkaConsumerBase
也实现了CheckpointListener
接口,以在检查点完成时进行回调处理。来看看notifyCheckpointComplete
方法的实现:
HashMap<KafkaTopicPartition, Long> checkpointOffsets;
// the map may be asynchronously updates when snapshotting state, so we synchronize
synchronized (pendingCheckpoints) {
final int posInMap = pendingCheckpoints.indexOf(checkpointId);
if (posInMap == -1) {
LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
return;
}
//noinspection unchecked
checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
pendingCheckpoints.remove(0);
}
}
if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
LOG.debug("Checkpoint state was empty.");
return;
}
commitOffsets(checkpointOffsets);
可以看到核心代码都进行了同步保护,因为pendingCheckpoints
很可能会被异步更新。它先根据完成了的检查点,获得其在pendingCheckpoints
中的索引。如果判断索引不存在,则直接退出。否则,移除该索引对应的快照信息,然后将小于当前索引(较旧的)的快照信息也一并移除(这一点我之前解释过,因为所有的检查点都是按时间递增有序的)。最后将当前完成的检查点对应的消息的偏移量进行commit,也即commitOffsets
。只不过这里该方法被定义为抽象方法,因为Kafka不同版本的API差别的原因,由适配不同版本的consumer各自实现。
Flink以Kafka作为Source的具体实现机制,不是本文的重点,后续可以另开文章进行讲解
StatefulSequenceSource
org.apache.flink.streaming.api.functions.source
这是一个有状态的、给定起始和截止元素的并行序列发射器,由于它需要提供exactly once保证,所以它实现了Checkpointed
接口。
它主要是用来维护一个称之为collected
的发射进度状态,对其进行快照以便于实现fault tolerance。
StreamOperator
org.apache.flink.streaming.api.operators. StreamOperator
StreamOperator
内置了我们上面谈到的几个跟检查点相关的接口方法:
- snapshotOperatorState
- restoreState
- notifyOfCompletedCheckpoint
这三个方法来自于我们之前谈到的Checkpointed
以及CheckpointListener
。这也由此可见,在operator
中快照机制由可选项变成了必选项。
这是不难理解的,因为
operator
处于运行时,诸如分区信息都是必须要做快照的。
这里需要注意的是snapshotOperatorState
方法,它返回值为StreamTaskState
。它是表示task所有状态的一个容器对象,它包含了三类状态:
- operatorState
- functionState
- kvStates
这不是本文的重点,后面的文章再谈
AbstractStreamOperator
org.apache.flink.streaming.api.operators.AbstractStreamOperator
AbstractStreamOperator
是StreamOperator
的抽闲类,为operator
的实现提供模板,当然也为以上的三个跟快照相关的接口方法的实现提供了模板。
来看snapshotOperatorState
方法:
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
// here, we deal with key/value state snapshots
StreamTaskState state = new StreamTaskState();
if (stateBackend != null) {
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
stateBackend.snapshotPartitionedState(checkpointId, timestamp);
if (partitionedSnapshots != null) {
state.setKvStates(partitionedSnapshots);
}
}
return state;
}
可以看到它依赖于一个叫stateBackend
的东西,在之前一篇文章中我们有谈及过,它是state最终的持久化机制的实现。并且从注释可以看到这里只提供了针对key/value状态的快照模板。
AbstractUdfStreamOperator
org.apache.flink.streaming.api.operators
该抽象类继承自AbstractStreamOperator
,用于进一步为operator
的实现提供模板,不过从类名可以看出来,它主要是为用户定义函数(udf)的operator
提供模板。
snapshotOperatorState
方法:
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);
if (userFunction instanceof Checkpointed) {
@SuppressWarnings("unchecked")
Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
Serializable udfState;
try {
udfState = chkFunction.snapshotState(checkpointId, timestamp);
}
catch (Exception e) {
throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
}
if (udfState != null) {
try {
AbstractStateBackend stateBackend = getStateBackend();
StateHandle<Serializable> handle =
stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
state.setFunctionState(handle);
}
catch (Exception e) {
throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
+ e.getMessage(), e);
}
}
}
return state;
}
这里我们终于再次看到了Checkpointed
接口。这是因为function
只是静态的函数,它的运行还必须借助于operator
,因此其状态也必须借助于operator
来帮助其与Flink的运行时交互以达到最终的持久化的目的。
函数状态的持久化代码:
AbstractStateBackend stateBackend = getStateBackend();
StateHandle<Serializable> handle =
stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
state.setFunctionState(handle);
小结
本篇剖析了Flink针对Function
以及Operator
如何做快照以及如何恢复的实现。虽然,还没有涉及到fault tolerance的最终实现机制,但是这是我们的入口。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)