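Checkpoint Trigger Mechanism
Flink triggers checkpoints periodically through a timer. The key class for triggering is CheckpointCoordinator, the checkpoint coordinator.
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
CheckpointCoordinator coordinates the distributed snapshots of operators and state. It completes a checkpoint by sending trigger messages to the relevant tasks and collecting acknowledgement (ack) messages from them. It also collects and maintains the state handles (state references) reported by the tasks.
Main fields of CheckpointCoordinator: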
```java
/** Coordinator-wide lock to safeguard the checkpoint updates */
private final Object lock = new Object(); // coordinator-wide lock

/** Lock specially to make sure that trigger requests do not overtake each other.
 * This is not done with the coordinator-wide lock, because as part of triggering,
 * blocking operations may happen (distributed atomic counters).
 * Using a dedicated lock, we avoid blocking the processing of 'acknowledge/decline'
 * messages during that phase. */
private final Object triggerLock = new Object(); // dedicated lock for trigger requests; avoids blocking message processing while a checkpoint ID is obtained

/** Tasks who need to be sent a message when a checkpoint is started */
private final ExecutionVertex[] tasksToTrigger;

/** Tasks who need to acknowledge a checkpoint before it succeeds */
private final ExecutionVertex[] tasksToWaitFor;

/** Tasks who need to be sent a message when a checkpoint is confirmed */
private final ExecutionVertex[] tasksToCommitTo;

/** Map from checkpoint ID to the pending checkpoint */
private final Map<Long, PendingCheckpoint> pendingCheckpoints; // checkpoints still in progress

/** Actor that receives status updates from the execution graph this coordinator works for */
private JobStatusListener jobStatusListener; // listens for job status changes and starts/stops the periodic trigger accordingly

/** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
 * accessed in synchronized scope */
private boolean triggerRequestQueued; // whether a trigger request could not be handled immediately

/** Flag marking the coordinator as shut down (not accepting any messages any more) */
private volatile boolean shutdown; // shutdown flag of the coordinator
```
ScheduledTrigger
ScheduledTrigger is the timer task for checkpoints; its run method simply calls triggerCheckpoint and logs any exception.
```java
private final class ScheduledTrigger implements Runnable {
    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint.", e);
        }
    }
}
```
Now let's look at how triggerCheckpoint is implemented:
```java
// Triggers a new standard checkpoint. timestamp is the time at which the checkpoint is triggered;
// isPeriodic indicates whether this is a periodic trigger.
public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
    return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
}
```
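The core logic for triggering a checkpoint:
First, run pre-checks to decide whether a checkpoint may be triggered at all;
Then obtain a checkpoint ID and create a PendingCheckpoint instance;
Next, re-check the trigger conditions to guard against race conditions;
Finally, add the PendingCheckpoint to pendingCheckpoints and send messages to the tasks to trigger their checkpoints.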
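Before the full listing, the four steps can be condensed into a toy model. This is not Flink code: SimpleCoordinator, its Result enum and the AtomicLong ID counter are hypothetical stand-ins, and the only conditions modelled are the shutdown flag and the concurrency limit. What the sketch is meant to show is the locking pattern: an eager check under the coordinator-wide lock, ID allocation under a separate trigger lock, and a second check once the coordinator-wide lock is re-acquired.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical toy model of the trigger steps; not Flink's CheckpointCoordinator.
class SimpleCoordinator {
    enum Result { SUCCESS, SHUT_DOWN, TOO_MANY_CONCURRENT }

    private final Object lock = new Object();               // coordinator-wide lock
    private final Object triggerLock = new Object();        // keeps trigger requests in order
    private final AtomicLong idCounter = new AtomicLong(1); // stands in for checkpointIdCounter
    private final Map<Long, String> pending = new HashMap<>();
    private final int maxConcurrent;
    private volatile boolean shutdown;

    SimpleCoordinator(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    // Caller must hold `lock`.
    private Result checkConditions() {
        if (shutdown) {
            return Result.SHUT_DOWN;
        }
        if (pending.size() >= maxConcurrent) {
            return Result.TOO_MANY_CONCURRENT;
        }
        return Result.SUCCESS;
    }

    Result trigger() {
        // Step 1: eager pre-check under the coordinator-wide lock
        synchronized (lock) {
            Result r = checkConditions();
            if (r != Result.SUCCESS) {
                return r;
            }
        }
        synchronized (triggerLock) {
            // Step 2: allocate an ID without holding `lock` (in Flink this may block in HA mode)
            long id = idCounter.getAndIncrement();
            synchronized (lock) {
                // Step 3: re-check, because `lock` was released in between
                Result r = checkConditions();
                if (r != Result.SUCCESS) {
                    return r; // the allocated ID is simply not used
                }
                // Step 4: register the pending checkpoint (messages to tasks would follow)
                pending.put(id, "pending checkpoint " + id);
            }
            return Result.SUCCESS;
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pending.size();
        }
    }

    void shutdown() {
        shutdown = true;
    }

    public static void main(String[] args) {
        SimpleCoordinator c = new SimpleCoordinator(1);
        System.out.println(c.trigger()); // SUCCESS
        System.out.println(c.trigger()); // TOO_MANY_CONCURRENT
        c.shutdown();
        System.out.println(c.trigger()); // SHUT_DOWN
    }
}
```

As in the real listing, an ID obtained before a failed re-check is discarded, so checkpoint IDs may contain gaps; the re-check is what prevents a shutdown or a concurrent registration that happened while `lock` was released from being ignored.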
```java
CheckpointTriggerResult triggerCheckpoint(
        long timestamp,
        CheckpointProperties props,
        String targetDirectory,
        boolean isPeriodic) {

    // Sanity check: an externalized checkpoint must have a target directory
    if (props.externalizeCheckpoint() && targetDirectory == null) {
        throw new IllegalStateException("No target directory specified to persist checkpoint to.");
    }

    // make some eager pre-checks
    synchronized (lock) {
        // abort if the coordinator has been shutdown in the meantime
        if (shutdown) {
            return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
        }

        // Don't allow periodic checkpoint if scheduling has been disabled
        if (isPeriodic && !periodicScheduling) {
            return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
        }

        // validate whether the checkpoint can be triggered, with respect to the limit of
        // concurrent checkpoints, and the minimum time between checkpoints.
        // these checks are not relevant for savepoints
        // (a forced checkpoint bypasses both the concurrency limit and the minimum pause)
        if (!props.forceCheckpoint()) {
            // sanity check: there should never be more than one trigger request queued
            if (triggerRequestQueued) {
                // a request is already queued, so decline this one right away
                LOG.warn("Trying to trigger another checkpoint while one was queued already");
                return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
            }

            // if too many checkpoints are currently in progress, we need to mark that a request is queued
            if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                triggerRequestQueued = true;
                // stop the periodic trigger while a request is queued
                if (currentPeriodicTrigger != null) {
                    currentPeriodicTrigger.cancel(false);
                    currentPeriodicTrigger = null;
                }
                return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
            }

            // make sure the minimum interval between checkpoints has passed
            final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
            final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

            if (durationTillNextMillis > 0) {
                if (currentPeriodicTrigger != null) {
                    currentPeriodicTrigger.cancel(false);
                    currentPeriodicTrigger = null;
                }
                // Reassign the new trigger to the currentPeriodicTrigger,
                // with durationTillNextMillis as the initial delay
                currentPeriodicTrigger = timer.scheduleAtFixedRate(
                        new ScheduledTrigger(),
                        durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
            }
        }
    }

    // check if all tasks that we need to trigger are running.
    // if not, abort the checkpoint
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee != null && ee.getState() == ExecutionState.RUNNING) {
            executions[i] = ee;
        } else {
            LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex());
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // next, check if all tasks that need to acknowledge the checkpoint are running.
    // if not, abort the checkpoint
    // (this loop only checks that each vertex has a current execution attempt)
    Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

    for (ExecutionVertex ev : tasksToWaitFor) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ackTasks.put(ee.getAttemptId(), ev);
        } else {
            LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
                    ev.getTaskNameWithSubtaskIndex());
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // we will actually trigger this checkpoint!

    // we lock with a special lock to make sure that trigger requests do not overtake each other.
    // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
    // may issue blocking operations. Using a different lock than the coordinator-wide lock,
    // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
    synchronized (triggerLock) {
        final long checkpointID;
        // first, obtain a checkpoint ID
        try {
            // this must happen outside the coordinator-wide lock, because it communicates
            // with external services (in HA mode) and may block for a while.
            checkpointID = checkpointIdCounter.getAndIncrement();
        }
        catch (Throwable t) {
            int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
            LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
            return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
        }

        // create the PendingCheckpoint that represents the in-flight checkpoint
        final PendingCheckpoint checkpoint = new PendingCheckpoint(
                job,
                checkpointID,
                timestamp,
                ackTasks,
                props,
                targetDirectory,
                executor);

        if (statsTracker != null) {
            PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                    checkpointID,
                    timestamp,
                    props);

            checkpoint.setStatsCallback(callback);
        }

        // schedule the timer that will clean up the expired checkpoints
        // (this canceller releases the checkpoint's resources if it times out)
        final Runnable canceller = new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    // only do the work if the checkpoint is not discarded anyways
                    // note that checkpoint completion discards the pending checkpoint object
                    if (!checkpoint.isDiscarded()) {
                        LOG.info("Checkpoint " + checkpointID + " expired before completing.");

                        checkpoint.abortExpired();
                        pendingCheckpoints.remove(checkpointID);
                        rememberRecentCheckpointId(checkpointID);

                        triggerQueuedRequests();
                    }
                }
            }
        };

        try {
            // re-acquire the coordinator-wide lock
            synchronized (lock) {
                // since we released the lock in the meantime, we need to re-check
                // that the conditions still hold.
                // (the checkpoint ID above was obtained outside this lock, so the state may have changed)
                if (shutdown) {
                    return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                }
                else if (!props.forceCheckpoint()) {
                    if (triggerRequestQueued) {
                        LOG.warn("Trying to trigger another checkpoint while one was queued already");
                        return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                    }

                    if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                        triggerRequestQueued = true;
                        if (currentPeriodicTrigger != null) {
                            currentPeriodicTrigger.cancel(false);
                            currentPeriodicTrigger = null;
                        }
                        return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                    }

                    // make sure the minimum interval between checkpoints has passed
                    final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                    final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                    if (durationTillNextMillis > 0) {
                        if (currentPeriodicTrigger != null) {
                            currentPeriodicTrigger.cancel(false);
                            currentPeriodicTrigger = null;
                        }

                        // Reassign the new trigger to the currentPeriodicTrigger
                        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                new ScheduledTrigger(),
                                durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                        return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                    }
                }

                LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);

                // register the checkpoint in pendingCheckpoints
                pendingCheckpoints.put(checkpointID, checkpoint);

                // schedule the timeout canceller to run after checkpointTimeout
                ScheduledFuture<?> cancellerHandle = timer.schedule(
                        canceller,
                        checkpointTimeout, TimeUnit.MILLISECONDS);

                if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                    // checkpoint is already disposed!
                    cancellerHandle.cancel(false);
                }

                // trigger the master hooks for the checkpoint
                final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
                        checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
                for (MasterState s : masterStates) {
                    checkpoint.addMasterState(s);
                }
            }
            // end of lock scope

            CheckpointOptions checkpointOptions;
            if (!props.isSavepoint()) {
                checkpointOptions = CheckpointOptions.forFullCheckpoint();
            } else {
                checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
            }

            // send the messages to the tasks that trigger their checkpoint
            for (Execution execution: executions) {
                execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
            }

            numUnsuccessfulCheckpointsTriggers.set(0);
            return new CheckpointTriggerResult(checkpoint);
        }
        catch (Throwable t) {
            // guard the map against concurrent modifications
            synchronized (lock) {
                pendingCheckpoints.remove(checkpointID);
            }

            int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
            LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)",
                    checkpointID, numUnsuccessful, t);

            if (!checkpoint.isDiscarded()) {
                checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
            }
            return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
        }

    } // end trigger lock
}
```
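The minimum-pause check in the listing is plain arithmetic on System.nanoTime() values. A minimal sketch, assuming a hypothetical helper MinPause.durationTillNextMillis (not part of Flink), isolates it so it can be checked with fixed inputs instead of the real clock: with a 500 ms minimum pause, a trigger 200 ms after the last completion is 300 ms too early, while one 600 ms after is allowed.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper that isolates the minimum-pause arithmetic from triggerCheckpoint.
final class MinPause {
    private MinPause() {
    }

    /**
     * Milliseconds that must still elapse before the next checkpoint may start;
     * a value <= 0 means the minimum pause has already passed.
     */
    static long durationTillNextMillis(long lastCompletionNanos, long minPauseNanos, long nowNanos) {
        final long earliestNext = lastCompletionNanos + minPauseNanos;
        return (earliestNext - nowNanos) / 1_000_000;
    }

    public static void main(String[] args) {
        long minPause = TimeUnit.MILLISECONDS.toNanos(500);
        long lastCompletion = 0L;
        // 200 ms after the last completion: still 300 ms to wait, so the trigger is declined
        System.out.println(durationTillNextMillis(lastCompletion, minPause, TimeUnit.MILLISECONDS.toNanos(200))); // 300
        // 600 ms after the last completion: the pause has elapsed
        System.out.println(durationTillNextMillis(lastCompletion, minPause, TimeUnit.MILLISECONDS.toNanos(600))); // -100
    }
}
```

When the result is positive, the listing declines the trigger and reschedules the periodic task with that value as its initial delay, so the next attempt lands roughly when the pause expires. Because of the integer division, a remaining wait of less than one millisecond truncates to 0 and the trigger is allowed to proceed.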
Starting the periodic task: startCheckpointScheduler
```java
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }
        // make sure all previous timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        // scheduleAtFixedRate runs the task periodically: first after the initial delay,
        // then once per period (here both are baseInterval)
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                new ScheduledTrigger(),
                baseInterval, baseInterval, TimeUnit.MILLISECONDS);
    }
}
```
Stopping the periodic task: stopCheckpointScheduler
```java
// Resets the flags and releases resources
public void stopCheckpointScheduler() {
    synchronized (lock) {
        triggerRequestQueued = false;
        periodicScheduling = false;

        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false); // cancel the current periodic trigger task
            currentPeriodicTrigger = null;
        }

        // pendingCheckpoints holds the checkpoints that have not completed yet; abort them all
        for (PendingCheckpoint p : pendingCheckpoints.values()) {
            p.abortError(new Exception("Checkpoint Coordinator is suspending."));
        }
        pendingCheckpoints.clear(); // clear pendingCheckpoints
        numUnsuccessfulCheckpointsTriggers.set(0);
    }
}
```
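The start/stop pair follows a common ScheduledExecutorService pattern: keep the ScheduledFuture of the periodic task, cancel it with cancel(false) so that a trigger already in progress is not interrupted, and schedule a fresh one on restart. The sketch below is a hypothetical reduction (PeriodicTriggerScheduler and its methods are illustrative names, not Flink's). It also copies the try/catch from ScheduledTrigger, which matters because, per the scheduleAtFixedRate contract, an exception escaping one run suppresses all subsequent runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the start/stop pattern; names are illustrative, not Flink's.
class PeriodicTriggerScheduler {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long baseIntervalMillis;
    private final Runnable trigger;
    private ScheduledFuture<?> currentPeriodicTrigger;
    private boolean periodicScheduling;

    PeriodicTriggerScheduler(long baseIntervalMillis, Runnable trigger) {
        this.baseIntervalMillis = baseIntervalMillis;
        this.trigger = trigger;
    }

    void start() {
        synchronized (lock) {
            stop(); // cancel any previous timer first (monitors are re-entrant, so this is safe)
            periodicScheduling = true;
            currentPeriodicTrigger = timer.scheduleAtFixedRate(() -> {
                try {
                    trigger.run();
                } catch (Exception e) {
                    // an exception escaping here would suppress all later runs of a fixed-rate task
                    System.err.println("Exception while triggering checkpoint: " + e);
                }
            }, baseIntervalMillis, baseIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        synchronized (lock) {
            periodicScheduling = false;
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false); // do not interrupt a trigger that is running
                currentPeriodicTrigger = null;
            }
        }
    }

    boolean isScheduling() {
        synchronized (lock) {
            return periodicScheduling;
        }
    }

    void close() {
        stop();
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger fired = new AtomicInteger();
        PeriodicTriggerScheduler s = new PeriodicTriggerScheduler(20, fired::incrementAndGet);
        s.start();
        Thread.sleep(150);
        s.close();
        System.out.println("triggered " + fired.get() + " times");
    }
}
```

The exact count printed by main depends on timing, which is why it is not shown here.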
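Actor-based, message-driven coordination
How are the periodic tasks started and stopped? Flink uses a message-driven mechanism based on Akka's actor model.
The CheckpointCoordinatorDeActivator class
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator
CheckpointCoordinatorDeActivator listens for JobStatus changes and starts or stops the periodic checkpoint scheduling accordingly. Although the coordinator's field comment calls it an actor, it is a plain implementation of the JobStatusListener interface that reacts to status-change callbacks.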
```java
// Listens for JobStatus changes and activates/deactivates the periodic checkpoint scheduler.
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            // (as soon as the job becomes RUNNING, the periodic task starts)
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
}
```
The CheckpointCoordinatorDeActivator instance is created inside CheckpointCoordinator by the createActivatorDeactivator method:
```java
public JobStatusListener createActivatorDeactivator() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        if (jobStatusListener == null) {
            jobStatusListener = new CheckpointCoordinatorDeActivator(this);
        }

        return jobStatusListener;
    }
}
```
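The activator/deactivator is an instance of the observer pattern: the coordinator lazily creates a single listener and hands out the same instance on every call, and the job's status transitions switch the scheduler on or off. The sketch below uses hypothetical stand-ins (ToyCoordinator, JobState, StatusListener) rather than Flink's JobStatus and JobStatusListener, and reduces the scheduler to a boolean flag.

```java
// Hypothetical sketch of the listener wiring; not Flink's classes.
class ToyCoordinator {
    private final Object lock = new Object();
    private StatusListener listener;
    private boolean schedulerRunning;

    void startScheduler() {
        synchronized (lock) {
            schedulerRunning = true;
        }
    }

    void stopScheduler() {
        synchronized (lock) {
            schedulerRunning = false;
        }
    }

    boolean isSchedulerRunning() {
        synchronized (lock) {
            return schedulerRunning;
        }
    }

    // Lazily create one listener and return the same instance on every call.
    StatusListener createActivatorDeactivator() {
        synchronized (lock) {
            if (listener == null) {
                listener = newState -> {
                    if (newState == JobState.RUNNING) {
                        startScheduler(); // RUNNING: begin periodic checkpoints
                    } else {
                        stopScheduler();  // any other state: stop for now
                    }
                };
            }
            return listener;
        }
    }

    public static void main(String[] args) {
        ToyCoordinator c = new ToyCoordinator();
        StatusListener l = c.createActivatorDeactivator();
        l.statusChanged(JobState.RUNNING);
        System.out.println(c.isSchedulerRunning()); // true
        l.statusChanged(JobState.FAILING);
        System.out.println(c.isSchedulerRunning()); // false
    }
}

// Hypothetical stand-in for JobStatus.
enum JobState { CREATED, RUNNING, FAILING, CANCELED, FINISHED }

// Hypothetical stand-in for JobStatusListener.
interface StatusListener {
    void statusChanged(JobState newState);
}
```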
Checkpoint-related Akka messages
AbstractCheckpointMessage: the abstract base class of all checkpoint messages
org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
Main fields of AbstractCheckpointMessage:
```java
/** The job to which this message belongs */
private final JobID job;
/** The task execution that is source/target of the checkpoint message */
private final ExecutionAttemptID taskExecutionId;
/** The ID of the checkpoint that this message coordinates */
private final long checkpointId;
```
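It has the following subclasses:
TriggerCheckpoint: sent by the JobManager to a TaskManager to trigger a checkpoint;
AcknowledgeCheckpoint: sent by a TaskManager to the JobManager to confirm that one individual task has completed its checkpoint;
DeclineCheckpoint: sent by a TaskManager to the JobManager to report that the checkpoint request could not be handled (yet);
NotifyCheckpointComplete: sent by the JobManager to a TaskManager to announce that the checkpoint has completed.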
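The hierarchy can be pictured as an abstract base holding the three shared fields, with one subclass per message type. The sketch is a hypothetical, trimmed-down model: identifiers are plain strings rather than JobID/ExecutionAttemptID, the class names carry a Msg suffix to make clear they are not Flink's, and fromJobManager() only encodes the directions listed above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, trimmed-down model of the checkpoint message hierarchy; not Flink's classes.
class CheckpointMsgDemo {
    public static void main(String[] args) {
        List<CheckpointMsgBase> msgs = Arrays.asList(
                new TriggerCheckpointMsg("job-1", "attempt-1", 7),
                new AcknowledgeCheckpointMsg("job-1", "attempt-1", 7),
                new DeclineCheckpointMsg("job-1", "attempt-2", 7, "task not ready"),
                new NotifyCheckpointCompleteMsg("job-1", "attempt-1", 7));
        for (CheckpointMsgBase m : msgs) {
            String dir = m.fromJobManager() ? "JobManager -> TaskManager" : "TaskManager -> JobManager";
            System.out.println(m.getClass().getSimpleName() + " #" + m.getCheckpointId() + ": " + dir);
        }
    }
}

abstract class CheckpointMsgBase {
    private final String job;             // stands in for JobID
    private final String taskExecutionId; // stands in for ExecutionAttemptID
    private final long checkpointId;

    CheckpointMsgBase(String job, String taskExecutionId, long checkpointId) {
        this.job = job;
        this.taskExecutionId = taskExecutionId;
        this.checkpointId = checkpointId;
    }

    String getJob() { return job; }
    String getTaskExecutionId() { return taskExecutionId; }
    long getCheckpointId() { return checkpointId; }

    /** True if the message travels from the JobManager to a TaskManager. */
    abstract boolean fromJobManager();
}

final class TriggerCheckpointMsg extends CheckpointMsgBase {
    TriggerCheckpointMsg(String job, String task, long id) { super(job, task, id); }
    @Override boolean fromJobManager() { return true; }
}

final class AcknowledgeCheckpointMsg extends CheckpointMsgBase {
    AcknowledgeCheckpointMsg(String job, String task, long id) { super(job, task, id); }
    @Override boolean fromJobManager() { return false; }
}

final class DeclineCheckpointMsg extends CheckpointMsgBase {
    private final String reason;

    DeclineCheckpointMsg(String job, String task, long id, String reason) {
        super(job, task, id);
        this.reason = reason;
    }

    String getReason() { return reason; }
    @Override boolean fromJobManager() { return false; }
}

final class NotifyCheckpointCompleteMsg extends CheckpointMsgBase {
    NotifyCheckpointCompleteMsg(String job, String task, long id) { super(job, task, id); }
    @Override boolean fromJobManager() { return true; }
}
```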
The TriggerCheckpoint message
Sent from the JobManager to a TaskManager to tell a specific task to trigger a checkpoint.
Sending the message
The sending logic lives in CheckpointCoordinator and was already shown above:
```java
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
```
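Here executions is an Execution[] array holding the current execution attempts of the tasks that must be messaged when a checkpoint is triggered (that is, of the vertices in the CheckpointCoordinator field tasksToTrigger). triggerCheckpoint() is called on each of them.
Next, let's look at the triggerCheckpoint method of Execution: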
```java
// Triggers a new checkpoint on the task of this execution
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    // the slot (resource) assigned to this execution
    final SimpleSlot slot = assignedResource;

    if (slot != null) {
        // TaskManagerGateway is the abstraction for communicating with a TaskManager
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        // send the message to the TaskManager through taskManagerGateway
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
    }
}
```
Moving on to the triggerCheckpoint() method of ActorTaskManagerGateway (the actor-based implementation of TaskManagerGateway):
```java
public void triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        JobID jobId,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions) {

    Preconditions.checkNotNull(executionAttemptID);
    Preconditions.checkNotNull(jobId);
    // Create a TriggerCheckpoint message and send it with actorGateway.tell
    // (asynchronous fire-and-forget: no result is returned).
    // ActorGateway is the interface for actor-based communication.
    actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
}
```
AkkaActorGateway is an implementation of the ActorGateway interface that uses Akka to communicate with remote actors. Its tell method looks like this:
```java
@Override
public void tell(Object message) {
    Object newMessage = decorator.decorate(message);
    // send through the ActorRef `actor`; ActorRef is an Akka class
    // (how Akka delivers the message is left for a later article)
    actor.tell(newMessage, ActorRef.noSender());
}
```
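This completes the sending side of the TriggerCheckpoint message. Next, let's look at how the TaskManager receives it.
Receiving the message
The TaskManager's receiving side is implemented in Scala.
org.apache.flink.runtime.taskmanager.TaskManager
The handleMessage method of the TaskManager class is its central message handler.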
```scala
// The TaskManager's central message handler: receives messages and dispatches them by type.
override def handleMessage: Receive = {
  case message: TaskMessage => handleTaskMessage(message)

  // checkpoint-related messages are handled here
  case message: AbstractCheckpointMessage => handleCheckpointingMessage(message)

  case JobManagerLeaderAddress(address, newLeaderSessionID) =>
    handleJobManagerLeaderAddress(address, newLeaderSessionID)

  case message: RegistrationMessage => handleRegistrationMessage(message)

  // ...
}
```
Next comes handleCheckpointingMessage(), whose main job here is to trigger the checkpoint barrier:
```scala
// Handles checkpoint-related messages
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {

  actorMessage match {
    // trigger a checkpoint
    case message: TriggerCheckpoint =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp
      val checkpointOptions = message.getCheckpointOptions

      log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        // trigger the checkpoint barrier on the task (the barrier mechanism is covered in a later article)
        task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
      } else {
        log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
      }

    // checkpoint completion notice
    case message: NotifyCheckpointComplete =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp

      log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        // let the task handle the completion notice
        task.notifyCheckpointComplete(checkpointId)
      } else {
        log.debug(
          s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
      }

    // unknown checkpoint message
    case _ => unhandled(actorMessage)
  }
}
```
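For readers less familiar with Scala pattern matching, the same dispatch can be expressed in Java with instanceof checks. This is a hypothetical sketch: TaskSideDispatcher, FakeTask, TriggerMsg and CompleteMsg are invented for illustration, and runningTasks is a plain map standing in for the TaskManager's registry of running tasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Java rendering of the Scala dispatch in handleCheckpointingMessage.
class TaskSideDispatcher {
    // stands in for the TaskManager's map of running tasks, keyed by execution attempt ID
    final Map<String, FakeTask> runningTasks = new HashMap<>();

    /** Returns true if the message was delivered to a running task. */
    boolean handle(TaskCheckpointMsg msg) {
        if (!(msg instanceof TriggerMsg) && !(msg instanceof CompleteMsg)) {
            return false; // corresponds to unhandled(actorMessage)
        }
        FakeTask task = runningTasks.get(msg.taskExecutionId);
        if (task == null) {
            System.out.println("checkpoint message for unknown task " + msg.taskExecutionId);
            return false;
        }
        if (msg instanceof TriggerMsg) {
            TriggerMsg t = (TriggerMsg) msg;
            task.triggerCheckpointBarrier(t.checkpointId, t.timestamp);
        } else {
            task.notifyCheckpointComplete(msg.checkpointId);
        }
        return true;
    }

    public static void main(String[] args) {
        TaskSideDispatcher d = new TaskSideDispatcher();
        FakeTask task = new FakeTask();
        d.runningTasks.put("attempt-1", task);
        d.handle(new TriggerMsg("attempt-1", 7, 1_000L));
        d.handle(new CompleteMsg("attempt-1", 7));
        d.handle(new TriggerMsg("attempt-2", 8, 2_000L)); // unknown task: only logged
        System.out.println(task.log); // [barrier 7 @ 1000, complete 7]
    }
}

// Minimal stand-in for a running task: records the calls it receives.
class FakeTask {
    final List<String> log = new ArrayList<>();

    void triggerCheckpointBarrier(long checkpointId, long timestamp) {
        log.add("barrier " + checkpointId + " @ " + timestamp);
    }

    void notifyCheckpointComplete(long checkpointId) {
        log.add("complete " + checkpointId);
    }
}

abstract class TaskCheckpointMsg {
    final String taskExecutionId;
    final long checkpointId;

    TaskCheckpointMsg(String taskExecutionId, long checkpointId) {
        this.taskExecutionId = taskExecutionId;
        this.checkpointId = checkpointId;
    }
}

final class TriggerMsg extends TaskCheckpointMsg {
    final long timestamp;

    TriggerMsg(String taskExecutionId, long checkpointId, long timestamp) {
        super(taskExecutionId, checkpointId);
        this.timestamp = timestamp;
    }
}

final class CompleteMsg extends TaskCheckpointMsg {
    CompleteMsg(String taskExecutionId, long checkpointId) {
        super(taskExecutionId, checkpointId);
    }
}
```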
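The NotifyCheckpointComplete message
Sent from the JobManager to a TaskManager to tell a task that its checkpoint has been confirmed as complete, so the task can commit the checkpoint to third-party systems.
Sending the message
NotifyCheckpointComplete is sent from the receiveAcknowledgeMessage method of CheckpointCoordinator.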
```java
// Receives an AcknowledgeCheckpoint message and returns whether it belonged to a pending checkpoint
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
    if (shutdown || message == null) {
        return false;
    }
    if (!job.equals(message.getJob())) {
        LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
        return false;
    }

    final long checkpointId = message.getCheckpointId();

    synchronized (lock) {
        // we need to check inside the lock for being shutdown as well, otherwise we
        // get races and invalid error log messages
        if (shutdown) {
            return false;
        }

        final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

        // the checkpoint is pending and has not been discarded
        if (checkpoint != null && !checkpoint.isDiscarded()) {

            // acknowledge the task, identified by its execution ID, together with its subtask state
            switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                // acknowledged successfully
                case SUCCESS:
                    LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob());
                    // all tasks have acknowledged (notYetAcknowledgedTasks is empty)
                    if (checkpoint.isFullyAcknowledged()) {
                        // Try to complete the given pending checkpoint: this removes the completed
                        // checkpoint from the pending ones, updates some flags and finally sends
                        // the notify-complete messages
                        completePendingCheckpoint(checkpoint);
                    }
                    break;
                // duplicate acknowledgement
                case DUPLICATE:
                    LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
                    break;
                // the task's execution attempt is unknown to this checkpoint
                case UNKNOWN:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                            "because the task's execution attempt id was unknown. Discarding " +
                            "the state handle to avoid lingering state.", message.getCheckpointId(),
                            message.getTaskExecutionId(), message.getJob());

                    discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                    break;
                // the pending checkpoint had already been discarded
                case DISCARDED:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                            "because the pending checkpoint had been discarded. Discarding the " +
                            "state handle to avoid lingering state.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
                    discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
            }

            return true;
        }
        else if (checkpoint != null) {
            // this should not happen
            throw new IllegalStateException(
                    "Received message for discarded but non-removed checkpoint " + checkpointId);
        }
        else {
            boolean wasPendingCheckpoint;

            // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
            if (recentPendingCheckpoints.contains(checkpointId)) {
                wasPendingCheckpoint = true;
                LOG.warn("Received late message for now expired checkpoint attempt {} from " +
                        "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
            }
            else {
                LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob());
                wasPendingCheckpoint = false;
            }

            // try to discard the state so that we don't have lingering state lying around
            discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

            return wasPendingCheckpoint;
        }
    }
}
```
The part of completePendingCheckpoint that sends the NotifyCheckpointComplete messages:
```java
for (ExecutionVertex ev : tasksToCommitTo) {
    Execution ee = ev.getCurrentExecutionAttempt();
    if (ee != null) {
        ee.notifyCheckpointComplete(checkpointId, timestamp);
    }
}
```
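The source does not show PendingCheckpoint.acknowledgeTask itself, so the following is only a guess at its shape, based on the four outcomes handled in receiveAcknowledgeMessage and the comment that a checkpoint is fully acknowledged once notYetAcknowledgedTasks is empty. ToyPendingCheckpoint is hypothetical; task IDs are strings, and subtask state and metrics are omitted.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified PendingCheckpoint mirroring the four ack outcomes; not Flink's class.
class ToyPendingCheckpoint {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledgedTasks;
    private final Set<String> acknowledgedTasks = new HashSet<>();
    private boolean discarded;

    ToyPendingCheckpoint(Set<String> tasksToWaitFor) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToWaitFor);
    }

    AckResult acknowledgeTask(String executionAttemptId) {
        if (discarded) {
            return AckResult.DISCARDED;
        }
        if (notYetAcknowledgedTasks.remove(executionAttemptId)) {
            acknowledgedTasks.add(executionAttemptId);
            return AckResult.SUCCESS;
        }
        // either acknowledged once already, or an attempt this checkpoint never waited for
        return acknowledgedTasks.contains(executionAttemptId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledgedTasks.isEmpty();
    }

    void discard() {
        discarded = true;
    }

    public static void main(String[] args) {
        ToyPendingCheckpoint cp = new ToyPendingCheckpoint(Set.of("a", "b"));
        for (String id : new String[] {"a", "a", "x", "b"}) {
            AckResult r = cp.acknowledgeTask(id);
            System.out.println(id + " -> " + r);
            if (r == AckResult.SUCCESS && cp.isFullyAcknowledged()) {
                // here Flink would call completePendingCheckpoint and notify tasksToCommitTo
                System.out.println("fully acknowledged");
            }
        }
    }
}
```

Running main prints the outcomes SUCCESS, DUPLICATE, UNKNOWN and SUCCESS for "a", "a", "x" and "b", followed by "fully acknowledged" once the last expected task has reported.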
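Receiving the message
This is the NotifyCheckpointComplete branch of handleCheckpointingMessage, shown in the TriggerCheckpoint receiving section above; it calls task.notifyCheckpointComplete(checkpointId).
The AcknowledgeCheckpoint message
Sent by a TaskManager to the JobManager to report that the checkpoint of a given task has completed. The message may carry the task's state and its checkpointMetrics.
The two fields of the AcknowledgeCheckpoint message class: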
```java
private final SubtaskState subtaskState; // the task's state
private final CheckpointMetrics checkpointMetrics;
```
Sending the message
The message is sent by the acknowledgeCheckpoint method of the RuntimeEnvironment class:
```java
public void acknowledgeCheckpoint(
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        SubtaskState checkpointStateHandles) {
    // send the ack through checkpointResponder, an instance of the CheckpointResponder interface
    checkpointResponder.acknowledgeCheckpoint(
            jobId, executionId, checkpointId, checkpointMetrics,
            checkpointStateHandles);
}
```
The CheckpointResponder interface is responsible for responding with checkpoint acknowledge and decline messages. ActorGatewayCheckpointResponder implements it on top of ActorGateway and provides two methods, acknowledgeCheckpoint and declineCheckpoint.
```java
@Override
public void acknowledgeCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        SubtaskState checkpointStateHandles) {
    // create an AcknowledgeCheckpoint message
    AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
            jobID, executionAttemptID, checkpointId, checkpointMetrics,
            checkpointStateHandles);
    // and send it through actorGateway
    actorGateway.tell(message);
}
```
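Receiving the message
It is received by receiveAcknowledgeMessage, the same method that sends NotifyCheckpointComplete.
The DeclineCheckpoint message
Sent by a TaskManager to the JobManager to tell the CheckpointCoordinator that a checkpoint request could not be handled. This typically happens when a task is already RUNNING but is not yet internally ready to take a checkpoint.
Sending the message
It is sent from the triggerCheckpointBarrier method of the Task class:
org.apache.flink.runtime.taskmanager.Task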
```java
try {
    boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
    if (!success) {
        // sent through the CheckpointResponder, just like AcknowledgeCheckpoint
        checkpointResponder.declineCheckpoint(
                getJobID(), getExecutionId(), checkpointID,
                new CheckpointDeclineTaskNotReadyException(taskName));
    }
}
// ... (catch clauses omitted in this excerpt)
```
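On the task side the decision reduces to: attempt the checkpoint, and decline if the task reports that it could not take one. In the hypothetical sketch below, ToyTask, Responder and RecordingResponder stand in for Task, CheckpointResponder and ActorGatewayCheckpointResponder; readiness is a simple flag, and for brevity the task acknowledges immediately, whereas the source only shows that acknowledgements go out through RuntimeEnvironment.acknowledgeCheckpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Task.triggerCheckpointBarrier; not Flink's API.
class ToyTask {
    private final String taskName;
    private final Responder responder;
    private volatile boolean readyForCheckpoint; // RUNNING does not imply ready

    ToyTask(String taskName, Responder responder) {
        this.taskName = taskName;
        this.responder = responder;
    }

    void markReady() {
        readyForCheckpoint = true;
    }

    void triggerCheckpointBarrier(long checkpointId) {
        boolean success = readyForCheckpoint; // stands in for statefulTask.triggerCheckpoint(...)
        if (success) {
            responder.acknowledgeCheckpoint(checkpointId); // simplification: ack right away
        } else {
            responder.declineCheckpoint(checkpointId, "task " + taskName + " not ready");
        }
    }

    public static void main(String[] args) {
        RecordingResponder responder = new RecordingResponder();
        ToyTask task = new ToyTask("map-1", responder);
        task.triggerCheckpointBarrier(1); // declined: not ready yet
        task.markReady();
        task.triggerCheckpointBarrier(2); // acknowledged
        System.out.println(responder.sent); // [decline 1 (task map-1 not ready), ack 2]
    }
}

// Hypothetical counterpart of CheckpointResponder.
interface Responder {
    void acknowledgeCheckpoint(long checkpointId);

    void declineCheckpoint(long checkpointId, String reason);
}

// Records messages instead of sending them, so the behaviour can be inspected.
class RecordingResponder implements Responder {
    final List<String> sent = new ArrayList<>();

    @Override
    public void acknowledgeCheckpoint(long checkpointId) {
        sent.add("ack " + checkpointId);
    }

    @Override
    public void declineCheckpoint(long checkpointId, String reason) {
        sent.add("decline " + checkpointId + " (" + reason + ")");
    }
}
```

Putting the responder behind an interface is what lets both acknowledge and decline share one transport (ActorGateway in the source) and keeps the task testable without a JobManager.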
Receiving the message
It is handled by the receiveDeclineMessage method of CheckpointCoordinator:
```java
public void receiveDeclineMessage(DeclineCheckpoint message) {
    if (shutdown || message == null) {
        return;
    }
    if (!job.equals(message.getJob())) {
        throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
                message.getJob() + " while this coordinator handles job " + job);
    }

    final long checkpointId = message.getCheckpointId();
    final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");

    PendingCheckpoint checkpoint;

    synchronized (lock) {
        // we need to check inside the lock for being shutdown as well, otherwise we
        // get races and invalid error log messages
        if (shutdown) {
            return;
        }

        checkpoint = pendingCheckpoints.get(checkpointId);

        if (checkpoint != null && !checkpoint.isDiscarded()) {
            // the checkpoint is pending and has not been discarded
            LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}",
                    checkpointId, message.getTaskExecutionId(), reason);

            pendingCheckpoints.remove(checkpointId); // remove it from pendingCheckpoints
            checkpoint.abortDeclined();
            rememberRecentCheckpointId(checkpointId);

            // we don't have to schedule another "dissolving" checkpoint any more because the
            // cancellation barriers take care of breaking downstream alignments
            // we only need to make sure that suspended queued requests are resumed

            // is a checkpoint with an equal or higher ID still pending?
            boolean haveMoreRecentPending = false;
            for (PendingCheckpoint p : pendingCheckpoints.values()) {
                if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) {
                    haveMoreRecentPending = true;
                    break;
                }
            }
            // if not, resume any queued trigger request
            if (!haveMoreRecentPending) {
                triggerQueuedRequests();
            }
        }
        else if (checkpoint != null) {
            // this should not happen
            throw new IllegalStateException(
                    "Received message for discarded but non-removed checkpoint " + checkpointId);
        }
        else if (LOG.isDebugEnabled()) {
            if (recentPendingCheckpoints.contains(checkpointId)) {
                // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
                        checkpointId, reason);
            } else {
                // message is for an unknown checkpoint. might be so old that we don't even remember it any more
                LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
                        checkpointId, reason);
            }
        }
    }
}
```
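The bookkeeping at the end of receiveDeclineMessage can be sketched in isolation. The body of triggerQueuedRequests is not shown in the source; the sketch below assumes it clears triggerRequestQueued and resumes periodic triggering, which it simply records with a counter. DeclineBookkeeping and the MAX_RECENT bound on remembered IDs are hypothetical. Keeping pending IDs in a TreeSet turns the "is a newer checkpoint still pending?" loop into a single ceiling() lookup.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeSet;

// Hypothetical reduction of the bookkeeping in receiveDeclineMessage; not Flink code.
class DeclineBookkeeping {
    private static final int MAX_RECENT = 16; // hypothetical bound on remembered IDs
    private final TreeSet<Long> pendingCheckpoints = new TreeSet<>();
    private final Deque<Long> recentPendingCheckpoints = new ArrayDeque<>();
    boolean triggerRequestQueued;
    int resumedRequests; // how often a queued request was resumed

    void addPending(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
    }

    boolean isRecent(long checkpointId) {
        return recentPendingCheckpoints.contains(checkpointId);
    }

    void receiveDecline(long checkpointId) {
        if (!pendingCheckpoints.remove(checkpointId)) {
            return; // unknown or already removed: Flink only logs at debug level here
        }
        rememberRecentCheckpointId(checkpointId);

        // ceiling() returns the smallest remaining ID >= checkpointId, i.e. a newer pending checkpoint
        boolean haveMoreRecentPending = pendingCheckpoints.ceiling(checkpointId) != null;
        if (!haveMoreRecentPending) {
            triggerQueuedRequests();
        }
    }

    private void rememberRecentCheckpointId(long checkpointId) {
        if (recentPendingCheckpoints.size() >= MAX_RECENT) {
            recentPendingCheckpoints.removeFirst();
        }
        recentPendingCheckpoints.addLast(checkpointId);
    }

    // Assumed behaviour: clear the queued flag and resume periodic triggering.
    private void triggerQueuedRequests() {
        if (triggerRequestQueued) {
            triggerRequestQueued = false;
            resumedRequests++;
        }
    }

    public static void main(String[] args) {
        DeclineBookkeeping b = new DeclineBookkeeping();
        b.addPending(1);
        b.addPending(2);
        b.triggerRequestQueued = true;
        b.receiveDecline(1); // checkpoint 2 is newer and still pending: nothing resumed
        System.out.println(b.resumedRequests); // 0
        b.receiveDecline(2); // nothing newer is pending: the queued request is resumed
        System.out.println(b.resumedRequests); // 1
    }
}
```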