Apache Flink fault tolerance源码剖析(二)

继续Flink Fault Tolerance机制剖析。上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于Actor的消息驱动的协同机制。这篇涉及到一个非常关键的类——CheckpointCoordinator

org.apache.flink.runtime.checkpoint.CheckpointCoordinator

该类可以理解为检查点的协调器,用来协调operatorstate的分布式快照。

周期性的检查点触发机制

检查点的触发机制是基于定时器的周期性触发。这涉及到一个定时器的实现类ScheduledTrigger

ScheduledTrigger

触发检查点的定时任务类。其实现就是调用triggerCheckpoint方法。这个方法后面会具体介绍。

public void run() {
    try {
        triggerCheckpoint(System.currentTimeMillis());
    }
    catch (Exception e) {
        LOG.error("Exception while triggering checkpoint", e);
    }
}

startCheckpointScheduler

启动触发检查点的定时任务的方法实现:

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();

            try {
                // Multiple start calls are OK
                checkpointIdCounter.start();
            } catch (Exception e) {
                String msg = "Failed to start checkpoint ID counter: " + e.getMessage();
                throw new RuntimeException(msg, e);
            }

            periodicScheduling = true;
            currentPeriodicTrigger = new ScheduledTrigger();
            timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
        }
    }

方法的实现包含两个主要动作:

  • 启动检查点ID计数器checkpointIdCounter
  • 启动触发检查点的定时任务

stopCheckpointScheduler

关闭定时任务的方法,用来释放资源,重置一些标记变量。

triggerCheckpoint

该方法是触发一个新的检查点的核心逻辑。

首先,方法中会去判断一个flag:triggerRequestQueued。该标识表示是否一个检查点的触发请求不能被立即执行。

// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
    LOG.warn("Trying to trigger another checkpoint while one was queued already");
    return false;
}

如果不能被立即执行,则直接返回。

不能被立即执行的原因是:还有其他处理没有完成。

接着检查正在并发处理的未完成的检查点:

            // if too many checkpoints are currently in progress, we need to mark that a request is queued
            if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                triggerRequestQueued = true;
                if (currentPeriodicTrigger != null) {
                    currentPeriodicTrigger.cancel();
                    currentPeriodicTrigger = null;
                }
                return false;
            }

如果未完成的检查点过多,大于允许的并发处理的检查点数目的阈值,则将当前检查点的触发请求设置为不能立即执行,如果定时任务已经启动,则取消定时任务的执行,并返回。

以上这些检查处于基于锁机制实现的同步代码块中。

接着检查需要被触发检查点的task是否都处于运行状态:

        ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
        for (int i = 0; i < tasksToTrigger.length; i++) {
            Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee != null && ee.getState() == ExecutionState.RUNNING) {
                triggerIDs[i] = ee.getAttemptId();
            } else {
                LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
                        tasksToTrigger[i].getSimpleName());
                return false;
            }
        }

只要有一个task不满足条件,则不会触发检查点,并立即返回。

然后检查是否所有需要ack检查点的task都处于运行状态:

        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

        for (ExecutionVertex ev : tasksToWaitFor) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ackTasks.put(ee.getAttemptId(), ev);
            } else {
                LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
                        ev.getSimpleName());
                return false;
            }
        }

如果有一个task不满足条件,则不会触发检查点,并立即返回。

以上条件都满足(即没有return false;),才具备触发一个检查点的基本条件。

下一步,获得checkpointId

        final long checkpointID;
        if (nextCheckpointId < 0) {
            try {
                // this must happen outside the locked scope, because it communicates
                // with external services (in HA mode) and may block for a while.
                checkpointID = checkpointIdCounter.getAndIncrement();
            }
            catch (Throwable t) {
                int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
                LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
                return false;
            }
        }
        else {
            checkpointID = nextCheckpointId;
        }

这依赖于该方法的另一个参数nextCheckpointId,如果其值为-1,则起到标识的作用,指示checkpointId将从外部获取(比如Zookeeper,后续文章会谈及检查点ID的生成机制)。

接着创建一个PendingCheckpoint对象:

final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);

该类表示一个待处理的检查点。

与此同时,会定义一个针对当前检查点超时进行资源清理的取消器canceller。该取消器主要是针对检查点没有释放资源的情况进行资源释放操作,同时还会调用triggerQueuedRequests方法启动一个触发检查点的定时任务,如果有的话(取决于triggerRequestQueued是否为true)。

然后会再次进入同步代码段,对上面的是否新建检查点的判断条件做二次检查,防止产生竞态条件。这里做二次检查的原因是,中间有一段关于获得checkpointId的代码,不在同步块中。

检查后,如果触发检查点的条件仍然是满足的,那么将上面创建的PendingCheckpoint对象加入集合中:

pendingCheckpoints.put(checkpointID, checkpoint);

同时会启动针对当前检查点的超时取消器:

timer.schedule(canceller, checkpointTimeout);

接下来会发送消息给task以真正触发检查点(基于消息驱动的协同机制):

for (int i = 0; i < tasksToTrigger.length; i++) {
    ExecutionAttemptID id = triggerIDs[i];
    TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
    tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}

基于Actor的消息驱动的协同机制

上面已经谈到了检查点的触发机制是基于定时任务的周期性触发,那么定时任务的启停机制又是什么?Flink使用的是基于AKKA的Actor模型的消息驱动机制。

CheckpointCoordinatorDeActivator是一个Actor的实现,它用于基于消息来驱动检查点的定时任务的启停:

    public void handleMessage(Object message) {
        if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
            JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();

            if (status == JobStatus.RUNNING) {
                // start the checkpoint scheduler
                coordinator.startCheckpointScheduler();
            } else {
                // anything else should stop the trigger for now
                coordinator.stopCheckpointScheduler();
            }
        }

        // we ignore all other messages
    }

Actor会收到Job状态的变化通知:JobStatusChanged。一旦变成RUNNING,那么检查点的定时任务会被立即启动;否则会被立即关闭。

Actor被创建的代码是CheckpointCoordinator中的createActivatorDeactivator方法:

    public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            if (jobStatusListener == null) {
                Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);

                // wrap the ActorRef in a AkkaActorGateway to support message decoration
                jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
            }

            return jobStatusListener;
        }
    }

既然,是基于消息驱动机制,那么就需要各种类型的消息对应不同的业务逻辑。这些消息在Flink中被定义在package:org.apache.flink.runtime.messages.checkpoint中。

类图如下:

AbstractCheckpointMessage

检查点消息的基础抽象类,提供了三个公共属性(从构造器注入):

  • job:JobID的实例,表示当前这条消息实例的归属;
  • taskExecutionId:ExecutionAttemptID的实例,表示检查点的源/目的任务
  • checkpointId:当前消息协调的检查点ID

除此之外,该实现仅仅override了hashCodeequals方法。

TriggerCheckpoint

该消息由JobManager发送给TaskManager,用于告诉一个task触发它的检查点。

触发消息

位于CheckpointCoordinator类的triggerCheckpoint中,上面已经提及过。

for (int i = 0; i < tasksToTrigger.length; i++) {
    ExecutionAttemptID id = triggerIDs[i];
    TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
    tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}

消息处理

TaskManagerhandleCheckpointingMessage实现:

      case message: TriggerCheckpoint =>
        val taskExecutionId = message.getTaskExecutionId
        val checkpointId = message.getCheckpointId
        val timestamp = message.getTimestamp

        log.debug(s"Receiver TriggerCheckpoint [email protected]$timestamp for $taskExecutionId.")

        val task = runningTasks.get(taskExecutionId)
        if (task != null) {
          task.triggerCheckpointBarrier(checkpointId, timestamp)
        } else {
          log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
        }

主要是触发检查点屏障Barrier

DeclineCheckpoint

该消息由TaskManager发送给JobManager,用于告诉检查点协调器:检查点的请求还没有能够被处理。这种情况通常发生于:某task已处于RUNNING状态,但在内部可能还没有准备好执行检查点。

它除了AbstractCheckpointMessage需要的三个属性外,还需要用于关联检查点的timestamp

触发消息

位于Task类的triggerCheckpointBarrier方法中:

                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
                            if (!success) {
                                DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
                                jobManager.tell(decline);
                            }
                        }
                        catch (Throwable t) {
                            if (getExecutionState() == ExecutionState.RUNNING) {
                                failExternally(new RuntimeException(
                                    "Error while triggering checkpoint for " + taskName,
                                    t));
                            }
                        }
                    }
                };

消息处理

位于JobManagerhandleCheckpointMessage

具体的实现在CheckpointCoordinatorreceiveDeclineMessage中:

首先从接收的消息中(DeclineCheckpoint)获得检查点编号:

final long checkpointId = message.getCheckpointId();

接下来的逻辑是判断当前检查点是否是未完成的检查点:isPendingCheckpoint

接下来分为三种情况对待:

  • 如果是未完成的检查点,并且相关资源没有被释放(检查点没有被discarded
isPendingCheckpoint = true;
pendingCheckpoints.remove(checkpointId);
checkpoint.discard(userClassLoader);
rememberRecentCheckpointId(checkpointId);

isPendingCheckpointtrue,根据检查点编号,将检查点从未完成的检查点集合中移除,discard检查点,记住最近的检查点(将其保持到到一个最近的检查点列表中)。

接下来查找是否还有待处理的检查点,根据检查点时间戳来判断:

boolean haveMoreRecentPending = false;
Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
while (entries.hasNext()) {
    PendingCheckpoint p = entries.next().getValue();
    if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
        haveMoreRecentPending = true;
        break;
    }
}

根据标识haveMoreRecentPending来进入不同的处理逻辑:

if (!haveMoreRecentPending && !triggerRequestQueued) {
    LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
    triggerCheckpoint(System.currentTimeMillis());
} else if (!haveMoreRecentPending) {
    LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
    triggerQueuedRequests();
}

如果有需要处理的检查点,并且当前能立即处理,则立即触发检查点定时任务;如果有需要处理的检查点,但不能立即处理,则触发入队的定时任务。

  • 如果是未完成的检查点,并且检查点已经被discarded

抛出IllegalStateException异常

  • 如果不是未完成的检查点

如果在最近未完成的检查点列表中找到,则有可能表示消息来迟了,将isPendingCheckpoint置为true,否则将isPendingCheckpoint置为false.

最后返回isPendingCheckpoint

AcknowledgeCheckpoint

该消息是一个应答信号,表示某个独立的task的检查点已经完成。也是由TaskManager发送给JobManager。该消息会携带task的状态:

  • state
  • stateSize

触发消息

RuntimeEnvironment类的acknowledgeCheckpoint方法。

消息处理

具体的实现在CheckpointCoordinatorreceiveAcknowledgeMessage中,开始的实现同receiveDeclineMessage,也是判断当前接收到的消息中包含的检查点是否是待处理的检查点。如果是,并且也没有discard掉,则执行如下逻辑:

                if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) {
                    if (checkpoint.isFullyAcknowledged()) {
                        completed = checkpoint.toCompletedCheckpoint();

                        completedCheckpointStore.addCheckpoint(completed);

                        LOG.info("Completed checkpoint " + checkpointId + " (in " +
                                completed.getDuration() + " ms)");
                        LOG.debug(completed.getStates().toString());

                        pendingCheckpoints.remove(checkpointId);
                        rememberRecentCheckpointId(checkpointId);

                        dropSubsumedCheckpoints(completed.getTimestamp());

                        onFullyAcknowledgedCheckpoint(completed);

                        triggerQueuedRequests();
                    }
                }

检查点首先应答相关的task,如果检查点已经完全应答完成,则将检查点转换成CompletedCheckpoint,然后将其加入completedCheckpointStore列表,并从pendingCheckpoints中移除。然后调用dropSubsumedCheckpoints它会从pendingCheckpointsdiacard所有时间戳小于当前检查点的时间戳,并从集合中移除。

最后,如果该检查点被转化为已完成的检查点,则:

        if (completed != null) {
            final long timestamp = completed.getTimestamp();

            for (ExecutionVertex ev : tasksToCommitTo) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee != null) {
                    ExecutionAttemptID attemptId = ee.getAttemptId();
                    NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
                    ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
                }
            }

            statsTracker.onCompletedCheckpoint(completed);
        }

迭代所有待commit的task,发送NotifyCheckpointComplete消息。同时触发状态跟踪器的onCompletedCheckpoint回调方法。

NotifyCheckpointComplete

该消息由JobManager发送给TaskManager,用于告诉一个task它的检查点已经得到完成确认,task可以向第三方提交该检查点。

触发消息

位于CheckpointCoordinator类的receiveAcknowledgeMessage方法中,当检查点acktask完成,转化为CompletedCheckpoint之后

        if (completed != null) {
            final long timestamp = completed.getTimestamp();

            for (ExecutionVertex ev : tasksToCommitTo) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee != null) {
                    ExecutionAttemptID attemptId = ee.getAttemptId();
                    NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
                    ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
                }
            }

            statsTracker.onCompletedCheckpoint(completed);
        }

消息处理

TaskManagerhandleCheckpointingMessage

实现:

      case message: NotifyCheckpointComplete =>
        val taskExecutionId = message.getTaskExecutionId
        val checkpointId = message.getCheckpointId
        val timestamp = message.getTimestamp

        log.debug(s"Receiver ConfirmCheckpoint [email protected]$timestamp for $taskExecutionId.")

        val task = runningTasks.get(taskExecutionId)
        if (task != null) {
          task.notifyCheckpointComplete(checkpointId)
        } else {
          log.debug(
            s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
        }

主要是触发tasknotifyCheckpointComplete方法。

小结

这篇文章主要讲解了检查点的基于定时任务的周期性的触发机制,以及基于Akka的Actor模型的消息驱动的协同处理机制。


微信扫码关注公众号:Apache_Flink


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

时间: 2024-10-07 22:22:28

Apache Flink fault tolerance源码剖析(二)的相关文章

Apache Flink fault tolerance源码剖析(一)

因某些童鞋的建议,从这篇文章开始结合源码谈谈Flink Fault Tolerance相关的话题.上篇官方介绍的翻译是理解这个话题的前提,所以如果你想更深入得了解Flink Fault Tolerance的机制,推荐先读一下前篇文章理解它的实现原理.当然原理归原理,原理体现在代码实现里并不是想象中的那么直观.这里的源码剖析也是我学习以及理解的过程. 作为源码解析Flink Fault Tolerance的首篇文章,我们先暂且不谈太有深度的东西,先来了解一下:Flink哪里涉及到检查点/快照机制来

Apache Flink fault tolerance源码剖析完结篇

这篇文章是对Flinkfault tolerance的一个总结.虽然还有些细节没有涉及到,但是基本的实现要点在这个系列中都已提及. 回顾这个系列,每篇文章都至少涉及一个知识点.我们来挨个总结一下. 恢复机制实现 Flink中通常需要进行状态恢复的对象是operator以及function.它们通过不同的方式来达到状态快照以及状态恢复的能力.其中function通过实现Checkpointed的接口,而operator通过实现StreamOpeator接口.这两个接口的行为是类似的. 当然对于数据

Apache Flink fault tolerance源码剖析(四)

上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器). 这篇文章会谈论一种特殊的检查点,Flink将之命名为--Savepoint(保存点). 因为保存点只不过是一种特殊的检查点,所以在Flink中并没有太多代码实现.但作为一个特性,值得花费一个篇幅来介绍. 检查点VS保存点 使用数据流API编写的程序可以从保存点来恢复执行.保存点允许你在更新程序的同时还能保证Flink集群不丢失任何状态. 保存点是人工触发

Apache Flink fault tolerance源码剖析(三)

上一篇文章我们探讨了基于定时任务的周期性检查点触发机制以及基于Akka的actor模型的消息驱动协同机制.这篇文章我们将探讨Zookeeper在Flink的Fault Tolerance所起到的作用. 其实,Flink引入Zookeeper的目的主要是让JobManager实现高可用(leader选举). 因为Zookeeper在Flink里存在多种应用场景,本篇我们还是将重心放在Fault Tolerance上,即讲解Zookeeper在检查点的恢复机制上发挥的作用. 如果用一幅图表示快照机制

Apache Flink fault tolerance源码剖析(五)

上一篇文章我们谈论了保存点的相关内容,其中就谈到了保存点状态的存储.这篇文章我们来探讨用户程序状态的存储,也是在之前的文章中多次提及的state backend(中文暂译为状态终端). 基于数据流API而编写的程序经常以各种各样的形式保存着状态: 窗口收集/聚合元素(这里的元素可以看作是窗口的状态)直到它们被触发 转换函数可能会使用key/value状态接口来存储数据 转换函数可能实现Checkpointed接口来让它们的本地变量受益于fault tolerant机制 当检查点机制工作时,上面谈

boost.asio源码剖析(二) ---- 架构浅析

* 架构浅析 先来看一下asio的0层的组件图.                     (图1.0) io_object是I/O对象的集合,其中包含大家所熟悉的socket.deadline_timer等对象,主要功能是提供接口给用户使用. services服务是逻辑功能的实现者,其中包含提供定时功能的deadline_timer_service.提供socket相关功能的win_iocp_socket_service(windows平台)/reactive_socket_service(其他

tomcat(12)org.apache.catalina.core.StandardContext源码剖析

[0]README 0)本文部分文字描述转自 "how tomcat works",旨在学习 "tomcat(12)StandardContext源码剖析" 的基础知识: 1)Context实例表示一个具体的web 应用程序,其中包含一个或多个Wrapper实例,每个Wrapper 表示一个具体的servlet定义: 2)Context容器还需要其他组件的支持,如载入器和Session 管理器.本章要intro 的 StandardContext是 catalina

tomcat(11)org.apache.catalina.core.StandardWrapper源码剖析

[0]README 0.0)本文部分文字描述转自 "how tomcat works",旨在学习 "tomcat(11)StandardWrapper源码剖析" 的基础知识: 0.1)StandardWrapper 是 Catalina中对Wrapper接口的标准实现:要知道,tomcat 中有4种类型的容器:Engine,Host,Context 和 Wrapper:(干货--review  tomcat 中有4种类型的容器:Engine,Host,Context

Django Rest Framework源码剖析(二)-----权限

一.简介 在上一篇博客中已经介绍了django rest framework 对于认证的源码流程,以及实现过程,当用户经过认证之后下一步就是涉及到权限的问题.比如订单的业务只能VIP才能查看,所以这时候需要对权限进行控制.下面将介绍DRF的权限控制源码剖析. 二.基本使用 这里继续使用之前的示例,加入相应的权限,这里先介绍使用示例,然后在分析权限源码 1.在django 项目下新建立目录utils,并建立permissions.py,添加权限控制: class MyPremission(obje