一文搞懂 Flink 中的锁

2020-12-31 11:51:08 浏览数 (1)

之前在介绍 flink timer 的时候( 一文搞懂 Flink Timer ) 官网有这样的一句话

Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.

当时觉得特别奇怪,今天我们就一起来看一下,flink 是如何保证 onTimer 与 processElement 同步的以及其他使用 lock 的地方

由 一文搞定 Flink 消费消息的全流程 我们可以知道,当算子处理 msg 时,保持同步

代码语言:javascript复制
// 这里就是真正的,用户的代码即将被执行的地方
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						//处理每条 record lock
						// 所以如果是 window 由 processElement 导致的 window fire 也会被 lock 住
						synchronized (lock) {
							numRecordsIn.inc();
							//throught KeySelector set KeyContext setCurrentKey
							streamOperator.setKeyContextElement1(record);
							//处理数据
							streamOperator.processElement(record);
						}

由 一文搞懂 flink 处理水印全过程 我们可以知道下游算子处理水印时,会保持同步

代码语言:javascript复制
public void handleWatermark(Watermark watermark) {
			try {
				// event time lock
				synchronized (lock) {
					watermarkGauge.setCurrentWatermark(watermark.getTimestamp());//gauge
					//处理 watermark 的入口
					operator.processWatermark(watermark);
				}
			} catch (Exception e) {
				throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
			}
		}

这是 event timer触发的过程, 同理 process timer

代码语言:javascript复制
@Override
		public void run() {
			// process timer lock
			synchronized (lock) {
				try {
					if (serviceStatus.get() == STATUS_ALIVE) {
						target.onProcessingTime(timestamp);
					}
				} catch (Throwable t) {
					TimerException asyncException = new TimerException(t);
					exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
				}
			}
		}

其中 lock 均来自于 StreamTask 的

代码语言:javascript复制
private final Object lock = new Object();

另外 lock 除了应用于 ontimer() 与 processElement() 方法外,还应用于

处理水印、处理 record、triggerCheckpoint、kafka 发送 msg、update offset

0 人点赞