Flink Source Code Reading (1): The Checkpoint Trigger Mechanism

Checkpoint Trigger Mechanism

  Flink triggers checkpoints periodically through a timer. The most important class for checkpoint triggering is CheckpointCoordinator, the checkpoint coordinator.

org.apache.flink.runtime.checkpoint.CheckpointCoordinator

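  The main job of CheckpointCoordinator is to coordinate the distributed snapshots of operators and state. It completes a checkpoint by sending trigger messages to the relevant tasks and collecting acknowledgements (acks) from each of them. It also collects and maintains the state handles (references to the state) that the tasks report.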
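The protocol just described (trigger every task, collect one ack plus one state handle per task, complete once everyone has answered) can be sketched in a single process. This is a toy model under stated assumptions, not Flink code: ToyCoordinator and ToyTask are hypothetical, state handles are plain strings, and "messages" are direct method calls instead of Akka messages.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical task: on trigger it "snapshots" its state and acks back.
class ToyTask {
    final String name;
    long lastCompleted = -1;

    ToyTask(String name) { this.name = name; }

    void triggerCheckpoint(long checkpointId, ToyCoordinator coordinator) {
        String stateHandle = name + "-state-" + checkpointId; // stand-in for a real state handle
        coordinator.acknowledge(checkpointId, name, stateHandle);
    }

    void notifyCheckpointComplete(long checkpointId) { lastCompleted = checkpointId; }
}

// Hypothetical coordinator: sends triggers, collects acks and state handles.
class ToyCoordinator {
    private final List<ToyTask> tasks;
    private final Map<Long, Set<String>> notYetAcked = new HashMap<>();
    private final Map<Long, Map<String, String>> collectedHandles = new HashMap<>();
    final Map<Long, Map<String, String>> completed = new HashMap<>();
    private long nextId = 1;

    ToyCoordinator(List<ToyTask> tasks) { this.tasks = tasks; }

    long trigger() {
        long id = nextId++;
        Set<String> waiting = new HashSet<>();
        for (ToyTask t : tasks) waiting.add(t.name);
        notYetAcked.put(id, waiting);            // register before any ack can arrive
        collectedHandles.put(id, new HashMap<>());
        for (ToyTask t : tasks) t.triggerCheckpoint(id, this); // "send" the trigger messages
        return id;
    }

    void acknowledge(long id, String task, String stateHandle) {
        Set<String> waiting = notYetAcked.get(id);
        if (waiting == null || !waiting.remove(task)) return; // unknown checkpoint or duplicate ack
        collectedHandles.get(id).put(task, stateHandle);
        if (waiting.isEmpty()) {                 // fully acknowledged: complete and notify
            notYetAcked.remove(id);
            completed.put(id, collectedHandles.remove(id));
            for (ToyTask t : tasks) t.notifyCheckpointComplete(id);
        }
    }
}
```

In Flink the acks travel asynchronously from the TaskManagers, so they may arrive in any order, late, or not at all; that is why the real coordinator also needs timeouts and duplicate/unknown handling.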

  Main fields of CheckpointCoordinator:

    /** Coordinator-wide lock to safeguard the checkpoint updates */
    private final Object lock = new Object();

    /** Lock specially to make sure that trigger requests do not overtake each other.
     * This is not done with the coordinator-wide lock, because as part of triggering,
     * blocking operations may happen (distributed atomic counters).
     * Using a dedicated lock, we avoid blocking the processing of 'acknowledge/decline'
     * messages during that phase. */
    private final Object triggerLock = new Object(); // dedicated lock for trigger requests, so obtaining the checkpoint ID does not block message processing

    /** Tasks who need to be sent a message when a checkpoint is started */
    private final ExecutionVertex[] tasksToTrigger;

    /** Tasks who need to acknowledge a checkpoint before it succeeds */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Tasks who need to be sent a message when a checkpoint is confirmed */
    private final ExecutionVertex[] tasksToCommitTo;

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints; // checkpoints triggered but not yet completed

    /** Actor that receives status updates from the execution graph this coordinator works for */
    private JobStatusListener jobStatusListener; // listens for job status changes and starts/stops the periodic trigger accordingly

    /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
     * accessed in synchronized scope */
    private boolean triggerRequestQueued;

    /** Flag marking the coordinator as shut down (not accepting any messages any more) */
    private volatile boolean shutdown;

  ScheduledTrigger

  ScheduledTrigger is the timer task for checkpoints; all it does is call the triggerCheckpoint method.

    private final class ScheduledTrigger implements Runnable {
        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint.", e);
            }
        }
    }
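One detail in ScheduledTrigger matters more than it looks: the try/catch. According to the JDK documentation of ScheduledExecutorService.scheduleAtFixedRate, if any execution of the task throws an exception, subsequent executions are suppressed. The sketch below (the class FixedRateDemo is hypothetical and uses only the JDK) compares a guarded and an unguarded periodic task that always fails.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateDemo {
    // Schedules a task that always throws and reports how often it actually ran.
    // guarded == true wraps it in try/catch, as ScheduledTrigger.run() does.
    static int countRuns(boolean guarded) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated trigger failure");
        };
        Runnable task = guarded
                ? () -> {
                    try {
                        failing.run();
                    } catch (Exception e) {
                        // log and swallow, so the periodic schedule stays alive
                    }
                }
                : failing;
        timer.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return runs.get();
    }
}
```

With guarded = false the task runs once and the schedule silently stops; with guarded = true it keeps firing every 10 ms. Catching and logging inside run() is what keeps periodic checkpointing alive even if a single trigger attempt throws.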

  Now let's look at how triggerCheckpoint is implemented.

    // Triggers a new standard checkpoint. timestamp is the time at which the checkpoint is triggered;
    // isPeriodic indicates whether the request comes from the periodic scheduler
    public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
        return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
    }

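  The core logic for triggering a checkpoint:

    First, run the pre-trigger checks to decide whether a checkpoint may be triggered at all (this includes checking that the tasks to be triggered and the tasks that must acknowledge are running);

    Then, obtain a checkpoint ID and create a PendingCheckpoint instance;

    Next, re-check the trigger conditions: the coordinator-wide lock was released while the ID was obtained, so another thread may have changed the state in the meantime (a race condition);

    Finally, add the PendingCheckpoint to pendingCheckpoints and send messages to the tasks to trigger their checkpoints.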
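These steps follow a check / release the lock / re-check pattern. Below is a heavily simplified sketch of that pattern, assuming a hypothetical TriggerSketch class in which an AtomicLong stands in for checkpointIdCounter (which may block in HA mode) and a map of strings stands in for pendingCheckpoints; the minimum-pause check, the task checks and the message sending are omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, heavily simplified trigger path (not Flink's API).
class TriggerSketch {
    enum Result { SUCCESS, COORDINATOR_SHUTDOWN, TOO_MANY_CONCURRENT_CHECKPOINTS }

    private final Object lock = new Object();        // coordinator-wide lock
    private final Object triggerLock = new Object(); // only serializes trigger requests
    private final AtomicLong idCounter = new AtomicLong(1);    // stand-in for checkpointIdCounter
    private final Map<Long, String> pending = new HashMap<>(); // stand-in for pendingCheckpoints
    private final int maxConcurrent;
    private volatile boolean shutdown;

    TriggerSketch(int maxConcurrent) { this.maxConcurrent = maxConcurrent; }

    // Must be called while holding 'lock'.
    private Result checkConditions() {
        if (shutdown) return Result.COORDINATOR_SHUTDOWN;
        if (pending.size() >= maxConcurrent) return Result.TOO_MANY_CONCURRENT_CHECKPOINTS;
        return Result.SUCCESS;
    }

    Result trigger() {
        // 1. eager pre-check under the coordinator-wide lock
        synchronized (lock) {
            Result r = checkConditions();
            if (r != Result.SUCCESS) return r;
        }
        synchronized (triggerLock) {
            // 2. obtain an ID without holding 'lock', so acknowledge/decline
            //    handling that needs 'lock' is not blocked in the meantime
            long id = idCounter.getAndIncrement();
            synchronized (lock) {
                // 3. re-check: the state may have changed while 'lock' was released
                Result r = checkConditions();
                if (r != Result.SUCCESS) return r;
                // 4. register the pending checkpoint; trigger messages would be sent after this
                pending.put(id, "pending-" + id);
                return Result.SUCCESS;
            }
        }
    }

    // Simulates completion of a checkpoint (e.g. after all acks arrived).
    void complete(long id) { synchronized (lock) { pending.remove(id); } }

    void shutdown() { shutdown = true; }
}
```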

    CheckpointTriggerResult triggerCheckpoint(
            long timestamp,
            CheckpointProperties props,
            String targetDirectory,
            boolean isPeriodic) {

        // Sanity check: fail if the checkpoint is to be externalized but no targetDirectory is given
        if (props.externalizeCheckpoint() && targetDirectory == null) {
            throw new IllegalStateException("No target directory specified to persist checkpoint to.");
        }

        // make some eager pre-checks
        synchronized (lock) {
            // abort if the coordinator has been shutdown in the meantime
            if (shutdown) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
            }

            // Don't allow periodic checkpoint if scheduling has been disabled
            if (isPeriodic && !periodicScheduling) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
            }

            // validate whether the checkpoint can be triggered, with respect to the limit of
            // concurrent checkpoints, and the minimum time between checkpoints.
            // these checks are not relevant for savepoints
            // Forced checkpoints are subject neither to the maximum number of concurrent
            // checkpoints nor to the minimum pause between checkpoints.
            if (!props.forceCheckpoint()) {
                // sanity check: there should never be more than one trigger request queued
                if (triggerRequestQueued) {
                    // a request is already queued: decline this one right away
                    LOG.warn("Trying to trigger another checkpoint while one was queued already");
                    return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                }

                // if too many checkpoints are currently in progress, we need to mark that a request is queued
                if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                    // the number of unfinished checkpoints has reached the configured maximum:
                    // mark this trigger request as queued (it cannot be executed right now)
                    triggerRequestQueued = true;
                    // cancel the periodic trigger if one is scheduled
                    if (currentPeriodicTrigger != null) {
                        currentPeriodicTrigger.cancel(false);
                        currentPeriodicTrigger = null;
                    }
                    return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                }

                // make sure the minimum interval between checkpoints has passed
                final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                if (durationTillNextMillis > 0) {
                    if (currentPeriodicTrigger != null) {
                        currentPeriodicTrigger.cancel(false);
                        currentPeriodicTrigger = null;
                    }
                    // Reassign the new trigger to the currentPeriodicTrigger,
                    // using durationTillNextMillis as the initial delay
                    currentPeriodicTrigger = timer.scheduleAtFixedRate(
                            new ScheduledTrigger(),
                            durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                    return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                }
            }
        }

        // check if all tasks that we need to trigger are running.
        // if not, abort the checkpoint
        Execution[] executions = new Execution[tasksToTrigger.length];
        for (int i = 0; i < tasksToTrigger.length; i++) {
            Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee != null && ee.getState() == ExecutionState.RUNNING) {
                executions[i] = ee;
            } else {
                LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
                        tasksToTrigger[i].getTaskNameWithSubtaskIndex());
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // next, check if all tasks that need to acknowledge the checkpoint are running.
        // if not, abort the checkpoint
        // (this loop only checks that each task has a current execution attempt)
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

        for (ExecutionVertex ev : tasksToWaitFor) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ackTasks.put(ee.getAttemptId(), ev);
            } else {
                LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
                        ev.getTaskNameWithSubtaskIndex());
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // we will actually trigger this checkpoint!

        // we lock with a special lock to make sure that trigger requests do not overtake each other.
        // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
        // may issue blocking operations. Using a different lock than the coordinator-wide lock,
        // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
        synchronized (triggerLock) {
            final long checkpointID;
            // first, obtain the checkpoint ID
            try {
                // this must happen outside the coordinator-wide lock, because it communicates
                // with external services (in HA mode) and may block for a while.
                checkpointID = checkpointIdCounter.getAndIncrement();
            }
            catch (Throwable t) {
                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
                return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
            }

            // create the PendingCheckpoint instance that represents this in-flight checkpoint
            final PendingCheckpoint checkpoint = new PendingCheckpoint(
                job,
                checkpointID,
                timestamp,
                ackTasks,
                props,
                targetDirectory,
                executor);

            if (statsTracker != null) {
                PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                    checkpointID,
                    timestamp,
                    props);

                checkpoint.setStatsCallback(callback);
            }

            // schedule the timer that will clean up the expired checkpoints
            final Runnable canceller = new Runnable() {
                @Override
                public void run() {
                    synchronized (lock) {
                        // only do the work if the checkpoint is not discarded anyways
                        // note that checkpoint completion discards the pending checkpoint object
                        if (!checkpoint.isDiscarded()) {
                            LOG.info("Checkpoint " + checkpointID + " expired before completing.");

                            checkpoint.abortExpired();
                            pendingCheckpoints.remove(checkpointID);
                            rememberRecentCheckpointId(checkpointID);

                            triggerQueuedRequests();
                        }
                    }
                }
            };

            try {
                // re-acquire the coordinator-wide lock
                synchronized (lock) {
                    // since we released the lock in the meantime, we need to re-check
                    // that the conditions still hold.
                    // (obtaining the checkpoint ID above ran outside this lock, so another
                    // thread may have changed the state; re-checking prevents a race condition)
                    if (shutdown) {
                        return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                    }
                    else if (!props.forceCheckpoint()) {
                        if (triggerRequestQueued) {
                            LOG.warn("Trying to trigger another checkpoint while one was queued already");
                            return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                        }

                        if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                            triggerRequestQueued = true;
                            if (currentPeriodicTrigger != null) {
                                currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                            }
                            return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                        }

                        // make sure the minimum interval between checkpoints has passed
                        final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                        final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                        if (durationTillNextMillis > 0) {
                            if (currentPeriodicTrigger != null) {
                                currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                            }

                            // Reassign the new trigger to the currentPeriodicTrigger
                            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                    new ScheduledTrigger(),
                                    durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                            return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                        }
                    }

                    LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);

                    // register the checkpoint in pendingCheckpoints
                    pendingCheckpoints.put(checkpointID, checkpoint);

                    // schedule the timeout canceller to run after checkpointTimeout
                    ScheduledFuture<?> cancellerHandle = timer.schedule(
                            canceller,
                            checkpointTimeout, TimeUnit.MILLISECONDS);

                    if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                        // checkpoint is already disposed!
                        cancellerHandle.cancel(false);
                    }

                    // trigger the master hooks for the checkpoint
                    final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
                            checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
                    for (MasterState s : masterStates) {
                        checkpoint.addMasterState(s);
                    }
                }
                // end of lock scope

                CheckpointOptions checkpointOptions;
                if (!props.isSavepoint()) {
                    checkpointOptions = CheckpointOptions.forFullCheckpoint();
                } else {
                    checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
                }

                // send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                }

                numUnsuccessfulCheckpointsTriggers.set(0);
                return new CheckpointTriggerResult(checkpoint);
            }
            catch (Throwable t) {
                // guard the map against concurrent modifications
                synchronized (lock) {
                    pendingCheckpoints.remove(checkpointID);
                }

                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)",
                        checkpointID, numUnsuccessful, t);

                if (!checkpoint.isDiscarded()) {
                    checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
                }
                return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
            }

        } // end trigger lock
    }
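The minimum-pause check above reduces to simple arithmetic on System.nanoTime() values. A minimal sketch of just that expression, with a hypothetical class name and explicit parameters in place of the coordinator's fields:

```java
class MinPauseSketch {
    // Same expression as in triggerCheckpoint: milliseconds still to wait before the
    // minimum pause since the last completed checkpoint has elapsed.
    // A result <= 0 means a checkpoint may be triggered now.
    static long durationTillNextMillis(long lastCompletionNanos, long minPauseNanos, long nowNanos) {
        long earliestNext = lastCompletionNanos + minPauseNanos;
        return (earliestNext - nowNanos) / 1_000_000; // nanoseconds -> milliseconds, truncating
    }
}
```

For example, if the last checkpoint completed at t = 1.0 s, the minimum pause is 500 ms and the current time is t = 1.2 s, the result is 300, so the coordinator declines with MINIMUM_TIME_BETWEEN_CHECKPOINTS and reschedules the periodic trigger with a 300 ms initial delay. Because of the integer division, a remaining pause shorter than 1 ms yields 0 and the checkpoint is allowed.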

  Starting the periodic trigger: startCheckpointScheduler

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            // make sure all previous timers are cancelled
            stopCheckpointScheduler();

            periodicScheduling = true;
            // scheduleAtFixedRate runs the task first after an initial delay (here baseInterval)
            // and then repeatedly at a fixed rate of baseInterval
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(),
                    baseInterval, baseInterval, TimeUnit.MILLISECONDS);
        }
    }

  Stopping the periodic trigger: stopCheckpointScheduler

    // Resets the scheduling flags, cancels the periodic trigger and aborts all pending checkpoints
    public void stopCheckpointScheduler() {
        synchronized (lock) {
            triggerRequestQueued = false;
            periodicScheduling = false;

            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false); // cancel the current periodic trigger task
                currentPeriodicTrigger = null;
            }

            // pendingCheckpoints holds the checkpoints that were triggered but have not completed yet
            for (PendingCheckpoint p : pendingCheckpoints.values()) {
                p.abortError(new Exception("Checkpoint Coordinator is suspending."));
            }
            pendingCheckpoints.clear();
            numUnsuccessfulCheckpointsTriggers.set(0);
        }
    }
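startCheckpointScheduler and stopCheckpointScheduler boil down to holding a single ScheduledFuture and cancelling it with cancel(false), which lets a run that is already executing finish instead of interrupting it. A self-contained sketch with hypothetical names (SchedulerSketch, string placeholders for pending checkpoints):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical holder modeled on startCheckpointScheduler/stopCheckpointScheduler.
class SchedulerSketch {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Map<Long, String> pending = new HashMap<>();
    private final long baseIntervalMillis;
    private ScheduledFuture<?> currentPeriodicTrigger;
    private boolean periodicScheduling;

    SchedulerSketch(long baseIntervalMillis) { this.baseIntervalMillis = baseIntervalMillis; }

    void start(Runnable trigger) {
        synchronized (lock) {
            stop(); // cancel any previous timer first (Java monitors are re-entrant)
            periodicScheduling = true;
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    trigger, baseIntervalMillis, baseIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        synchronized (lock) {
            periodicScheduling = false;
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false); // do not interrupt a run in progress
                currentPeriodicTrigger = null;
            }
            pending.clear(); // Flink additionally aborts each pending checkpoint first
        }
    }

    boolean isScheduled() { synchronized (lock) { return periodicScheduling && currentPeriodicTrigger != null; } }
    void addPending(long id) { synchronized (lock) { pending.put(id, "p" + id); } }
    int pendingCount() { synchronized (lock) { return pending.size(); } }
    void close() { timer.shutdownNow(); }
}
```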

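Actor-Based, Message-Driven Coordination

  How are the periodic trigger tasks started and stopped? Flink uses a message-driven mechanism based on Akka's actor model.

  The CheckpointCoordinatorDeActivator class

org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator

  CheckpointCoordinatorDeActivator implements JobStatusListener: it listens for JobStatus changes and starts or stops the periodic checkpoint scheduling accordingly.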

    // Listens for JobStatus changes and activates/deactivates the periodic checkpoint scheduling
    public class CheckpointCoordinatorDeActivator implements JobStatusListener {

        private final CheckpointCoordinator coordinator;

        public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
            this.coordinator = checkNotNull(coordinator);
        }

        @Override
        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
            if (newJobStatus == JobStatus.RUNNING) {
                // start the checkpoint scheduler as soon as the job switches to RUNNING
                coordinator.startCheckpointScheduler();
            } else {
                // anything else should stop the trigger for now
                coordinator.stopCheckpointScheduler();
            }
        }
    }

  Instances of CheckpointCoordinatorDeActivator are created inside CheckpointCoordinator, by the createActivatorDeactivator method:

    public JobStatusListener createActivatorDeactivator() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            if (jobStatusListener == null) {
                jobStatusListener = new CheckpointCoordinatorDeActivator(this);
            }

            return jobStatusListener;
        }
    }
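The activator/deactivator is a plain observer: the coordinator hands out one lazily created listener, and the execution graph calls it on every job status change. A toy version with hypothetical types (ToyJobStatus lists only a few states; Flink's JobStatus has more):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of job states.
enum ToyJobStatus { CREATED, RUNNING, FAILING, CANCELED, FINISHED }

interface ToyJobStatusListener {
    void jobStatusChanges(ToyJobStatus newStatus);
}

class ToyCheckpointCoordinator {
    final List<String> calls = new ArrayList<>(); // records start/stop calls for inspection
    private final Object lock = new Object();
    private ToyJobStatusListener listener;

    void startCheckpointScheduler() { calls.add("start"); }
    void stopCheckpointScheduler() { calls.add("stop"); }

    // Lazily creates a single listener, in the spirit of createActivatorDeactivator().
    ToyJobStatusListener createActivatorDeactivator() {
        synchronized (lock) {
            if (listener == null) {
                listener = status -> {
                    if (status == ToyJobStatus.RUNNING) {
                        startCheckpointScheduler();
                    } else {
                        stopCheckpointScheduler(); // any other status stops periodic checkpoints
                    }
                };
            }
            return listener;
        }
    }
}
```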

  Checkpoint-related Akka messages

  AbstractCheckpointMessage: the abstract base class of all checkpoint messages

org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage

  Main fields of AbstractCheckpointMessage:

    /** The job to which this message belongs */
    private final JobID job;
    /** The task execution that is source/target of the checkpoint message */
    private final ExecutionAttemptID taskExecutionId;
    /** The ID of the checkpoint that this message coordinates */
    private final long checkpointId;

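  It has the following subclasses:

    TriggerCheckpoint: sent by the JobManager to a TaskManager to trigger a checkpoint;

    AcknowledgeCheckpoint: sent by a TaskManager to the JobManager to confirm that one individual task has completed its checkpoint;

    DeclineCheckpoint: sent by a TaskManager to the JobManager to report that a task declined the checkpoint request, i.e. could not carry it out;

    NotifyCheckpointComplete: sent by the JobManager to a TaskManager to announce that a checkpoint has completed.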
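The hierarchy can be pictured as one abstract base class carrying the three shared fields plus one subclass per message type. A simplified sketch: all class names are hypothetical, and String/UUID replace Flink's JobID and ExecutionAttemptID.

```java
import java.io.Serializable;
import java.util.UUID;

// Base class with the fields every checkpoint message shares.
abstract class ToyCheckpointMessage implements Serializable {
    private final String job;
    private final UUID taskExecutionId;
    private final long checkpointId;

    ToyCheckpointMessage(String job, UUID taskExecutionId, long checkpointId) {
        this.job = job;
        this.taskExecutionId = taskExecutionId;
        this.checkpointId = checkpointId;
    }

    String getJob() { return job; }
    UUID getTaskExecutionId() { return taskExecutionId; }
    long getCheckpointId() { return checkpointId; }
}

// JobManager -> TaskManager: start a checkpoint
final class ToyTriggerCheckpoint extends ToyCheckpointMessage {
    private final long timestamp;
    ToyTriggerCheckpoint(String job, UUID task, long id, long timestamp) { super(job, task, id); this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

// TaskManager -> JobManager: this task finished its part (state may be null if it has none)
final class ToyAcknowledgeCheckpoint extends ToyCheckpointMessage {
    private final String subtaskState;
    ToyAcknowledgeCheckpoint(String job, UUID task, long id, String subtaskState) { super(job, task, id); this.subtaskState = subtaskState; }
    String getSubtaskState() { return subtaskState; }
}

// TaskManager -> JobManager: this task could not take part in the checkpoint
final class ToyDeclineCheckpoint extends ToyCheckpointMessage {
    private final String reason;
    ToyDeclineCheckpoint(String job, UUID task, long id, String reason) { super(job, task, id); this.reason = reason; }
    String getReason() { return reason; }
}

// JobManager -> TaskManager: the checkpoint is complete and may be committed
final class ToyNotifyCheckpointComplete extends ToyCheckpointMessage {
    private final long timestamp;
    ToyNotifyCheckpointComplete(String job, UUID task, long id, long timestamp) { super(job, task, id); this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}
```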

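  The TriggerCheckpoint message

    Sent from the JobManager to a TaskManager to tell a specific task to trigger a checkpoint.

    Sending the message

      The sending logic lives in CheckpointCoordinator and was already shown above: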

        // send the messages to the tasks that trigger their checkpoint
        for (Execution execution: executions) {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }

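      Here executions is an Execution[] array holding the tasks that must be sent a message when the checkpoint is triggered (i.e., the current execution attempts of the tasks in the CheckpointCoordinator field tasksToTrigger). triggerCheckpoint() is called on each of them.

      Next, let's look at the triggerCheckpoint method of Execution.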

        // Triggers a new checkpoint on the task of this execution
        public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
            // the slot (resource) this execution is assigned to
            final SimpleSlot slot = assignedResource;

            if (slot != null) {
                // TaskManagerGateway is the abstraction used to communicate with a TaskManager
                final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
                // send the message to the TaskManager through the taskManagerGateway
                taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
            } else {
                LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                    "no longer running.");
            }
        }

      Continuing into the triggerCheckpoint() method of ActorTaskManagerGateway, the actor-based implementation of TaskManagerGateway:

        public void triggerCheckpoint(
                ExecutionAttemptID executionAttemptID,
                JobID jobId,
                long checkpointId,
                long timestamp,
                CheckpointOptions checkpointOptions) {

            Preconditions.checkNotNull(executionAttemptID);
            Preconditions.checkNotNull(jobId);
            // Create a TriggerCheckpoint message and send it with actorGateway.tell
            // (asynchronous, fire-and-forget: no result is returned).
            // ActorGateway is the interface for actor-based communication.
            actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
        }

      AkkaActorGateway is an implementation of the ActorGateway interface that uses Akka to communicate with remote actors. Here is its tell method:

        @Override
        public void tell(Object message) {
            Object newMessage = decorator.decorate(message);
            // Send the message via the ActorRef 'actor' (ActorRef is an Akka class).
            // How Akka delivers it is left for a later article.
            actor.tell(newMessage, ActorRef.noSender());
        }
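tell is fire-and-forget: it hands the message off for asynchronous delivery and returns immediately, and the receiving actor then processes its mailbox one message at a time. A toy illustration of that model using only the JDK; ToyActor is hypothetical and says nothing about how Akka is actually implemented.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// A toy actor: messages go into a mailbox and are handled sequentially by one thread.
class ToyActor {
    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    private final Thread worker;

    ToyActor(Consumer<Object> handler) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    handler.accept(mailbox.take()); // one message at a time, so the handler needs no locks
                }
            } catch (InterruptedException e) {
                // interrupted: stop the actor
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Fire-and-forget: enqueue and return immediately; the sender gets no reply.
    void tell(Object message) {
        mailbox.offer(message);
    }

    void stop() { worker.interrupt(); }
}
```

Because the sender never waits, the JobManager can fan out TriggerCheckpoint messages to many tasks without blocking; the answers come back later as separate AcknowledgeCheckpoint or DeclineCheckpoint messages.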

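      This completes the sending of the TriggerCheckpoint message. Next, let's look at how the TaskManager receives it.

    Receiving the message

      The message-receiving side of the TaskManager is implemented in Scala.

org.apache.flink.runtime.taskmanager.TaskManager

      The handleMessage method of the TaskManager class is the central message handler.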

        // The TaskManager's central message handler: receives messages and dispatches them by type
        override def handleMessage: Receive = {
            case message: TaskMessage => handleTaskMessage(message)

            // checkpoint-related messages are handled here
            case message: AbstractCheckpointMessage => handleCheckpointingMessage(message)

            case JobManagerLeaderAddress(address, newLeaderSessionID) =>
              handleJobManagerLeaderAddress(address, newLeaderSessionID)

            case message: RegistrationMessage => handleRegistrationMessage(message)

            ...
        }

      Next, the method handleCheckpointingMessage(), whose main job is to trigger the checkpoint barrier.

        // Handles checkpoint-related messages
        private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {

          actorMessage match {
            // message that triggers a checkpoint
            case message: TriggerCheckpoint =>
              val taskExecutionId = message.getTaskExecutionId
              val checkpointId = message.getCheckpointId
              val timestamp = message.getTimestamp
              val checkpointOptions = message.getCheckpointOptions

              log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

              val task = runningTasks.get(taskExecutionId)
              if (task != null) {
                // call the task's triggerCheckpointBarrier method to trigger the checkpoint barrier;
                // the barrier mechanism itself will be covered in a later article
                task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
              } else {
                log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
              }

            // notification that a checkpoint has completed
            case message: NotifyCheckpointComplete =>
              val taskExecutionId = message.getTaskExecutionId
              val checkpointId = message.getCheckpointId
              val timestamp = message.getTimestamp

              log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

              val task = runningTasks.get(taskExecutionId)
              if (task != null) {
                // call the task's notifyCheckpointComplete method
                task.notifyCheckpointComplete(checkpointId)
              } else {
                log.debug(
                  s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
              }

            // unknown checkpoint message
            case _ => unhandled(actorMessage)
          }
        }
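Stripped of Scala and Akka, handleCheckpointingMessage is a dispatch on the message type followed by a lookup in runningTasks. A Java sketch of that shape with hypothetical message and task classes; returning false stands in for Akka's unhandled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified messages and task; Flink's real types differ.
class TriggerMsg {
    final String taskExecutionId; final long checkpointId; final long timestamp;
    TriggerMsg(String t, long id, long ts) { taskExecutionId = t; checkpointId = id; timestamp = ts; }
}

class NotifyCompleteMsg {
    final String taskExecutionId; final long checkpointId;
    NotifyCompleteMsg(String t, long id) { taskExecutionId = t; checkpointId = id; }
}

class RecordingTask {
    final List<String> events = new ArrayList<>();
    void triggerCheckpointBarrier(long id, long ts) { events.add("barrier " + id + "@" + ts); }
    void notifyCheckpointComplete(long id) { events.add("complete " + id); }
}

class ToyTaskManager {
    final Map<String, RecordingTask> runningTasks = new HashMap<>();

    // Dispatch by message type; messages for tasks that are not running are ignored.
    // Returns false for message types it does not handle.
    boolean handleCheckpointingMessage(Object message) {
        if (message instanceof TriggerMsg) {
            TriggerMsg m = (TriggerMsg) message;
            RecordingTask task = runningTasks.get(m.taskExecutionId);
            if (task != null) task.triggerCheckpointBarrier(m.checkpointId, m.timestamp);
            return true;
        } else if (message instanceof NotifyCompleteMsg) {
            NotifyCompleteMsg m = (NotifyCompleteMsg) message;
            RecordingTask task = runningTasks.get(m.taskExecutionId);
            if (task != null) task.notifyCheckpointComplete(m.checkpointId);
            return true;
        }
        return false;
    }
}
```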

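  The NotifyCheckpointComplete message

    Sent from the JobManager to a TaskManager to tell a task that its checkpoint has been confirmed as complete; the task can then commit the checkpoint to third-party systems.

    Sending the message

      NotifyCheckpointComplete is sent from the receiveAcknowledgeMessage method of the CheckpointCoordinator class (via completePendingCheckpoint).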

 1         //该方法接收一个AcknowledgeCheckpoint消息,返回该Message是否与一个pending checkpoint相关联
 2         public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
 3             if (shutdown || message == null) {
 4                 return false;
 5             }
 6             if (!job.equals(message.getJob())) {
 7                 LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
 8                 return false;
 9             }
10
11             final long checkpointId = message.getCheckpointId();
12
13             synchronized (lock) {
14                 // we need to check inside the lock for being shutdown as well, otherwise we
15                 // get races and invalid error log messages
16                 if (shutdown) {
17                     return false;
18                 }
19
20                 final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
21
22                 //如果是待处理的检查点并且没有被Discarded
23                 if (checkpoint != null && !checkpoint.isDiscarded()) {
24
25                     //根据TaskExecutionId和SubtaskState,Acknowledges the task。确认该任务
26                     switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
27                         //确认成功
28                         case SUCCESS:
29                             LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
30                                 checkpointId, message.getTaskExecutionId(), message.getJob());
31                             //如果收到了全部task的确认消息(即notYetAcknowledgedTasks为空)
32                             if (checkpoint.isFullyAcknowledged()) {
33                                 //尝试完成PendingCheckpoint(Try to complete the given pending checkpoint)
34                                 //将完成的checkpointId从checkpoint中删除和一下标志修改,最后,发送notify complete消息
35                                 completePendingCheckpoint(checkpoint);
36                             }
37                             break;
38                         //重复消息
39                         case DUPLICATE:
40                             LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
41                                 message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
42                             break;
43                         //未知消息
44                         case UNKNOWN:
45                             LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
46                                     "because the task‘s execution attempt id was unknown. Discarding " +
47                                     "the state handle to avoid lingering state.", message.getCheckpointId(),
48                                 message.getTaskExecutionId(), message.getJob());
49
50                             discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
51                             break;
52                         //废弃消息
53                         case DISCARDED:
                            LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                                "because the pending checkpoint had been discarded. Discarding the " +
                                "state handle to avoid lingering state.",
                                message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
                            discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                    }

                    return true;
                }
                else if (checkpoint != null) {
                    // this should not happen
                    throw new IllegalStateException(
                            "Received message for discarded but non-removed checkpoint " + checkpointId);
                }
                else {
                    boolean wasPendingCheckpoint;

                    // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                    if (recentPendingCheckpoints.contains(checkpointId)) {
                        wasPendingCheckpoint = true;
                        LOG.warn("Received late message for now expired checkpoint attempt {} from " +
                            "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
                    }
                    else {
                        LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob());
                        wasPendingCheckpoint = false;
                    }

                    // try to discard the state so that we don't have lingering state lying around
                    discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                    return wasPendingCheckpoint;
                }
            }
        }
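The receiveAcknowledgeMessage logic above depends on per-task bookkeeping inside the pending checkpoint. Below is a minimal, self-contained sketch of that bookkeeping under simplified assumptions. PendingCheckpointSketch and AckResult are hypothetical names, not Flink classes. Task IDs are plain strings rather than ExecutionAttemptIDs, and no state handles or metrics are stored. The sketch classifies each ack as SUCCESS, DUPLICATE, UNKNOWN or DISCARDED, which roughly matches the outcomes the coordinator switches on. The DISCARDED branch is the one visible above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simplification of the ack outcomes handled in receiveAcknowledgeMessage.
enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

// Hypothetical, simplified model of a pending checkpoint (not Flink's PendingCheckpoint).
class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged = new HashSet<>();
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    PendingCheckpointSketch(long checkpointId, String... tasksToWaitFor) {
        this.checkpointId = checkpointId;
        for (String task : tasksToWaitFor) {
            notYetAcknowledged.add(task);
        }
    }

    long getCheckpointId() { return checkpointId; }

    boolean isDiscarded() { return discarded; }

    // Mark the checkpoint as discarded (e.g. after a decline or timeout).
    void abort() { discarded = true; }

    // Classify an incoming ack from the given task.
    AckResult acknowledgeTask(String taskId) {
        if (discarded) {
            return AckResult.DISCARDED;
        }
        if (notYetAcknowledged.remove(taskId)) {
            acknowledged.add(taskId);
            return AckResult.SUCCESS;
        }
        // Not awaited: either it already acked, or it was never part of this checkpoint.
        return acknowledged.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    // The coordinator completes the checkpoint once every awaited task has acked.
    boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledged.isEmpty();
    }
}
```

In this model, the coordinator would call completePendingCheckpoint once acknowledgeTask has returned SUCCESS and isFullyAcknowledged() is true.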

      The code in completePendingCheckpoint that sends the NotifyCheckpointComplete message is as follows:

        for (ExecutionVertex ev : tasksToCommitTo) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ee.notifyCheckpointComplete(checkpointId, timestamp);
            }
        }
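The loop above skips any vertex that currently has no execution attempt. Here is a small sketch of that fan-out. It assumes hypothetical SketchVertex and SketchAttempt types in place of ExecutionVertex and Execution. Instead of sending a message, each attempt just records the checkpoint ID it was notified about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Execution: records the checkpoint IDs it is notified about.
class SketchAttempt {
    final List<Long> completedCheckpoints = new ArrayList<>();

    void notifyCheckpointComplete(long checkpointId, long timestamp) {
        completedCheckpoints.add(checkpointId);
    }
}

// Hypothetical stand-in for ExecutionVertex; the current attempt may be null.
class SketchVertex {
    private final SketchAttempt current;

    SketchVertex(SketchAttempt current) { this.current = current; }

    SketchAttempt getCurrentExecutionAttempt() { return current; }
}

class CommitNotifier {
    // Notify every vertex that has a current attempt; returns how many were notified.
    static int notifyComplete(List<SketchVertex> tasksToCommitTo, long checkpointId, long timestamp) {
        int notified = 0;
        for (SketchVertex ev : tasksToCommitTo) {
            SketchAttempt ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ee.notifyCheckpointComplete(checkpointId, timestamp);
                notified++;
            }
        }
        return notified;
    }
}
```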

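    Receiving the message

      The message is handled by the same code that receives the TriggerCheckpoint message. The relevant branch mainly calls the notifyCheckpointComplete method: task.notifyCheckpointComplete(checkpointId).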

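  AcknowledgeCheckpoint message

    Sent by the TaskManager to the JobManager to report that the checkpoint of a given task has completed. The message may carry the task's state and its checkpointMetrics.

    The AcknowledgeCheckpoint message class declares these two fields:

	private final SubtaskState subtaskState; // the task's state
	private final CheckpointMetrics checkpointMetrics;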

    Sending the message

      The message is sent from the acknowledgeCheckpoint method of the RuntimeEnvironment class:

        public void acknowledgeCheckpoint(
                long checkpointId,
                CheckpointMetrics checkpointMetrics,
                SubtaskState checkpointStateHandles) {
            // send the ack through checkpointResponder, an instance of the CheckpointResponder interface
            checkpointResponder.acknowledgeCheckpoint(
                    jobId, executionId, checkpointId, checkpointMetrics,
                    checkpointStateHandles);
        }

      The CheckpointResponder interface is responsible for replying with checkpoint acknowledge and decline messages. ActorGatewayCheckpointResponder is the implementation of CheckpointResponder that uses an ActorGateway. It provides two methods, acknowledgeCheckpoint and declineCheckpoint:

        @Override
        public void acknowledgeCheckpoint(
                JobID jobID,
                ExecutionAttemptID executionAttemptID,
                long checkpointId,
                CheckpointMetrics checkpointMetrics,
                SubtaskState checkpointStateHandles) {
            // create a new AcknowledgeCheckpoint message
            AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
                    jobID, executionAttemptID, checkpointId, checkpointMetrics,
                    checkpointStateHandles);
            // send it through the actorGateway
            actorGateway.tell(message);
        }
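RuntimeEnvironment depends only on the CheckpointResponder interface, so the transport behind it can be swapped out. The sketch below illustrates that design with a hypothetical, simplified interface called SimpleCheckpointResponder. It uses String IDs and carries no state handles or metrics. The in-memory implementation records replies instead of calling actorGateway.tell(message). None of these types exist in Flink. The decline reason is converted to a string the same way receiveDeclineMessage does it, with an empty string when no reason is given.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified version of the responder contract described above.
interface SimpleCheckpointResponder {
    void acknowledgeCheckpoint(String jobId, String executionId, long checkpointId);

    void declineCheckpoint(String jobId, String executionId, long checkpointId, Throwable reason);
}

// Immutable reply value, similar in spirit to AcknowledgeCheckpoint / DeclineCheckpoint.
final class CheckpointReply {
    final String jobId;
    final String executionId;
    final long checkpointId;
    final boolean acknowledged;
    final String reason; // null for acknowledgements

    CheckpointReply(String jobId, String executionId, long checkpointId,
                    boolean acknowledged, String reason) {
        this.jobId = jobId;
        this.executionId = executionId;
        this.checkpointId = checkpointId;
        this.acknowledged = acknowledged;
        this.reason = reason;
    }
}

// In-memory responder: instead of actorGateway.tell(message), append the reply to a list.
class RecordingCheckpointResponder implements SimpleCheckpointResponder {
    final List<CheckpointReply> sent = new ArrayList<>();

    @Override
    public void acknowledgeCheckpoint(String jobId, String executionId, long checkpointId) {
        sent.add(new CheckpointReply(jobId, executionId, checkpointId, true, null));
    }

    @Override
    public void declineCheckpoint(String jobId, String executionId, long checkpointId, Throwable reason) {
        String message = reason != null ? reason.getMessage() : "";
        sent.add(new CheckpointReply(jobId, executionId, checkpointId, false, message));
    }
}
```

A recording responder like this makes it easy to assert in a unit test which acks and declines a task produced.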

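    Receiving the message

      The message is received by the receiveAcknowledgeMessage method. This is the same method from which the NotifyCheckpointComplete message is sent, via completePendingCheckpoint.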

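  DeclineCheckpoint message

    This message is sent by the TaskManager to the JobManager to tell the CheckpointCoordinator that a checkpoint request could not be processed. This typically happens when a task is already in the RUNNING state but is internally not yet ready to take a checkpoint.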

    Sending the message

      The message is sent from the triggerCheckpointBarrier method of the Task class:

org.apache.flink.runtime.taskmanager.Task
        try {
            boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
            if (!success) {
                // decline through the CheckpointResponder, just like sending an AcknowledgeCheckpoint message
                checkpointResponder.declineCheckpoint(
                        getJobID(), getExecutionId(), checkpointID,
                        new CheckpointDeclineTaskNotReadyException(taskName));
            }
        }
        // (the rest of the try statement is omitted in this excerpt)
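In short, the task first tries to trigger the checkpoint. If the task reports that it is not ready, it answers with a decline instead. Below is a minimal sketch of that decision. It assumes a hypothetical ReadinessAwareTask whose triggerCheckpoint returns false until its operators are initialized. The CheckpointResponder is reduced to a BiConsumer callback, and IllegalStateException stands in for CheckpointDeclineTaskNotReadyException. In the real flow the acknowledgement is sent later, after the snapshot completes, so the success path does nothing here.

```java
import java.util.function.BiConsumer;

// Hypothetical task: reports "not ready" until its operators are initialized.
class ReadinessAwareTask {
    private final String taskName;
    private volatile boolean operatorsInitialized;

    ReadinessAwareTask(String taskName) { this.taskName = taskName; }

    void markInitialized() { operatorsInitialized = true; }

    String getTaskName() { return taskName; }

    // Returns false if the checkpoint cannot be taken yet.
    boolean triggerCheckpoint(long checkpointId) {
        if (!operatorsInitialized) {
            return false;
        }
        // A real task would emit barriers and snapshot state here, acknowledging later.
        return true;
    }
}

class BarrierTrigger {
    // Same shape as triggerCheckpointBarrier above: on failure, decline with a reason.
    static void triggerCheckpointBarrier(ReadinessAwareTask task, long checkpointId,
                                         BiConsumer<Long, Exception> declineCheckpoint) {
        boolean success = task.triggerCheckpoint(checkpointId);
        if (!success) {
            declineCheckpoint.accept(checkpointId,
                    new IllegalStateException("Task " + task.getTaskName() + " was not ready"));
        }
    }
}
```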

    Receiving the message

      The message is handled by the receiveDeclineMessage method of CheckpointCoordinator:

        public void receiveDeclineMessage(DeclineCheckpoint message) {
            if (shutdown || message == null) {
                return;
            }
            if (!job.equals(message.getJob())) {
                throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
                    message.getJob() + " while this coordinator handles job " + job);
            }

            final long checkpointId = message.getCheckpointId();
            final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");

            PendingCheckpoint checkpoint;

            synchronized (lock) {
                // we need to check inside the lock for being shutdown as well, otherwise we
                // get races and invalid error log messages
                if (shutdown) {
                    return;
                }

                checkpoint = pendingCheckpoints.get(checkpointId);

                if (checkpoint != null && !checkpoint.isDiscarded()) {
                    // the checkpoint is still pending and has not been discarded
                    LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}",
                            checkpointId, message.getTaskExecutionId(), reason);

                    pendingCheckpoints.remove(checkpointId); // remove it from pendingCheckpoints
                    checkpoint.abortDeclined();
                    rememberRecentCheckpointId(checkpointId);

                    // we don't have to schedule another "dissolving" checkpoint any more because the
                    // cancellation barriers take care of breaking downstream alignments
                    // we only need to make sure that suspended queued requests are resumed

                    // is any other pending checkpoint at least as recent as this one?
                    boolean haveMoreRecentPending = false;
                    for (PendingCheckpoint p : pendingCheckpoints.values()) {
                        if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) {
                            haveMoreRecentPending = true;
                            break;
                        }
                    }
                    // if not, resume the trigger requests that were queued
                    if (!haveMoreRecentPending) {
                        triggerQueuedRequests();
                    }
                }
                else if (checkpoint != null) {
                    // this should not happen
                    throw new IllegalStateException(
                            "Received message for discarded but non-removed checkpoint " + checkpointId);
                }
                else if (LOG.isDebugEnabled()) {
                    if (recentPendingCheckpoints.contains(checkpointId)) {
                        // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                        LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
                                checkpointId, reason);
                    } else {
                        // message is for an unknown checkpoint. might be so old that we don't even remember it any more
                        LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
                                checkpointId, reason);
                    }
                }
            }
        }
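receiveDeclineMessage does three things:

1. It drops the pending checkpoint.
2. It remembers the checkpoint's ID, so that later messages for it are reported as late rather than unknown.
3. It resumes queued trigger requests only when no newer pending checkpoint is still in flight.

The sketch below models just that bookkeeping in a single thread. DeclineBookkeeping, its Outcome enum, and the limit of 16 remembered IDs are assumptions made for illustration. Pending checkpoints are reduced to their IDs, and a counter stands in for triggerQueuedRequests().

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, single-threaded model of the decline bookkeeping (not Flink's CheckpointCoordinator).
class DeclineBookkeeping {
    enum Outcome { ABORTED, LATE, UNKNOWN }

    // Assumed bound on how many recently aborted checkpoint IDs are remembered.
    private static final int MAX_REMEMBERED_IDS = 16;

    private final Set<Long> pendingCheckpoints = new HashSet<>();
    private final ArrayDeque<Long> recentPendingCheckpoints = new ArrayDeque<>();
    private int queuedTriggersResumed; // counts stand-in calls for triggerQueuedRequests()

    void addPending(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
    }

    int getQueuedTriggersResumed() {
        return queuedTriggersResumed;
    }

    // Keep a bounded FIFO of recently aborted checkpoint IDs.
    private void rememberRecentCheckpointId(long checkpointId) {
        if (recentPendingCheckpoints.size() >= MAX_REMEMBERED_IDS) {
            recentPendingCheckpoints.removeFirst();
        }
        recentPendingCheckpoints.addLast(checkpointId);
    }

    Outcome receiveDecline(long checkpointId) {
        if (pendingCheckpoints.remove(checkpointId)) {
            rememberRecentCheckpointId(checkpointId);

            // Resume queued trigger requests only if no newer checkpoint is still pending.
            boolean haveMoreRecentPending = false;
            for (long other : pendingCheckpoints) {
                if (other >= checkpointId) {
                    haveMoreRecentPending = true;
                    break;
                }
            }
            if (!haveMoreRecentPending) {
                queuedTriggersResumed++;
            }
            return Outcome.ABORTED;
        }
        // Not pending: a late message for a recently aborted checkpoint, or an unknown one.
        return recentPendingCheckpoints.contains(checkpointId) ? Outcome.LATE : Outcome.UNKNOWN;
    }
}
```

The LATE and UNKNOWN outcomes correspond to the two debug-log branches at the end of receiveDeclineMessage.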

Original article: https://www.cnblogs.com/zaizhoumo/p/9236491.html

Date: 2024-11-02 18:42:03
