Apache Flink fault tolerance源码剖析(一)

因某些童鞋的建议,从这篇文章开始结合源码谈谈Flink Fault Tolerance相关的话题。上篇官方介绍的翻译是理解这个话题的前提,所以如果你想更深入得了解Flink Fault Tolerance的机制,推荐先读一下前篇文章理解它的实现原理。当然原理归原理,原理体现在代码实现里并不是想象中的那么直观。这里的源码剖析也是我学习以及理解的过程。

作为源码解析Flink Fault Tolerance的首篇文章,我们先暂且不谈太有深度的东西,先来了解一下:Flink哪里涉及到检查点/快照机制来保存/恢复状态?这就是本篇要谈的主要内容。

跟Flink官方文档的说明一样,在文章中会混杂着检查点快照这两个术语,不要太过于纠结它们,某种程度它们是一致的。

从这篇文章开始,当我谈及一个类,我会给出它的全限定名,以方便大家对照。

在Flink中,需要具备Fault Tolerance能力的通常是两类对象:function以及operator

其中function通常通过实现Checkpointed来达到这个目的,而operator通过实现StreamOpeator(该接口中包含了快照、恢复状态的接口方法)。

我们会分别来分析这两个接口,然后列举一些典型的需要具备Fault Tolerance功能的对象,并分析它们的实现。

Checkpointed

org.apache.flink.streaming.api.checkpoint.Checkpointed

该接口提供给那些需要持久化状态的functionoperator使用。该接口是同步模式的快照机制。

两个接口方法:

  • snapshotState:获得function或者operator的当前状态(快照),这个状态必须反映该function之前的变更所产生的最终结果

该方法接收两个参数,第一个参数是checkpointId,表示该检查点的ID,第二个参数checkpointTimestamp,检查点的时间戳,被JobManagerSystem.currentTimeMillis()驱动

  • restoreState:用于从之前的检查点中恢复functionoperator的状态,需要注意的是该方法的调用会早于open方法

CheckpointedAsynchronously

org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously

该接口继承自Checkpointed,属于标记型接口,用于跟Checkpointed同步快照机制进行区分。

MessageAcknowledgingSourceBase

org.apache.flink.streaming.api.functions.source

该类是Flink内置的众多SouceFunction之一,也是基于具备ack(确认机制)的Message Queue的实现模板之一。

该类的完整签名:

public abstract class MessageAcknowledgingSourceBase<Type, UId>
    extends RichSourceFunction<Type>
    implements Checkpointed<SerializedCheckpointData[]>, CheckpointListener

首先我们从该类的实现接口中可以看到它希望保存的状态是:SerializedCheckpointData[]:

SerializedCheckpointData表示作为快照数据的被序列化元素的集合

有两个非常重要的属性:

/** The list gathering the IDs of messages emitted during the current checkpoint */
private transient List<UId> idsForCurrentCheckpoint;

/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
private transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;

/**
 * Set which contain all processed ids. Ids are acknowledged after checkpoints. When restoring
 * a checkpoint, ids may be processed again. This happens when the checkpoint completed but the
 * ids for a checkpoint haven‘t been acknowledged yet.
 */
private transient Set<UId> idsProcessedButNotAcknowledged;
  • idsForCurrentCheckpoint:该集合存储着当前检查点覆盖范围内,消费掉的消息的ID集合
  • pendingCheckpoints:该集合收集的是:检查点以及该检查点中那些已经被触发处理的但没有完成的(或没有收到完成通知的)消息的ID集合对(Tuple2)
  • idsProcessedButNotAcknowledged:它用于存储那些已经被处理过的消息的ID,这些消息的ack在检查点完成之后。也就是说,如果从该检查点开始恢复,那么这些id的消息可能会被重放。

看到上面的定义,虽然都是用来存储跟消息ID相关的“集合”,但却是三种不同的数据结构,而且前两个是有序的,最后一个是无序的

后面的文章我们会谈到检查点分:PendingCheckpoint(未完成的)和CompletedCheckpoint(已完成的)

来看Checkpointed的两个接口方法的实现:

    public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
                    idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);

        pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));

        idsForCurrentCheckpoint = new ArrayList<>(64);

        return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
    }

首先是构建快照的方法,它将当前检查点checkpointId及其相关的消费走的消息集合idsForCurrentCheckpoint构建成一个元组Tuple2加入待处理的检查点中pendingCheckpoints。接着重新初始化了idsForCurrentCheckpoint(因为当前这个检查点的快照已经生成了,所以跟当前检查点相关的元素也需要清空掉)。

    public void restoreState(SerializedCheckpointData[] state) throws Exception {
        pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
        // build a set which contains all processed ids. It may be used to check if we have
        // already processed an incoming message.
        for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
            idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
        }
    }

接下来是恢复状态的逻辑,它的过程也很简单。首先反序列化状态对象到pendingCheckpoints,然后遍历整个元组集合,针对每个没有完成的检查点元组checkpoint,提取出每个这些checkpoint对应的消息ID集合,将他们全部加入idsProcessedButNotAcknowledged集合中去。

从构建快照的snapshotState方法中,我们看到针对每个checkpointId都将其涵盖范围内的所有的ID集合拼装成一个二元组加入到pendingCheckpoints。而随着消息被消费,如果到最后该checkpointId对应的所有消息ID都被完全处理也就是说该检查点变成了CompletedCheckpoint,那么如何将该二元组从pendingCheckpoints移除?Flink提供了一个CheckpointListener它会在某个检查点完成之后给出通知,客户程序可以订阅它然后进行相应的回调处理notifyCheckpointComplete,该类的回调实现如下:

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);

        for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
            Tuple2<Long, List<UId>> checkpoint = iter.next();
            long id = checkpoint.f0;

            if (id <= checkpointId) {
                LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
                acknowledgeIDs(checkpointId, checkpoint.f1);
                // remove deduplication data
                idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
                // remove checkpoint data
                iter.remove();
            }
            else {
                break;
            }
        }
    }

获取到已完成的checkpointId,然后遍历整个pendingCheckpoints集合,找到所有checkpointId小于当前已完成的checkpointId,然后完成三个动作:

  • ack 该checkpointId对应的所有这些消息的ID
  • 将这些消息的ID从idsProcessedButNotAcknowledged中移除
  • 将该二元组从pendingCheckpoints中移除

为什么这里判断条件是<=呢,因为checkpointId是时序递增的,而且Flink保证如果某个检查点完成,那么比该检查点小的检查点肯定也完成了。因为,检查点越小与其有关的消息集合越早被处理。

另外一个需要注意的是,该方法中的acknowledgeIDs是抽象方法,待具体类根据自己的ack机制实现。

RabbitMQ 对接Flink的Source —— RMQSource就是通过继承MessageAcknowledgingSourceBase 实现的

FlinkKafkaConsumerBase

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

FlinkKafkaConsumerBase是Flink实现基于Kafka的Source的基类,Kafka提供基于offset并且可重复消费的机制,使其非常容易实现Fault Tolerance机制,只不过这里实现的是异步模式的检查点机制CheckpointedAsynchronously

该类的完整签名如下:

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>

从签名来看,它快照的数据类型是:

HashMap<KafkaTopicPartition, Long>

该类型描述了kafka消息消费的具体的信息(包含topic,partition,offset)。

然后继续看snapshotState方法:

// the use of clone() is okay here is okay, we just need a new map, the keys are not changed
        //noinspection unchecked
        HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) offsetsState.clone();

        // the map cannot be asynchronously updated, because only one checkpoint call can happen
        // on this function at a time: either snapshotState() or notifyCheckpointComplete()
        pendingCheckpoints.put(checkpointId, currentOffsets);

        while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            pendingCheckpoints.remove(0);
        }

这是其核心代码。它先将offsetsState克隆一份,作为当前检查点的快照,然后放入pendingCheckpoints作为待处理的检查点集合。

这里有一个安全性检查:如果待处理的安全点集合大于默认设定的阈值(100),则移除集合中第一个检查点,这么做的目的是为了防止集合太大导致内存泄漏

restoreState方法的实现,只是将待恢复的偏移量快照对象赋予当前对象的偏移量而已。

MessageAcknowledgingSourceBase,为了得到检查点完成的通知,FlinkKafkaConsumerBase也实现了CheckpointListener接口,以在检查点完成时进行回调处理。来看看notifyCheckpointComplete方法的实现:

            HashMap<KafkaTopicPartition, Long> checkpointOffsets;

            // the map may be asynchronously updates when snapshotting state, so we synchronize
            synchronized (pendingCheckpoints) {
                final int posInMap = pendingCheckpoints.indexOf(checkpointId);
                if (posInMap == -1) {
                    LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                    return;
                }

                //noinspection unchecked
                checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);

                // remove older checkpoints in map
                for (int i = 0; i < posInMap; i++) {
                    pendingCheckpoints.remove(0);
                }
            }
            if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
                LOG.debug("Checkpoint state was empty.");
                return;
            }
            commitOffsets(checkpointOffsets);

可以看到核心代码都进行了同步保护,因为pendingCheckpoints很可能会被异步更新。它先根据完成了的检查点,获得其在pendingCheckpoints中的索引。如果判断索引不存在,则直接退出。否则,移除该索引对应的快照信息,然后将小于当前索引(较旧的)的快照信息也一并移除(这一点我之前解释过,因为所有的检查点都是按时间递增有序的)。最后将当前完成的检查点对应的消息的偏移量进行commit,也即commitOffsets。只不过这里该方法被定义为抽象方法,因为Kafka不同版本的API差别的原因,由适配不同版本的consumer各自实现。

Flink以Kafka作为Source的具体实现机制,不是本文的重点,后续可以另开文章进行讲解

StatefulSequenceSource

org.apache.flink.streaming.api.functions.source

这是一个有状态的、给定起始和截止元素的并行序列发射器,由于它需要提供exactly once保证,所以它实现了Checkpointed接口。

它主要是用来维护一个称之为collected的发射进度状态,对其进行快照以便于实现fault tolerance

StreamOperator

org.apache.flink.streaming.api.operators. StreamOperator

StreamOperator内置了我们上面谈到的几个跟检查点相关的接口方法:

  • snapshotOperatorState
  • restoreState
  • notifyOfCompletedCheckpoint

这三个方法来自于我们之前谈到的Checkpointed以及CheckpointListener。这也由此可见,在operator中快照机制由可选项变成了必选项。

这是不难理解的,因为operator处于运行时,诸如分区信息都是必须要做快照的。

这里需要注意的是snapshotOperatorState方法,它返回值为StreamTaskState。它是表示task所有状态的一个容器对象,它包含了三类状态:

  • operatorState
  • functionState
  • kvStates

这不是本文的重点,后面的文章再谈

AbstractStreamOperator

org.apache.flink.streaming.api.operators.AbstractStreamOperator

AbstractStreamOperatorStreamOperator的抽闲类,为operator的实现提供模板,当然也为以上的三个跟快照相关的接口方法的实现提供了模板。

来看snapshotOperatorState方法:

    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        // here, we deal with key/value state snapshots

        StreamTaskState state = new StreamTaskState();

        if (stateBackend != null) {
            HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
                stateBackend.snapshotPartitionedState(checkpointId, timestamp);
            if (partitionedSnapshots != null) {
                state.setKvStates(partitionedSnapshots);
            }
        }

        return state;
    }

可以看到它依赖于一个叫stateBackend的东西,在之前一篇文章中我们有谈及过,它是state最终的持久化机制的实现。并且从注释可以看到这里只提供了针对key/value状态的快照模板。

AbstractUdfStreamOperator

org.apache.flink.streaming.api.operators

该抽象类继承自AbstractStreamOperator,用于进一步为operator的实现提供模板,不过从类名可以看出来,它主要是为用户定义函数(udf)的operator提供模板。

snapshotOperatorState方法:

    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);

        if (userFunction instanceof Checkpointed) {
            @SuppressWarnings("unchecked")
            Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;

            Serializable udfState;
            try {
                udfState = chkFunction.snapshotState(checkpointId, timestamp);
            }
            catch (Exception e) {
                throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
            }

            if (udfState != null) {
                try {
                    AbstractStateBackend stateBackend = getStateBackend();
                    StateHandle<Serializable> handle =
                            stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
                    state.setFunctionState(handle);
                }
                catch (Exception e) {
                    throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
                            + e.getMessage(), e);
                }
            }
        }

        return state;
    }

这里我们终于再次看到了Checkpointed接口。这是因为function只是静态的函数,它的运行还必须借助于operator,因此其状态也必须借助于operator来帮助其与Flink的运行时交互以达到最终的持久化的目的。

函数状态的持久化代码:

AbstractStateBackend stateBackend = getStateBackend();
StateHandle<Serializable> handle =
    stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
state.setFunctionState(handle);

小结

本篇剖析了Flink针对Function以及Operator如何做快照以及如何恢复的实现。虽然,还没有涉及到fault tolerance的最终实现机制,但是这是我们的入口。


微信扫码关注公众号:Apache_Flink


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

时间: 2024-10-25 13:59:54

Apache Flink fault tolerance源码剖析(一)的相关文章

Apache Flink fault tolerance源码剖析(二)

继续Flink Fault Tolerance机制剖析.上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于Actor的消息驱动的协同机制.这篇涉及到一个非常关键的类--CheckpointCoordinator. org.apache.flink.runtime.checkpoint.CheckpointCoordinator 该类可以理解为检查点的协调器,用来协调operator和state的分布

Apache Flink fault tolerance源码剖析完结篇

这篇文章是对Flinkfault tolerance的一个总结.虽然还有些细节没有涉及到,但是基本的实现要点在这个系列中都已提及. 回顾这个系列,每篇文章都至少涉及一个知识点.我们来挨个总结一下. 恢复机制实现 Flink中通常需要进行状态恢复的对象是operator以及function.它们通过不同的方式来达到状态快照以及状态恢复的能力.其中function通过实现Checkpointed的接口,而operator通过实现StreamOpeator接口.这两个接口的行为是类似的. 当然对于数据

Apache Flink fault tolerance源码剖析(四)

上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器). 这篇文章会谈论一种特殊的检查点,Flink将之命名为--Savepoint(保存点). 因为保存点只不过是一种特殊的检查点,所以在Flink中并没有太多代码实现.但作为一个特性,值得花费一个篇幅来介绍. 检查点VS保存点 使用数据流API编写的程序可以从保存点来恢复执行.保存点允许你在更新程序的同时还能保证Flink集群不丢失任何状态. 保存点是人工触发

Apache Flink fault tolerance源码剖析(三)

上一篇文章我们探讨了基于定时任务的周期性检查点触发机制以及基于Akka的actor模型的消息驱动协同机制.这篇文章我们将探讨Zookeeper在Flink的Fault Tolerance所起到的作用. 其实,Flink引入Zookeeper的目的主要是让JobManager实现高可用(leader选举). 因为Zookeeper在Flink里存在多种应用场景,本篇我们还是将重心放在Fault Tolerance上,即讲解Zookeeper在检查点的恢复机制上发挥的作用. 如果用一幅图表示快照机制

Apache Flink fault tolerance源码剖析(五)

上一篇文章我们谈论了保存点的相关内容,其中就谈到了保存点状态的存储.这篇文章我们来探讨用户程序状态的存储,也是在之前的文章中多次提及的state backend(中文暂译为状态终端). 基于数据流API而编写的程序经常以各种各样的形式保存着状态: 窗口收集/聚合元素(这里的元素可以看作是窗口的状态)直到它们被触发 转换函数可能会使用key/value状态接口来存储数据 转换函数可能实现Checkpointed接口来让它们的本地变量受益于fault tolerant机制 当检查点机制工作时,上面谈

tomcat(12)org.apache.catalina.core.StandardContext源码剖析

[0]README 0)本文部分文字描述转自 "how tomcat works",旨在学习 "tomcat(12)StandardContext源码剖析" 的基础知识: 1)Context实例表示一个具体的web 应用程序,其中包含一个或多个Wrapper实例,每个Wrapper 表示一个具体的servlet定义: 2)Context容器还需要其他组件的支持,如载入器和Session 管理器.本章要intro 的 StandardContext是 catalina

tomcat(11)org.apache.catalina.core.StandardWrapper源码剖析

[0]README 0.0)本文部分文字描述转自 "how tomcat works",旨在学习 "tomcat(11)StandardWrapper源码剖析" 的基础知识: 0.1)StandardWrapper 是 Catalina中对Wrapper接口的标准实现:要知道,tomcat 中有4种类型的容器:Engine,Host,Context 和 Wrapper:(干货--review  tomcat 中有4种类型的容器:Engine,Host,Context

终于等到你!阿里正式向 Apache Flink 贡献 Blink 源码

摘要: 如同我们去年12月在 Flink Forward China 峰会所约,阿里巴巴内部 Flink 版本 Blink 将于 2019 年 1 月底正式开源.今天,我们终于等到了这一刻. 阿里妹导读:如同我们去年12月在 Flink Forward China 峰会所约,阿里巴巴内部 Flink 版本 Blink 将于 2019 年 1 月底正式开源.今天,我们终于等到了这一刻. 阿里资深技术专家大沙,将为大家详细介绍本次开源的Blink主要功能和优化点,希望与业界同仁共同携手,推动Flin

《Apache Spark源码剖析》

Spark Contributor,Databricks工程师连城,华为大数据平台开发部部长陈亮,网易杭州研究院副院长汪源,TalkingData首席数据科学家张夏天联袂力荐1.本书全面.系统地介绍了Spark源码,深入浅出,细致入微2.提供给读者一系列分析源码的实用技巧,并给出一个合理的阅读顺序3.始终抓住资源分配.消息传递.容错处理等基本问题,抽丝拨茧4.一步步寻找答案,所有问题迎刃而解,使读者知其然更知其所以然 内容简介 书籍计算机书籍 <Apache Spark源码剖析>以Spark