I ran into a problem at work involving exceptions thrown during snapshots, so here is a summary. The snapshot-related flow is as follows:
When AbstractUdfStreamOperator.snapshotState is called, it actually delegates to StreamingFunctionUtils.snapshotFunctionState:
```java
public static void snapshotFunctionState(
        StateSnapshotContext context,
        OperatorStateBackend backend,
        Function userFunction) throws Exception {
    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(backend);
    while (true) {
        if (trySnapshotFunctionState(context, backend, userFunction)) {
            break;
        }
        // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
        if (userFunction instanceof WrappingFunction) {
            userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
        } else {
            break;
        }
    }
}

private static boolean trySnapshotFunctionState(
        StateSnapshotContext context,
        OperatorStateBackend backend,
        Function userFunction) throws Exception {
    // call the snapshotState method of the CheckpointedFunction (the user's code)
    if (userFunction instanceof CheckpointedFunction) {
        ((CheckpointedFunction) userFunction).snapshotState(context);
        return true;
    }
    // ......
```
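The following minimal, self-contained sketch illustrates the propagation path through the unwrap-and-retry loop. Function, CheckpointedFunction and WrappingFunction here are hypothetical, simplified stand-ins, not the real Flink types. Unlike the original, this snapshotFunctionState returns a boolean so that the result can be checked. The point to notice is that nothing in the loop catches the exception thrown by the user's snapshotState.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotUnwrapSketch {

    // Hypothetical, simplified stand-ins for Flink's Function,
    // CheckpointedFunction and WrappingFunction (not the real Flink types).
    interface Function {}

    interface CheckpointedFunction extends Function {
        void snapshotState(long checkpointId) throws Exception;
    }

    static class WrappingFunction implements Function {
        private final Function wrapped;

        WrappingFunction(Function wrapped) {
            this.wrapped = wrapped;
        }

        Function getWrappedFunction() {
            return wrapped;
        }
    }

    // Same idea as snapshotFunctionState: try the current function; if it is not
    // checkpointed but is a wrapper, unwrap one level and try again.
    // Returns true if some CheckpointedFunction was snapshotted.
    static boolean snapshotFunctionState(long checkpointId, Function userFunction) throws Exception {
        while (true) {
            if (userFunction instanceof CheckpointedFunction) {
                // No try/catch here: an exception from user code propagates to the caller.
                ((CheckpointedFunction) userFunction).snapshotState(checkpointId);
                return true;
            }
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction) userFunction).getWrappedFunction();
            } else {
                return false; // nothing checkpointable found
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<Long> seen = new ArrayList<>();
        CheckpointedFunction inner = id -> seen.add(id); // records the checkpoint ids it sees
        Function wrapped = new WrappingFunction(new WrappingFunction(inner));
        System.out.println(snapshotFunctionState(7L, wrapped) + " " + seen);

        CheckpointedFunction failing = id -> {
            throw new IllegalStateException("snapshot failed for checkpoint " + id);
        };
        try {
            snapshotFunctionState(8L, new WrappingFunction(failing));
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

Running main first snapshots the doubly wrapped inner function. It then shows that the IllegalStateException from the failing function escapes the loop unchanged.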
When a user-defined snapshotState method throws an exception, the exception propagates all the way up to Task.triggerCheckpointBarrier:
```java
public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {
    // this is really a StreamTask: Task delegates the checkpoint to a more specific class,
    // StreamTask delegates further, and so on down to the user code
    // e.g. source -> flatMap
    // the invokable effectively runs the operator chain
    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
    if (executionState == ExecutionState.RUNNING && invokable != null) {
        // build a local closure
        final String taskName = taskNameWithSubtask;
        final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
            FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // set safety net from the task's context for checkpointing thread
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
                try {
                    // the invokable (a StreamTask) passes the checkpoint down, layer by layer, to the user code
                    // this try/catch only deals with exceptions raised while taking the checkpoint
                    // if the checkpoint throws, the ExecutionState becomes FAILED, which leads to a restart
                    boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                    if (!success) {
                        checkpointResponder.declineCheckpoint(
                            getJobID(), getExecutionId(), checkpointID,
                            new CheckpointDeclineTaskNotReadyException(taskName));
                    }
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        failExternally(new Exception(
                            "Error while triggering checkpoint " + checkpointID + " for " +
                                taskNameWithSubtask, t));
                    } else {
                        LOG.debug("Encountered error while triggering checkpoint {} for " +
                            "{} ({}) while being not in state running.", checkpointID,
                            taskNameWithSubtask, executionId, t);
                    }
                } finally {
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                }
            }
        };
        executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
    } else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
            new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
    }
}
```
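The sketch below models the pattern used in triggerCheckpointBarrier, under some simplifying assumptions:

* A single-threaded executor stands in for executeAsyncCallRunnable.
* CheckpointAction is a hypothetical substitute for invokable.triggerCheckpoint.
* failExternally only flips the state to FAILED and records the cause.

The sketch shows that an exception from the checkpoint does not reach the caller of triggerCheckpointBarrier. Instead, it is caught on the async thread and turned into a task failure.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CheckpointTriggerSketch {

    enum State { RUNNING, CANCELING, FAILED }

    // Hypothetical checkpoint action standing in for invokable.triggerCheckpoint(...).
    interface CheckpointAction {
        boolean run(long checkpointId) throws Exception;
    }

    private volatile State state = State.RUNNING;
    private volatile Throwable failureCause;
    private final ExecutorService asyncCallDispatcher = Executors.newSingleThreadExecutor();

    State getState() { return state; }

    Throwable getFailureCause() { return failureCause; }

    // Simplified failExternally: only a RUNNING task is moved to FAILED.
    synchronized void failExternally(Throwable cause) {
        if (state == State.RUNNING) {
            state = State.FAILED;
            failureCause = cause;
        }
    }

    // Same shape as triggerCheckpointBarrier: run the checkpoint asynchronously and
    // turn any Throwable into a task failure instead of letting it escape.
    void triggerCheckpointBarrier(long checkpointId, CheckpointAction action) {
        if (state != State.RUNNING) {
            System.out.println("declining checkpoint " + checkpointId + ": task not running");
            return;
        }
        asyncCallDispatcher.execute(() -> {
            try {
                if (!action.run(checkpointId)) {
                    System.out.println("declining checkpoint " + checkpointId + ": not ready");
                }
            } catch (Throwable t) {
                if (state == State.RUNNING) {
                    failExternally(new Exception(
                            "Error while triggering checkpoint " + checkpointId + " for task", t));
                } else {
                    System.out.println("ignoring error for checkpoint " + checkpointId + ": " + t);
                }
            }
        });
    }

    void shutdown() throws InterruptedException {
        asyncCallDispatcher.shutdown();
        asyncCallDispatcher.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CheckpointTriggerSketch task = new CheckpointTriggerSketch();
        task.triggerCheckpointBarrier(1L, id -> true);
        task.triggerCheckpointBarrier(2L, id -> { throw new RuntimeException("user snapshotState failed"); });
        task.shutdown(); // wait for both async checkpoints to finish
        System.out.println(task.getState() + " caused by " + task.getFailureCause().getCause().getMessage());
    }
}
```

Because the executor is single-threaded, checkpoint 1 completes first. Checkpoint 2 then fails and moves the task to FAILED, while the thread that called triggerCheckpointBarrier never sees the exception.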
The key part is the catch block:
```java
if (getExecutionState() == ExecutionState.RUNNING) {
    failExternally(new Exception(
        "Error while triggering checkpoint " + checkpointID + " for " +
            taskNameWithSubtask, t));
} else {
    LOG.debug("Encountered error while triggering checkpoint {} for " +
        "{} ({}) while being not in state running.", checkpointID,
        taskNameWithSubtask, executionId, t);
}
```
failExternally, in turn, calls:
```java
cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
```
Here is its implementation:
```java
private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
    while (true) {
        ExecutionState current = executionState;
        // if the task is already canceled (or canceling) or finished or failed,
        // then we need not do anything
        if (current.isTerminal() || current == ExecutionState.CANCELING) {
            LOG.info("Task {} is already in state {}", taskNameWithSubtask, current);
            return;
        }
        if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
            if (transitionState(current, targetState, cause)) {
                // if we manage this state transition, then the invokable gets never called
                // we need not call cancel on it
                this.failureCause = cause;
                return;
            }
        } else if (current == ExecutionState.RUNNING) {
            if (transitionState(ExecutionState.RUNNING, targetState, cause)) {
                // we are canceling / failing out of the running state
                // we need to cancel the invokable
                // copy reference to guard against concurrent null-ing out the reference
                final AbstractInvokable invokable = this.invokable;
                if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
                    this.failureCause = cause;
                    LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
                    // because the canceling may block on user code, we cancel from a separate thread
                    // we do not reuse the async call handler, because that one may be blocked, in which
                    // case the canceling could not continue
                    // The canceller calls cancel and interrupts the executing thread once
                    Runnable canceler = new TaskCanceler(
                        LOG,
                        invokable,
                        executingThread,
                        taskNameWithSubtask,
                        producedPartitions,
                        inputGates);
                    Thread cancelThread = new Thread(
                        executingThread.getThreadGroup(),
                        canceler,
                        String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
                    cancelThread.setDaemon(true);
                    cancelThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                    cancelThread.start();
                    // the periodic interrupting thread - a different thread than the canceller, in case
                    // the application code does blocking stuff in its cancellation paths.
                    if (invokable.shouldInterruptOnCancel()) {
                        Runnable interrupter = new TaskInterrupter(
                            LOG,
                            invokable,
                            executingThread,
                            taskNameWithSubtask,
                            taskCancellationInterval);
                        Thread interruptingThread = new Thread(
                            executingThread.getThreadGroup(),
                            interrupter,
                            String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
                        interruptingThread.setDaemon(true);
                        interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                        interruptingThread.start();
                    }
                    // if a cancellation timeout is set, the watchdog thread kills the process
                    // if graceful cancellation does not succeed
                    if (taskCancellationTimeout > 0) {
                        Runnable cancelWatchdog = new TaskCancelerWatchDog(
                            executingThread,
                            taskManagerActions,
                            taskCancellationTimeout,
                            LOG);
                        Thread watchDogThread = new Thread(
                            executingThread.getThreadGroup(),
                            cancelWatchdog,
                            String.format("Cancellation Watchdog for %s (%s).",
                                taskNameWithSubtask, executionId));
                        watchDogThread.setDaemon(true);
                        watchDogThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                        watchDogThread.start();
                    }
                }
                return;
            }
        } else {
            throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
                current, taskNameWithSubtask, executionId));
        }
    }
}
```
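At its core, cancelOrFailAndCancelInvokable is a compare-and-set retry loop on the execution state. The sketch below reproduces only that loop, under these assumptions:

* The enum is a reduced copy of the states seen above, not Flink's real ExecutionState.
* An AtomicReference replaces Flink's own transitionState.
* The cancelInvokable Runnable is a hypothetical hook that stands in for TaskCanceler, TaskInterrupter and the watchdog.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class CancelOrFailSketch {

    // Reduced copy of the states used above (not Flink's real enum).
    enum ExecutionState {
        CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FINISHED, FAILED;

        boolean isTerminal() {
            return this == CANCELED || this == FINISHED || this == FAILED;
        }
    }

    private final AtomicReference<ExecutionState> state;
    private final AtomicBoolean invokableHasBeenCanceled = new AtomicBoolean(false);
    private final Runnable cancelInvokable; // hypothetical stand-in for TaskCanceler
    private volatile Throwable failureCause;

    CancelOrFailSketch(ExecutionState initial, Runnable cancelInvokable) {
        this.state = new AtomicReference<>(initial);
        this.cancelInvokable = cancelInvokable;
    }

    ExecutionState getState() { return state.get(); }

    Throwable getFailureCause() { return failureCause; }

    void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
        while (true) {
            ExecutionState current = state.get();
            if (current.isTerminal() || current == ExecutionState.CANCELING) {
                return; // already finished or on its way out: nothing to do
            }
            if (current == ExecutionState.CREATED || current == ExecutionState.DEPLOYING) {
                if (state.compareAndSet(current, targetState)) {
                    failureCause = cause; // user code never ran, so there is nothing to cancel
                    return;
                }
            } else if (current == ExecutionState.RUNNING) {
                if (state.compareAndSet(ExecutionState.RUNNING, targetState)) {
                    // cancel the running code exactly once, from a separate daemon thread
                    if (invokableHasBeenCanceled.compareAndSet(false, true)) {
                        failureCause = cause;
                        Thread canceler = new Thread(cancelInvokable, "Canceler");
                        canceler.setDaemon(true);
                        canceler.start();
                    }
                    return;
                }
            } else {
                throw new IllegalStateException("Unexpected state: " + current);
            }
            // compareAndSet lost a race with another transition: re-read the state and retry
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger cancelCalls = new AtomicInteger();
        CancelOrFailSketch task = new CancelOrFailSketch(ExecutionState.RUNNING, cancelCalls::incrementAndGet);
        task.cancelOrFailAndCancelInvokable(ExecutionState.FAILED, new Exception("checkpoint failed"));
        // a second request is ignored because FAILED is terminal
        task.cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, new Exception("cancel"));
        Thread.sleep(100); // crude wait for the daemon canceler thread in this demo
        System.out.println(task.getState() + ", cancel calls: " + cancelCalls.get());
    }
}
```

If the compareAndSet fails, another thread changed the state between the read and the write. The loop then simply re-reads the state, so a task that has already become terminal is never moved again.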
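In short, the method switches the ExecutionState to FAILED and then carries out a series of cancellation steps. Because the task ends up FAILED, Flink's restart strategy is triggered. If no restart strategy is configured, the job simply fails.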
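What happens once the task reaches FAILED depends on the configured restart strategy. The toy model below is not Flink's restart-strategy API. It assumes a fixed-delay style policy with a bounded number of restarts, omits the delay itself, and treats maxRestarts = 0 as "no restart strategy".

```java
import java.util.function.IntPredicate;

public class RestartStrategySketch {

    enum JobStatus { FINISHED, FAILED }

    // Toy model of a fixed-delay style policy: the first attempt plus up to
    // maxRestarts restarts; if every attempt fails, the job fails for good.
    static JobStatus runWithRestarts(int maxRestarts, IntPredicate attemptSucceeds) {
        for (int attempt = 0; attempt <= maxRestarts; attempt++) {
            if (attemptSucceeds.test(attempt)) {
                return JobStatus.FINISHED;
            }
            // the task went to FAILED (e.g. snapshotState threw): restart if budget remains
        }
        return JobStatus.FAILED;
    }

    public static void main(String[] args) {
        // hypothetical job whose checkpoint fails on the first two attempts, then succeeds
        IntPredicate flaky = attempt -> attempt >= 2;
        System.out.println(runWithRestarts(0, flaky)); // no restart strategy
        System.out.println(runWithRestarts(3, flaky)); // up to 3 restarts
    }
}
```

With no restarts, the first failure is final. With a budget of three restarts, the third attempt succeeds and the job finishes.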