一文搞定 Flink Checkpoint Barrier 全流程

2020-06-21 16:40:35 浏览数 (1)

上文中,我们一起了解了 一文搞定 Flink 消费消息的全流程,接下来呢,我们一起来看一下 checkpoint barrier 的全流程。

首先呢,Job 启动的时候,Flink 会 startCheckpointScheduler

代码语言:javascript复制
public void startCheckpointScheduler() {
		synchronized (lock) {
			if (shutdown) {
				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
			}

			// make sure all prior timers are cancelled
			stopCheckpointScheduler();

			periodicScheduling = true;
			long initialDelay = ThreadLocalRandom.current().nextLong(
				minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval   1L);
			// 定时任务
			currentPeriodicTrigger = timer.scheduleAtFixedRate(
					new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
		}
	}

通过定时任务来触发 checkpoint。 到 Task.triggerCheckpoint

代码语言:javascript复制
@Override
	// trigger operator chain task trigger checkpoint
	public CompletableFuture<Acknowledge> triggerCheckpoint(
			ExecutionAttemptID executionAttemptID,
			long checkpointId,
			long checkpointTimestamp,
			CheckpointOptions checkpointOptions) {
		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

		final Task task = taskSlotTable.getTask(executionAttemptID);

		if (task != null) {
			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

			return CompletableFuture.completedFuture(Acknowledge.get());
		} else {
			final String message = "TaskManager received a checkpoint request for unknown task "   executionAttemptID   '.';

			log.debug(message);
			return FutureUtils.completedExceptionally(new CheckpointException(message));
		}
	}

到 Task.triggerCheckpointBarrier

代码语言:javascript复制
// trigger operator chain trigger checkpoint  最终触发 triggerCheckpointBarrier
	public void triggerCheckpointBarrier(
		final long checkpointID,
		long checkpointTimestamp,
		final CheckpointOptions checkpointOptions) {
		
		//实际上就是 StreamTask  Task类实际上是将 checkpoint 委托给了具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
		// source ->flatMap
		// invokable 实际上是 operator chain
		final AbstractInvokable invokable = this.invokable;
		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
		
		if (executionState == ExecutionState.RUNNING && invokable != null) {
			
			// build a local closure
			final String taskName = taskNameWithSubtask;
			final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
				FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
			
			Runnable runnable = new Runnable() {
				@Override
				public void run() {
					// set safety net from the task's context for checkpointing thread
					LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
					FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
					
					try {
						// invokable 事实上就是 StreamTask Task 类实际上是将 checkpoint 委托给了更具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
						boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
						if (!success) {
							checkpointResponder.declineCheckpoint(
								getJobID(), getExecutionId(), checkpointID,
								new CheckpointDeclineTaskNotReadyException(taskName));
						}
					} catch (Throwable t) {
						if (getExecutionState() == ExecutionState.RUNNING) {
							failExternally(new Exception(
								"Error while triggering checkpoint "   checkpointID   " for "  
									taskNameWithSubtask, t));
						} else {
							LOG.debug("Encountered error while triggering checkpoint {} for "  
									"{} ({}) while being not in state running.", checkpointID,
								taskNameWithSubtask, executionId, t);
						}
					} finally {
						FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
					}
				}
			};
			executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
		} else {
			LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
			
			// send back a message that we did not do the checkpoint
			checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
				new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
		}
	}

我们以 SourceStreamTask 为例,进入

代码语言:javascript复制
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
		try {
			// No alignment if we inject a checkpoint
			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
					.setBytesBufferedInAlignment(0L)
					.setAlignmentDurationNanos(0L);

			return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
		}
		catch (Exception e) {
			// propagate exceptions only if the task is still in "running" state
			if (isRunning) {
				throw new Exception("Could not perform checkpoint "   checkpointMetaData.getCheckpointId()  
					" for operator "   getName()   '.', e);
			} else {
				LOG.debug("Could not perform checkpoint {} for operator {} while the "  
					"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
				return false;
			}
		}
	}

StreamTask.performCheckpoint

代码语言:javascript复制
// trigger opator chain 一路调用到这里,开始出现 barrier (实际上是定时任务 checkpoint 产生的)
	private boolean performCheckpoint(
			CheckpointMetaData checkpointMetaData,
			CheckpointOptions checkpointOptions,
			CheckpointMetrics checkpointMetrics) throws Exception {

		LOG.debug("Starting checkpoint ({}) {} on task {}",
			checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

		synchronized (lock) {
			if (isRunning) {
				// we can do a checkpoint

				// All of the following steps happen as an atomic step from the perspective of barriers and
				// records/watermarks/timers/callbacks.
				// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
				// checkpoint alignments

				// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
				//           The pre-barrier work should be nothing or minimal in the common case.
				//注意,从这里开始,整个执行链路上开始出现Barrier
				operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

				// Step (2): Send the checkpoint barrier downstream
				/*
				发送 barrier 到下游,下游的 operator 接收到本 barrier 就会触发其自身的 checkpoint
				 */
				operatorChain.broadcastCheckpointBarrier(
						checkpointMetaData.getCheckpointId(),
						checkpointMetaData.getTimestamp(),
						checkpointOptions);

				// Step (3): Take the state snapshot. This should be largely asynchronous, to not
				//           impact progress of the streaming topology
				// 执行 checkoint source task chain(trigger task )是直接通过 triggerCheckpoint 来触发 checkpoint 的
				// 而非 source task chain 是通过 processBarrier 来触发 checkpoint 的
				checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
				return true;
			}
			else {
				// we cannot perform our checkpoint - let the downstream operators know that they
				// should not wait for any input from this operator

				// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
				// yet be created
				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
				Exception exception = null;

				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
					try {
						//类似于 barrier 的另一种消息
						recordWriter.broadcastEvent(message);
					} catch (Exception e) {
						exception = ExceptionUtils.firstOrSuppressed(
							new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
							exception);
					}
				}

				if (exception != null) {
					throw exception;
				}

				return false;
			}
		}
	}

整个 Flink Graph 首次出现 checkpoint barrier。 需要注意的是主动触发 checkpoint 的只有 trigger operator( 在生成 ExecutionGraph 时会生成 trigger operator,ack operator,confirm operator,这些task 本质上是 operator chain ) ,trigger operator 我们可以简单的理解成 streamSource operator。 换言之,streamSource operator 触发了 checkpoint,一直到把 checkpoint 广播到下游,最后做 checkpoint state ( StreamSource operator 的 state )。 具体是怎么广播到下游的,其实与普通消息的传递类似,可以参考 一文搞定 Flink 消费消息的全流程

然后下游的算子 比如 flatMap 在 OneInputStreamTask ( 以此为例 ) 中消费消息

代码语言:javascript复制
@Override
	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
		//处理输入的消息
		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}

接下来,直接到 BarrierBuffer (当设置 checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) 时 )

代码语言:javascript复制
@Override
	// 从 ResultSubPartition 中获取数据
	public BufferOrEvent getNextNonBlocked() throws Exception {
		while (true) {
			// process buffered BufferOrEvents before grabbing new ones
			Optional<BufferOrEvent> next;
			if (currentBuffered == null) {
				// 如果当前有堆积的 boe,直接从 InputGate 中获取,否则从缓存中获取(通过 BufferSpiller 缓存的数据)
				// 通过 inputGate 中的 inputChannel 来获取 ResultSubPartition 中的数据
				next = inputGate.getNextBufferOrEvent();
			}
			else {
				next = Optional.ofNullable(currentBuffered.getNext());
				if (!next.isPresent()) {
					completeBufferedSequence();
					return getNextNonBlocked();
				}
			}

			if (!next.isPresent()) {
				if (!endOfStream) {
					// end of input stream. stream continues with the buffered data
					endOfStream = true;
					releaseBlocksAndResetBarriers();
					return getNextNonBlocked();
				}
				else {
					// final end of both input and buffered data
					return null;
				}
			}

			BufferOrEvent bufferOrEvent = next.get();
			if (isBlocked(bufferOrEvent.getChannelIndex())) {
				// if the channel is blocked, we just store the BufferOrEvent
				//  barrier 对齐
				bufferBlocker.add(bufferOrEvent);
				checkSizeLimit();
			}
			else if (bufferOrEvent.isBuffer()) {
				return bufferOrEvent;
			}
			// 处理 barrier
			else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
				if (!endOfStream) {
					// process barriers only if there is a chance of the checkpoint completing
					//包括 source 其实都是在这里做的 checkpoint 只有通过 processInput 消费到才表示 barrier 经过了上游算子
					processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
				}
			}
			else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
				processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
			}
			else {
				if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
					processEndOfPartition();
				}
				return bufferOrEvent;
			}
		}
	}

接下来就是最为关键的逻辑 处理 barrier

代码语言:javascript复制
// 一个 opertor 必须收到从每个 inputchannel 发过来的同一序号的 barrier 之后才能发起本节点的 checkpoint,
	//  如果有的 channel 的数据处理的快了,那该 barrier 后的数据还需要缓存起来,
	//  如果有的 inputchannel 被关闭了,那它就不会再发送 barrier 过来了
	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
		final long barrierId = receivedBarrier.getId();

		// fast path for single channel cases
		if (totalNumberOfInputChannels == 1) {
			if (barrierId > currentCheckpointId) {
				// new checkpoint
				currentCheckpointId = barrierId;
				// 触发 checkpoint
				notifyCheckpoint(receivedBarrier);
			}
			return;
		}

		// -- general code path for multiple input channels --

		if (numBarriersReceived > 0) {
			// this is only true if some alignment is already progress and was not canceled

			if (barrierId == currentCheckpointId) {
				// regular case
				onBarrier(channelIndex);
			}
			else if (barrierId > currentCheckpointId) {
				// we did not complete the current checkpoint, another started before
				LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. "  
						"Skipping current checkpoint.",
					inputGate.getOwningTaskName(),
					barrierId,
					currentCheckpointId);

				// let the task know we are not completing this
				notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));

				// abort the current checkpoint
				releaseBlocksAndResetBarriers();

				// begin a the new checkpoint
				beginNewAlignment(barrierId, channelIndex);
			}
			else {
				// ignore trailing barrier from an earlier checkpoint (obsolete now)
				return;
			}
		}
		else if (barrierId > currentCheckpointId) {
			// first barrier of a new checkpoint
			beginNewAlignment(barrierId, channelIndex);
		}
		else {
			// either the current checkpoint was canceled (numBarriers == 0) or
			// this barrier is from an old subsumed checkpoint
			return;
		}

		// check if we have all barriers - since canceled checkpoints always have zero barriers
		// this can only happen on a non canceled checkpoint
		// barrier 对齐之后才会触发 checkpoint 
		if (numBarriersReceived   numClosedChannels == totalNumberOfInputChannels) {
			// actually trigger checkpoint
			if (LOG.isDebugEnabled()) {
				LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
					inputGate.getOwningTaskName(),
					receivedBarrier.getId(),
					receivedBarrier.getTimestamp());
			}

			releaseBlocksAndResetBarriers();
			// 当收到全部的 barrier 之后,就会触发 notifyCheckpoint(),
			// 该方法又会调用 StreamTask 的 triggerCheckpoint ,和之前的operator是一样的
			notifyCheckpoint(receivedBarrier);
		}
	}

最终 notifyCheckpoint 有会调用 StreamTask 的 performCheckpoint ,开始 flatMap 的 checkpoint barrier 一些列操作,比如广播 barrier,然后做自己的 checkpoint state。循环往复,直至最后。

0 人点赞