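Preface
The role and importance of checkpoints in Flink need no further introduction; earlier articles covered the overall checkpoint process and how checkpoints are triggered periodically. In this article we walk through the source code to see in detail how a checkpoint is executed.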
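The checkpoint process
Where it starts
As we know, periodic checkpoints are initiated by the CheckpointCoordinator inside the JobManager. The work happens in CheckpointCoordinator.triggerCheckpoint. The method is long, but it boils down to the following steps:
Pre-checks, including:
- whether the checkpoint must be forced
- whether the number of concurrent pending checkpoints exceeds the limit
- whether too little time has passed since the last successful checkpoint
- If these pre-checks fail, this checkpoint is skipped.
- Check that every task that must be triggered is in the RUNNING state; otherwise give up. I have been bitten by this before, see the earlier post about a Flink job that stopped taking checkpoints.
- Check that every task that must acknowledge the checkpoint is in the RUNNING state; otherwise give up. Once all of the checks above pass, the checkpoint can proceed.
- Generate a unique, monotonically increasing checkpointID.
- Initialize the CheckpointStorageLocation, the path where this checkpoint's snapshot is stored; it differs between state backends.
- Create a PendingCheckpoint, which represents a checkpoint in an intermediate state, and store it in a checkpointId -> PendingCheckpoint map.
- Register a scheduled task that cancels this checkpoint if it times out and triggers a new one.
- Call Execution.triggerCheckpoint() to send the checkpoint request to every task that needs to be triggered: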
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
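The pre-checks listed above can be sketched as a small standalone model. This is only an illustration under simplifying assumptions: the class TriggerPreCheckSketch, its fields, and its method names are hypothetical and not part of Flink, and the real CheckpointCoordinator does considerably more (locking, timeouts, storage initialization).

```java
/** Hypothetical, simplified model of the trigger pre-checks; not Flink's actual code. */
final class TriggerPreCheckSketch {
    private final int maxConcurrentCheckpoints;
    private final long minPauseMillis;
    private int pendingCheckpoints = 0;
    private long earliestNextTriggerMillis = 0L;
    private long nextCheckpointId = 1L;

    TriggerPreCheckSketch(int maxConcurrentCheckpoints, long minPauseMillis) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        this.minPauseMillis = minPauseMillis;
    }

    /** Returns the new checkpoint ID, or -1 if one of the pre-checks rejects the trigger. */
    long tryTrigger(long nowMillis, boolean allTasksRunning, boolean force) {
        if (!force) {
            // too many checkpoints are still in flight
            if (pendingCheckpoints >= maxConcurrentCheckpoints) {
                return -1L;
            }
            // the previous checkpoint completed too recently
            if (nowMillis < earliestNextTriggerMillis) {
                return -1L;
            }
        }
        // every task that must be triggered or must acknowledge has to be running
        if (!allTasksRunning) {
            return -1L;
        }
        pendingCheckpoints++;
        return nextCheckpointId++;
    }

    /** Records a completed checkpoint and starts the minimum pause. */
    void onCheckpointCompleted(long nowMillis) {
        pendingCheckpoints--;
        earliestNextTriggerMillis = nowMillis + minPauseMillis;
    }

    public static void main(String[] args) {
        TriggerPreCheckSketch coordinator = new TriggerPreCheckSketch(1, 500L);
        System.out.println(coordinator.tryTrigger(1_000L, true, false));
        System.out.println(coordinator.tryTrigger(1_100L, true, false));
    }
}
```

In this model a rejected trigger simply returns -1; the caller would wait for the next periodic trigger.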
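The request ends up as an RPC call to TaskExecutorGateway.triggerCheckpoint, i.e. a request to run TaskExecutor.triggerCheckpoint(). Because a TaskExecutor may be running several Tasks at once, it looks up the target Task by the ExecutionAttemptID of the trigger request and then calls Task.triggerCheckpointBarrier(). The sending side (Execution.triggerCheckpointHelper) and the receiving side (TaskExecutor.triggerCheckpoint) look like this: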
private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
}
}
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
log.debug(message);
return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
}
}
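The lookup by ExecutionAttemptID can be illustrated with a minimal sketch. It assumes a plain map from an attempt ID string to a callback; TaskLookupSketch and its methods are hypothetical names, and the string "ACK" merely stands in for Flink's Acknowledge object.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a TaskExecutor that hosts several tasks. */
final class TaskLookupSketch {
    // attempt ID -> callback that would inject the checkpoint barrier into that task
    private final Map<String, Runnable> tasksByAttemptId = new ConcurrentHashMap<>();

    void register(String attemptId, Runnable triggerBarrier) {
        tasksByAttemptId.put(attemptId, triggerBarrier);
    }

    /** Triggers the task if it is known, otherwise returns an exceptionally completed future. */
    CompletableFuture<String> triggerCheckpoint(String attemptId) {
        Runnable task = tasksByAttemptId.get(attemptId);
        if (task != null) {
            task.run();
            return CompletableFuture.completedFuture("ACK");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(
            new IllegalStateException("checkpoint request for unknown task " + attemptId));
        return failed;
    }

    public static void main(String[] args) {
        TaskLookupSketch executor = new TaskLookupSketch();
        executor.register("attempt-1", () -> System.out.println("barrier triggered"));
        System.out.println(executor.triggerCheckpoint("attempt-1").isDone());
        System.out.println(executor.triggerCheckpoint("attempt-2").isCompletedExceptionally());
    }
}
```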
The actual checkpoint logic of a Task is encapsulated in AbstractInvokable.triggerCheckpointAsync(...), which Task.triggerCheckpointBarrier() calls:
public void triggerCheckpointBarrier(
final long checkpointID,
final long checkpointTimestamp,
final CheckpointOptions checkpointOptions,
final boolean advanceToEndOfEventTime) {
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
if (executionState == ExecutionState.RUNNING && invokable != null) {
try {
invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
}
catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
LOG.debug(
"Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
checkpointID, taskNameWithSubtask, executionId);
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
"Error while triggering checkpoint " + checkpointID + " for " +
taskNameWithSubtask, t));
} else {
LOG.debug("Encountered error while triggering checkpoint {} for " +
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
}
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
// send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
}
}
triggerCheckpointAsync is overridden by SourceStreamTask and by the regular StreamTask; the main logic lives in StreamTask, in performCheckpoint:
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics,
boolean advanceToEndOfTime) throws Exception {
LOG.debug("Starting checkpoint ({}) {} on task {}",
checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
final long checkpointId = checkpointMetaData.getCheckpointId();
if (isRunning) {
actionExecutor.runThrowing(() -> {
if (checkpointOptions.getCheckpointType().isSynchronous()) {
setSynchronousSavepointId(checkpointId);
if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
}
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
operatorChain.prepareSnapshotPreBarrier(checkpointId);
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastCheckpointBarrier(
checkpointId,
checkpointMetaData.getTimestamp(),
checkpointOptions);
// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
});
return true;
} else {
actionExecutor.runThrowing(() -> {
// we cannot perform our checkpoint - let the downstream operators know that they
// should not wait for any input from this operator
// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
// yet be created
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
recordWriter.broadcastEvent(message);
});
return false;
}
}
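It does three main things: 1) prepares for the checkpoint, which normally involves little or no work; 2) sends the CheckpointBarrier downstream; 3) takes the state snapshot. If the task is not running, it broadcasts a CancelCheckpointMarker instead, so that downstream operators do not wait for a barrier from this task.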
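The ordering of these steps can be made visible with a small recording stub. This is a sketch under stated assumptions: RecordingChain is a hypothetical stand-in for the operator chain that only logs method calls, and nothing here touches real Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

final class PerformCheckpointOrderSketch {
    /** Hypothetical stand-in for an operator chain; it only records what is called. */
    static final class RecordingChain {
        final List<String> events = new ArrayList<>();
        void prepareSnapshotPreBarrier(long id) { events.add("prepare:" + id); }
        void broadcastBarrier(long id) { events.add("barrier:" + id); }
        void snapshotState(long id) { events.add("snapshot:" + id); }
        void broadcastCancelMarker(long id) { events.add("cancel:" + id); }
    }

    /** Mirrors the control flow described above: three ordered steps, or a cancel marker. */
    static boolean performCheckpoint(RecordingChain chain, long checkpointId, boolean running) {
        if (!running) {
            // tell downstream tasks not to wait for a barrier from this task
            chain.broadcastCancelMarker(checkpointId);
            return false;
        }
        chain.prepareSnapshotPreBarrier(checkpointId); // step 1: minimal pre-barrier work
        chain.broadcastBarrier(checkpointId);          // step 2: barrier goes out early
        chain.snapshotState(checkpointId);             // step 3: snapshot the state
        return true;
    }

    public static void main(String[] args) {
        RecordingChain chain = new RecordingChain();
        performCheckpoint(chain, 7L, true);
        System.out.println(chain.events);
    }
}
```

Emitting the barrier before taking the snapshot keeps downstream alignment from waiting on this task's snapshot work.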
Broadcasting the barrier
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}
Taking the snapshot
private void checkpointState(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {
// resolve the storage location and metadata for this checkpoint
CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
checkpointMetaData.getCheckpointId(),
checkpointOptions.getTargetLocation());
// wrap the checkpoint procedure in a CheckpointingOperation object
CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
this,
checkpointMetaData,
checkpointOptions,
storage,
checkpointMetrics);
checkpointingOperation.executeCheckpointing();
}
Each operator's snapshot is represented by an OperatorSnapshotFutures object, which holds the snapshot results for operator state and keyed state, each in a managed and a raw variant:
public class OperatorSnapshotFutures {
@Nonnull
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;
@Nonnull
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
@Nonnull
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;
@Nonnull
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
}
Because a StreamTask may contain several operators, it keeps a Map of OperatorID -> OperatorSnapshotFutures internally.
private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
The snapshot consists of a synchronous part and an asynchronous part:
public void executeCheckpointing() throws Exception {
startSyncPartNano = System.nanoTime();
try {
// synchronous part
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
checkpointMetaData.getCheckpointId(), owner.getName());
}
startAsyncPartNano = System.nanoTime();
checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
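// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
// asynchronous part
// the state backend can be configured to snapshot synchronously or asynchronously;
// in the synchronous case, all runnable futures are already complete at this point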
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
owner,
operatorSnapshotsInProgress,
checkpointMetaData,
checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
} catch (Exception ex) {
// Cleanup to release resources
for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
if (null != operatorSnapshotResult) {
try {
operatorSnapshotResult.cancel();
} catch (Exception e) {
LOG.warn("Could not properly cancel an operator snapshot result.", e);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
if (checkpointOptions.getCheckpointType().isSynchronous()) {
// in the case of a synchronous checkpoint, we always rethrow the exception,
// so that the task fails.
// this is because the intention is always to stop the job after this checkpointing
// operation, and without the failure, the task would go back to normal execution.
throw ex;
} else {
owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
}
}
}
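During the synchronous phase, StreamOperator.snapshotState is called on each operator in turn, and the result is a set of runnable futures. Depending on whether snapshots are configured to run synchronously or asynchronously, these futures may already be complete or still pending:
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
if (null != op) {
// the synchronous phase calls the operator's snapshotState; the returned OperatorSnapshotFutures may or may not be complete yet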
OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions,
storageLocation);
operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
}
}
The details are in AbstractStreamOperator#snapshotState:
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws Exception {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
factory,
keyGroupRange,
getContainingTask().getCancelables());
try {
// take the snapshot via the operator hook; this covers raw keyed state and raw operator state
snapshotState(snapshotContext);
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
// write the managed operator state snapshot
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
// write the managed keyed state snapshot
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
} catch (Exception snapshotException) {
try {
snapshotInProgress.cancel();
} catch (Exception e) {
snapshotException.addSuppressed(e);
}
String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
getOperatorName() + ".";
if (!getContainingTask().isCanceled()) {
LOG.info(snapshotFailMessage, snapshotException);
}
try {
snapshotContext.closeExceptionally();
} catch (IOException e) {
snapshotException.addSuppressed(e);
}
throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
}
return snapshotInProgress;
}
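State is divided into raw state and managed state (state managed by Flink). As the code below shows, timers are written as raw keyed state when the keyed state backend requires legacy synchronous timer snapshots, so they end up in the snapshot as well.
/**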
* Stream operators with state, which want to participate in a snapshot need to override this hook method.
*
* @param context context that provides information and means required for taking a snapshot
*/
public void snapshotState(StateSnapshotContext context) throws Exception {
final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
//TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
// all timers are written as raw keyed state
if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
KeyedStateCheckpointOutputStream out;
try {
out = context.getRawKeyedOperatorStateOutput();
} catch (Exception exception) {
throw new Exception("Could not open raw keyed operator state stream for " +
getOperatorName() + '.', exception);
}
try {
KeyGroupsList allKeyGroups = out.getKeyGroupList();
for (int keyGroupIdx : allKeyGroups) {
out.startNewKeyGroup(keyGroupIdx);
timeServiceManager.snapshotStateForKeyGroup(
new DataOutputViewStreamWrapper(out), keyGroupIdx);
}
} catch (Exception exception) {
throw new Exception("Could not write timer service of " + getOperatorName() +
" to checkpoint state stream.", exception);
} finally {
try {
out.close();
} catch (Exception closeException) {
LOG.warn("Could not close raw keyed operator state stream for {}. This " +
"might have prevented deleting some state data.", getOperatorName(), closeException);
}
}
}
}
That is what snapshotState does in AbstractStreamOperator. Its subclass AbstractUdfStreamOperator additionally snapshots the state of the user function:
public void snapshotState(StateSnapshotContext context) throws Exception {
// call the parent method first, which writes the timers
super.snapshotState(context);
StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
}
public static void snapshotFunctionState(
StateSnapshotContext context,
OperatorStateBackend backend,
Function userFunction) throws Exception {
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(backend);
while (true) {
if (trySnapshotFunctionState(context, backend, userFunction)) {
break;
}
// inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
if (userFunction instanceof WrappingFunction) {
userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
} else {
break;
}
}
}
private static boolean trySnapshotFunctionState(
StateSnapshotContext context,
OperatorStateBackend backend,
Function userFunction) throws Exception {
// if the user function implements CheckpointedFunction, call its snapshotState method
if (userFunction instanceof CheckpointedFunction) {
((CheckpointedFunction) userFunction).snapshotState(context);
return true;
}
// if the user function implements ListCheckpointed
if (userFunction instanceof ListCheckpointed) {
// first call snapshotState to obtain the current state
@SuppressWarnings("unchecked")
List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
// get a handle to the list state in the state backend
ListState<Serializable> listState = backend.
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
// clear the previous contents
listState.clear();
// write the current state into the state backend
if (null != partitionableState) {
try {
for (Serializable statePartition : partitionableState) {
listState.add(statePartition);
}
} catch (Exception e) {
listState.clear();
throw new Exception("Could not write partitionable state to operator " +
"state backend.", e);
}
}
return true;
}
return false;
}
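The unwrap-and-retry loop in snapshotFunctionState is easy to miss, so here is a minimal sketch of just that pattern. The interfaces UserFunction, Checkpointed, and Wrapping are hypothetical simplifications of Flink's Function, CheckpointedFunction, and WrappingFunction; unlike the original, this version returns a boolean so the outcome can be observed.

```java
final class UnwrapSketch {
    /** Hypothetical marker interface, standing in for a user function. */
    interface UserFunction {}

    /** Hypothetical stand-in for a function that can snapshot its own state. */
    interface Checkpointed extends UserFunction {
        void snapshotState(long checkpointId);
    }

    /** Hypothetical stand-in for a function that wraps another function. */
    interface Wrapping extends UserFunction {
        UserFunction getWrappedFunction();
    }

    /** Peels off wrappers until a checkpointed function is found or nothing is left to unwrap. */
    static boolean snapshotFunctionState(UserFunction function, long checkpointId) {
        UserFunction current = function;
        while (true) {
            if (current instanceof Checkpointed) {
                ((Checkpointed) current).snapshotState(checkpointId);
                return true;
            }
            if (current instanceof Wrapping) {
                current = ((Wrapping) current).getWrappedFunction();
            } else {
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Checkpointed inner = id -> System.out.println("snapshot for checkpoint " + id);
        Wrapping outer = () -> inner;
        System.out.println(snapshotFunctionState(outer, 42L));
    }
}
```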
We now know how the checkpoint process reaches the snapshot methods we implement ourselves. Next, let's look at how the state managed by Flink is written into the snapshot.
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
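Let's start with operator state. DefaultOperatorStateBackend delegates the actual work to DefaultOperatorStateBackendSnapshotStrategy. It first makes deep copies of all currently registered operator state (both list state and broadcast state), then wraps the actual write in an asynchronous FutureTask. That FutureTask mainly does four things: 1) opens the output stream; 2) writes the state metadata; 3) writes the state; 4) closes the output stream and obtains the state handle. If asynchronous snapshots are not enabled, the FutureTask is run immediately, during the synchronous phase.
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(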
final long checkpointId,
final long timestamp,
@Nonnull final CheckpointStreamFactory streamFactory,
@Nonnull final CheckpointOptions checkpointOptions) throws IOException {
if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
return DoneFuture.of(SnapshotResult.empty());
}
final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
new HashMap<>(registeredOperatorStates.size());
final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
new HashMap<>(registeredBroadcastStates.size());
ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(userClassLoader);
try {
// eagerly create deep copies of the list and the broadcast states (if any)
// in the synchronous phase, so that we can use them in the async writing.
// take deep copies of all registered list states and broadcast states
if (!registeredOperatorStates.isEmpty()) {
for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
PartitionableListState<?> listState = entry.getValue();
if (null != listState) {
listState = listState.deepCopy();
}
registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
}
}
if (!registeredBroadcastStates.isEmpty()) {
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
if (null != broadcastState) {
broadcastState = broadcastState.deepCopy();
}
registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
}
}
} finally {
Thread.currentThread().setContextClassLoader(snapshotClassLoader);
}
// wrap the actual write in an asynchronous FutureTask
AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {
@Override
protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
// create the state output stream
CheckpointStreamFactory.CheckpointStateOutputStream localOut =
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
snapshotCloseableRegistry.registerCloseable(localOut);
// collect the metadata
// get the registered operator state infos ...
List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
new ArrayList<>(registeredOperatorStatesDeepCopies.size());
for (Map.Entry<String, PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()) {
operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}
// collect the broadcast state metadata as well, then write all metadata
// ... get the registered broadcast operator state infos ...
List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
new ArrayList<>(registeredBroadcastStatesDeepCopies.size());
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()) {
broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}
// ... write them all in the checkpoint stream ...
DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
OperatorBackendSerializationProxy backendSerializationProxy =
new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
backendSerializationProxy.write(dov);
// ... and then go for the states ...
// write the states
// we put BOTH normal and broadcast state metadata here
int initialMapCapacity =
registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
new HashMap<>(initialMapCapacity);
for (Map.Entry<String, PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()) {
PartitionableListState<?> value = entry.getValue();
long[] partitionOffsets = value.write(localOut);
OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
}
// ... and the broadcast states themselves ...
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()) {
BackendWritableBroadcastState<?, ?> value = entry.getValue();
long[] partitionOffsets = {value.write(localOut)};
OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
}
// ... and, finally, create the state handle.
OperatorStateHandle retValue = null;
if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
// close the output stream and obtain the state handle, which can later be used to read the state back
StreamStateHandle stateHandle = localOut.closeAndGetHandle();
if (stateHandle != null) {
retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
}
return SnapshotResult.of(retValue);
} else {
throw new IOException("Stream was already unregistered.");
}
}
@Override
protected void cleanupProvidedResources() {
// nothing to do
}
@Override
protected void logAsyncSnapshotComplete(long startTime) {
if (asynchronousSnapshots) {
logAsyncCompleted(streamFactory, startTime);
}
}
};
final FutureTask<SnapshotResult<OperatorStateHandle>> task =
snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);
// if snapshots are not asynchronous, run the FutureTask right here, so the state is written during the synchronous phase
if (!asynchronousSnapshots) {
task.run();
}
return task;
}
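The copy-then-write pattern used above can be reduced to a few lines. The following is a minimal sketch, assuming a list of strings as the state and a FutureTask whose "write" simply returns the copied list; CopyThenWriteSketch and its methods are hypothetical names, and a real backend would serialize to a checkpoint stream instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class CopyThenWriteSketch {
    private final List<String> liveState = new ArrayList<>();
    private final boolean asynchronousSnapshots;

    CopyThenWriteSketch(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    void add(String value) {
        liveState.add(value);
    }

    /** Synchronous part: copy the state. The returned task carries the (possibly deferred) write. */
    FutureTask<List<String>> snapshot() {
        // Strings are immutable, so copying the list is enough to isolate the snapshot
        final List<String> copy = new ArrayList<>(liveState);
        FutureTask<List<String>> task =
            new FutureTask<>(() -> Collections.unmodifiableList(copy));
        if (!asynchronousSnapshots) {
            task.run(); // synchronous mode: the write finishes before snapshot() returns
        }
        return task;
    }

    /** Asynchronous part: run the task (a no-op if it already ran) and fetch the result. */
    static List<String> runAndGet(FutureTask<List<String>> task) {
        task.run();
        try {
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CopyThenWriteSketch backend = new CopyThenWriteSketch(true);
        backend.add("a");
        FutureTask<List<String>> task = backend.snapshot();
        backend.add("b"); // happens after the copy, so it is not part of this snapshot
        System.out.println(runAndGet(task));
    }
}
```

Because the copy is taken before the task is created, records processed while the asynchronous write is still pending cannot leak into the snapshot.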
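Writing keyed state follows much the same flow, but keyed state has several storage implementations, heap-based and RocksDB-based, and the RocksDB implementation also supports incremental checkpoints, so it is more involved than operator state.
That covers the first phase of the snapshot, the synchronous phase. The asynchronous phase is encapsulated in AsyncCheckpointRunnable. Its main steps are: 1) run the FutureTasks created during the synchronous phase; 2) once they finish, send an Ack to the CheckpointCoordinator.
protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {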
@Override
public void run() {
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
new TaskStateSnapshot(operatorSnapshotsInProgress.size());
TaskStateSnapshot localTaskOperatorSubtaskStates =
new TaskStateSnapshot(operatorSnapshotsInProgress.size());
// finish writing the state of every operator
// with synchronous snapshots, the state has already been written by now
// with asynchronous snapshots, the state is written here
for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
OperatorID operatorID = entry.getKey();
OperatorSnapshotFutures snapshotInProgress = entry.getValue();
// finalize the async part of all by executing all snapshot runnables
OperatorSnapshotFinalizer finalizedSnapshots =
new OperatorSnapshotFinalizer(snapshotInProgress);
jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getJobManagerOwnedState());
localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getTaskLocalState());
}
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
// report that the snapshot is complete
reportCompletedSnapshotStates(
jobManagerTaskOperatorSubtaskStates,
localTaskOperatorSubtaskStates,
asyncDurationMillis);
} else {
LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
owner.getName(),
checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
handleExecutionException(e);
} finally {
owner.cancelables.unregisterCloseable(this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
private void reportCompletedSnapshotStates(
TaskStateSnapshot acknowledgedTaskStateSnapshot,
TaskStateSnapshot localTaskStateSnapshot,
long asyncDurationMillis) {
TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
boolean hasLocalState = localTaskStateSnapshot.hasState();
// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
// to stateless tasks on restore. This enables simple job modifications that only concern
// stateless without the need to assign them uids to match their (always empty) states.
taskStateManager.reportTaskStateSnapshots(
checkpointMetaData,
checkpointMetrics,
hasAckState ? acknowledgedTaskStateSnapshot : null,
hasLocalState ? localTaskStateSnapshot : null);
}
}
public class TaskStateManagerImpl implements TaskStateManager {
@Override
public void reportTaskStateSnapshots(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nonnull CheckpointMetrics checkpointMetrics,
@Nullable TaskStateSnapshot acknowledgedState,
@Nullable TaskStateSnapshot localState) {
long checkpointId = checkpointMetaData.getCheckpointId();
localStateStore.storeLocalState(checkpointId, localState);
// send the ACK to the CheckpointCoordinator
checkpointResponder.acknowledgeCheckpoint(
jobId,
executionAttemptID,
checkpointId,
checkpointMetrics,
acknowledgedState);
}
}
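The compareAndSet guard in AsyncCheckpointRunnable.run() decides whether an Ack is sent at all. Here is a minimal sketch of that guard, assuming a three-state enum; the DISCARDED state and the close() behavior are assumptions for illustration (the excerpt above only shows RUNNING and COMPLETED), and AsyncPhaseSketch is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicReference;

final class AsyncPhaseSketch implements Runnable, AutoCloseable {
    /** RUNNING and COMPLETED appear in the excerpt above; DISCARDED is assumed here. */
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final Runnable writeState; // hypothetical: runs the pending snapshot futures
    private final Runnable sendAck;    // hypothetical: reports the result to the coordinator

    AsyncPhaseSketch(Runnable writeState, Runnable sendAck) {
        this.writeState = writeState;
        this.sendAck = sendAck;
    }

    @Override
    public void run() {
        writeState.run();
        // only the transition RUNNING -> COMPLETED may send the Ack
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            sendAck.run();
        }
    }

    @Override
    public void close() {
        // closing before completion wins the race and suppresses the Ack
        state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    State currentState() {
        return state.get();
    }

    public static void main(String[] args) {
        AsyncPhaseSketch phase = new AsyncPhaseSketch(
            () -> System.out.println("writing state"),
            () -> System.out.println("ack sent"));
        phase.run();
        System.out.println(phase.currentState());
    }
}
```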
Acknowledging the checkpoint
A Task responds to a checkpoint through the CheckpointResponder interface:
public interface CheckpointResponder {
/**
* Acknowledges the given checkpoint.
*/
void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState);
/**
* Declines the given checkpoint.
*/
void declineCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
Throwable cause);
}
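RpcCheckpointResponder, the concrete implementation of CheckpointResponder, notifies the CheckpointCoordinatorGateway via RPC, that is, it notifies the JobMaster. The JobMaster then calls CheckpointCoordinator.receiveAcknowledgeMessage() or CheckpointCoordinator.receiveDeclineMessage() to handle the message.
Completing the acknowledgement
After a Task finishes its checkpoint work, the CheckpointCoordinator receives the Ack. It handles the Ack roughly as follows:
- Look up the PendingCheckpoint for the Ack's checkpointID in the pendingCheckpoints map (checkpointId -> PendingCheckpoint)
- If a matching PendingCheckpoint exists:
  - If it has not been discarded, call PendingCheckpoint.acknowledgeTask to process the Ack. Depending on the result:
    - SUCCESS: check whether all required Acks have been received; if so, call completePendingCheckpoint to complete the checkpoint
    - DUPLICATE: the Ack was received more than once; ignore it
    - UNKNOWN: the Ack comes from an unknown task; clean up the state handles carried by the Ack
    - DISCARDED: the checkpoint has already been discarded; clean up the state handles carried by the Ack
  - If it has already been discarded, throw an exception
- If no matching PendingCheckpoint exists, clean up the state handles carried by the Ack
The corresponding code: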
class CheckpointCoordinator {
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
if (shutdown || message == null) {
return false;
}
if (!job.equals(message.getJob())) {
LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
return false;
}
final long checkpointId = message.getCheckpointId();
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE:
LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
break;
case UNKNOWN:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the task's execution attempt id was unknown. Discarding " +
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
break;
case DISCARDED:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the pending checkpoint had been discarded. Discarding the " +
"state handle to avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}
return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
}
else {
boolean wasPendingCheckpoint;
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
wasPendingCheckpoint = true;
LOG.warn("Received late message for now expired checkpoint attempt {} from " +
"{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
}
else {
LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
wasPendingCheckpoint = false;
}
// try to discard the state so that we don't have lingering state lying around
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
return wasPendingCheckpoint;
}
}
}
}
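How does a checkpoint that has been triggered but not yet completed, i.e. a PendingCheckpoint, process Ack messages? Internally, PendingCheckpoint maintains two Maps:
- Map<OperatorID, OperatorState> operatorStates: the state handles of the operators whose Acks have been received
- Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks: the Tasks that must acknowledge but have not yet done so
Each time an Ack arrives, PendingCheckpoint removes the corresponding Task from notYetAcknowledgedTasks and stores the state handles carried by the Ack. Once notYetAcknowledgedTasks is empty, all Acks have been received.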
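The bookkeeping described above can be sketched as follows. This is a simplified model, not Flink's PendingCheckpoint: attempt IDs and state handles are plain strings, and the extra acknowledgedTasks set used to tell DUPLICATE from UNKNOWN is an assumption of this sketch.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class PendingCheckpointSketch {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledgedTasks;
    private final Set<String> acknowledgedTasks = new HashSet<>();
    private final Map<String, String> stateHandles = new HashMap<>();
    private boolean discarded = false;

    PendingCheckpointSketch(Collection<String> tasksToAcknowledge) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToAcknowledge);
    }

    /** Processes one Ack and reports how it was classified. */
    AckResult acknowledgeTask(String attemptId, String stateHandle) {
        if (discarded) {
            return AckResult.DISCARDED;
        }
        if (!notYetAcknowledgedTasks.remove(attemptId)) {
            // either this task already acknowledged, or it never belonged to this checkpoint
            return acknowledgedTasks.contains(attemptId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
        }
        acknowledgedTasks.add(attemptId);
        stateHandles.put(attemptId, stateHandle);
        return AckResult.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledgedTasks.isEmpty();
    }

    void discard() {
        discarded = true;
    }

    public static void main(String[] args) {
        Set<String> tasks = new HashSet<>();
        tasks.add("t1");
        tasks.add("t2");
        PendingCheckpointSketch pending = new PendingCheckpointSketch(tasks);
        System.out.println(pending.acknowledgeTask("t1", "handle-1"));
        System.out.println(pending.isFullyAcknowledged());
    }
}
```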
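Once the PendingCheckpoint has confirmed that all Acks have arrived, the checkpoint can be completed. This involves:
- Call PendingCheckpoint.finalizeCheckpoint() to turn the PendingCheckpoint into a CompletedCheckpoint
  - Obtain a CheckpointMetadataOutputStream and use it to write all state handle information to the storage system
  - Create a CompletedCheckpoint object
- Store the CompletedCheckpoint in the CompletedCheckpointStore
  - CompletedCheckpointStore has two implementations: StandaloneCompletedCheckpointStore and ZooKeeperCompletedCheckpointStore
  - StandaloneCompletedCheckpointStore simply keeps the CompletedCheckpoint objects in an in-memory array
  - ZooKeeperCompletedCheckpointStore provides the high-availability implementation: it first writes the CompletedCheckpoint through a RetrievableStateStorageHelper (usually to a file system) and then stores the resulting handle in ZooKeeper
  - The number of retained CompletedCheckpoints is bounded; older snapshots are deleted
- Remove the PendingCheckpoints that have been subsumed: because checkpoint IDs increase monotonically, every PendingCheckpoint with an ID smaller than the one just completed can be discarded
- Call Execution.notifyCheckpointComplete() on each execution in turn to tell all Tasks that the checkpoint has completed
  - This is an RPC call to TaskExecutor.confirmCheckpoint() that informs the corresponding Task
After receiving notifyCheckpointComplete, a Task performs its follow-up work, for example the two-phase commit of the Kafka producer.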
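Two of the completion steps, the bounded store of completed checkpoints and the removal of subsumed pending checkpoints, can be sketched together. CompletionSketch is a hypothetical model that tracks checkpoint IDs only; it is not how CompletedCheckpointStore is actually implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class CompletionSketch {
    private final TreeSet<Long> pending = new TreeSet<>();
    private final ArrayDeque<Long> completed = new ArrayDeque<>();
    private final int maxRetained;

    CompletionSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void addPending(long checkpointId) {
        pending.add(checkpointId);
    }

    /** Completes one pending checkpoint, trims the store, and drops subsumed pending ones. */
    void complete(long checkpointId) {
        if (!pending.remove(checkpointId)) {
            throw new IllegalArgumentException("not pending: " + checkpointId);
        }
        completed.addLast(checkpointId);
        // keep only the newest maxRetained completed checkpoints
        while (completed.size() > maxRetained) {
            completed.removeFirst();
        }
        // IDs increase monotonically, so every older pending checkpoint is subsumed
        pending.headSet(checkpointId, false).clear();
    }

    List<Long> pendingIds() {
        return new ArrayList<>(pending);
    }

    List<Long> retainedIds() {
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        CompletionSketch coordinator = new CompletionSketch(2);
        for (long id = 1; id <= 4; id++) {
            coordinator.addPending(id);
        }
        coordinator.complete(3L);
        System.out.println(coordinator.pendingIds());
        System.out.println(coordinator.retainedIds());
    }
}
```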
Summary
This article walked through how a checkpoint takes its snapshot: broadcasting the barrier, taking the snapshot, and the Ack process once the checkpoint work is done.
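Copyright notice:
This article was compiled by 大数据真好玩 with the exclusive authorization of the original author. Reproduction without the original author's permission will be pursued as infringement.
Editor | 冷眼丶
WeChat official account | 大数据真好玩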