当 snapshot 失败时发生了什么

2020-10-16 09:37:58 浏览数 (3)

工作中遇到了与 snapshot 异常相关的问题,特此总结一下,与 snapshot 相关的流程图如下:

当调用 AbstractUdfStreamOperator.snapshotState 方法时,实际上调用的是下面的 snapshotFunctionState 方法:

代码语言:java
/**
 * Snapshots the state of the given user function.
 *
 * <p>The function is offered to {@code trySnapshotFunctionState}; if that call does not
 * report success and the function is a {@code WrappingFunction}, the wrapped (inner)
 * function is tried next. This repeats until one layer is snapshotted or there is no
 * further wrapper to unwrap.
 *
 * @param context the snapshot context, must not be null
 * @param backend the operator state backend, must not be null
 * @param userFunction the (possibly wrapped) user function to snapshot
 * @throws Exception whatever the snapshot attempt throws
 */
public static void snapshotFunctionState(
			StateSnapshotContext context,
			OperatorStateBackend backend,
			Function userFunction) throws Exception {

		Preconditions.checkNotNull(context);
		Preconditions.checkNotNull(backend);

		Function candidate = userFunction;

		// peel off one wrapper per iteration until a layer accepts the snapshot
		while (!trySnapshotFunctionState(context, backend, candidate)) {
			if (!(candidate instanceof WrappingFunction)) {
				// nothing left to unwrap, so there is nothing to snapshot
				return;
			}
			candidate = ((WrappingFunction<?>) candidate).getWrappedFunction();
		}
	}

	private static boolean trySnapshotFunctionState(
			StateSnapshotContext context,
			OperatorStateBackend backend,
			Function userFunction) throws Exception {

		// 调用 checkpoint function 的 snapshotState 方法
		if (userFunction instanceof CheckpointedFunction) {
			((CheckpointedFunction) userFunction).snapshotState(context);

			return true;
		}
......

当用户定义的 snapshotState 方法向外抛异常时,异常会一直上抛至 Task.triggerCheckpointBarrier 方法

代码语言:java
public void triggerCheckpointBarrier(
		final long checkpointID,
		long checkpointTimestamp,
		final CheckpointOptions checkpointOptions) {
		
		//实际上就是 StreamTask  Task类实际上是将 checkpoint 委托给了具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
		// source ->flatMap
		// invokable 实际上是 operator chain
		final AbstractInvokable invokable = this.invokable;
		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
		
		if (executionState == ExecutionState.RUNNING && invokable != null) {
			
			// build a local closure
			final String taskName = taskNameWithSubtask;
			final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
				FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
			
			Runnable runnable = new Runnable() {
				@Override
				public void run() {
					// set safety net from the task's context for checkpointing thread
					LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
					FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
					
					try {
						// invokable 事实上就是 StreamTask Task 类实际上是将 checkpoint 委托给了更具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
						// only 做 checkpoint 的异常
						// 当 checkpoint 发生异常时,ExecutionState 会转化为 FAILED 会导致重启
						boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
						if (!success) {
							checkpointResponder.declineCheckpoint(
								getJobID(), getExecutionId(), checkpointID,
								new CheckpointDeclineTaskNotReadyException(taskName));
						}
					} catch (Throwable t) {
						if (getExecutionState() == ExecutionState.RUNNING) {
							failExternally(new Exception(
								"Error while triggering checkpoint "   checkpointID   " for "  
									taskNameWithSubtask, t));
						} else {
							LOG.debug("Encountered error while triggering checkpoint {} for "  
									"{} ({}) while being not in state running.", checkpointID,
								taskNameWithSubtask, executionId, t);
						}
					} finally {
						FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
					}
				}
			};
			executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
		} else {
			LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
			
			// send back a message that we did not do the checkpoint
			checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
				new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
		}
	}

其中最关键的代码实际上是 catch 块中的这一段:

代码语言:java
// Excerpt of the catch block: 't' is the Throwable caught while triggering the checkpoint.
if (getExecutionState() == ExecutionState.RUNNING) {
	// still running: a checkpoint error fails the whole task
	failExternally(new Exception(
		"Error while triggering checkpoint " + checkpointID + " for " +
			taskNameWithSubtask, t));
} else {
	// task is no longer running: only log the error
	LOG.debug("Encountered error while triggering checkpoint {} for " +
			"{} ({}) while being not in state running.", checkpointID,
		taskNameWithSubtask, executionId, t);
}

而 failExternally 方法内部调用了:

代码语言:java
// Per the surrounding text, this is the call made by failExternally: the target state is FAILED.
cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);

查看细节

代码语言:java
/**
 * Moves the task to the given target state and, if the task was running, cancels the invokable.
 *
 * <p>The method loops: it reads the current state, attempts the transition, and starts over
 * whenever {@code transitionState} returns {@code false}. It returns without doing anything if
 * the task is already in a terminal state or in {@code CANCELING}.
 *
 * <p>When leaving {@code RUNNING}, cancellation runs on freshly started daemon threads: a
 * canceler, optionally a periodic interrupter, and optionally a watchdog when a cancellation
 * timeout is configured.
 *
 * @param targetState the state to transition to
 * @param cause the failure cause; stored in {@code failureCause} on a successful transition
 * @throws IllegalStateException if the current state is none of the states handled here
 */
private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
		while (true) {
			// re-read the state on every iteration; a failed transition below leads to a retry
			ExecutionState current = executionState;
			
			// if the task is already canceled (or canceling) or finished or failed,
			// then we need not do anything
			if (current.isTerminal() || current == ExecutionState.CANCELING) {
				LOG.info("Task {} is already in state {}", taskNameWithSubtask, current);
				return;
			}
			
			if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
				if (transitionState(current, targetState, cause)) {
					// if we manage this state transition, then the invokable gets never called
					// we need not call cancel on it
					this.failureCause = cause;
					return;
				}
			} else if (current == ExecutionState.RUNNING) {
				if (transitionState(ExecutionState.RUNNING, targetState, cause)) {
					// we are canceling / failing out of the running state
					// we need to cancel the invokable
					
					// copy reference to guard against concurrent null-ing out the reference
					final AbstractInvokable invokable = this.invokable;
					
					// the compareAndSet ensures the cancellation threads are started at most once
					if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
						this.failureCause = cause;
						
						LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
						
						// because the canceling may block on user code, we cancel from a separate thread
						// we do not reuse the async call handler, because that one may be blocked, in which
						// case the canceling could not continue
						
						// The canceller calls cancel and interrupts the executing thread once
						Runnable canceler = new TaskCanceler(
							LOG,
							invokable,
							executingThread,
							taskNameWithSubtask,
							producedPartitions,
							inputGates);
						
						Thread cancelThread = new Thread(
							executingThread.getThreadGroup(),
							canceler,
							String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
						cancelThread.setDaemon(true);
						cancelThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
						cancelThread.start();
						
						// the periodic interrupting thread - a different thread than the canceller, in case
						// the application code does blocking stuff in its cancellation paths.
						if (invokable.shouldInterruptOnCancel()) {
							Runnable interrupter = new TaskInterrupter(
								LOG,
								invokable,
								executingThread,
								taskNameWithSubtask,
								taskCancellationInterval);
							
							Thread interruptingThread = new Thread(
								executingThread.getThreadGroup(),
								interrupter,
								String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
							interruptingThread.setDaemon(true);
							interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
							interruptingThread.start();
						}
						
						// if a cancellation timeout is set, the watchdog thread kills the process
						// if graceful cancellation does not succeed
						if (taskCancellationTimeout > 0) {
							Runnable cancelWatchdog = new TaskCancelerWatchDog(
								executingThread,
								taskManagerActions,
								taskCancellationTimeout,
								LOG);
							
							Thread watchDogThread = new Thread(
								executingThread.getThreadGroup(),
								cancelWatchdog,
								String.format("Cancellation Watchdog for %s (%s).",
									taskNameWithSubtask, executionId));
							watchDogThread.setDaemon(true);
							watchDogThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
							watchDogThread.start();
						}
					}
					// the transition out of RUNNING succeeded, so we are done either way
					return;
				}
			} else {
				throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
					current, taskNameWithSubtask, executionId));
			}
		}
	}

该方法主要是将 ExecutionState 转换为 FAILED,然后执行一系列取消操作。由于 ExecutionState 变为 FAILED,会触发 Flink 的重启机制;若未配置重启策略,则作业直接失败。

1 人点赞