之前在介绍 flink timer 的时候( 一文搞懂 Flink Timer ) 官网有这样的一句话
Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.
当时觉得特别奇怪,今天我们就一起来看一下,flink 是如何保证 onTimer 与 processElement 同步的以及其他使用 lock 的地方
由 一文搞定 Flink 消费消息的全流程 我们可以知道,当算子处理 msg 时,保持同步
代码语言:javascript复制// 这里就是真正的,用户的代码即将被执行的地方
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
//处理每条 record lock
// 所以如果是 window 由 processElement 导致的 window fire 也会被 lock 住
synchronized (lock) {
numRecordsIn.inc();
//throught KeySelector set KeyContext setCurrentKey
streamOperator.setKeyContextElement1(record);
//处理数据
streamOperator.processElement(record);
}
由 一文搞懂 flink 处理水印全过程 我们可以知道下游算子处理水印时,会保持同步
代码语言:javascript复制public void handleWatermark(Watermark watermark) {
try {
// event time lock
synchronized (lock) {
watermarkGauge.setCurrentWatermark(watermark.getTimestamp());//gauge
//处理 watermark 的入口
operator.processWatermark(watermark);
}
} catch (Exception e) {
throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
}
}
这是 event timer触发的过程, 同理 process timer
代码语言:javascript复制@Override
public void run() {
// process timer lock
synchronized (lock) {
try {
if (serviceStatus.get() == STATUS_ALIVE) {
target.onProcessingTime(timestamp);
}
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
}
}
}
其中 lock 均来自于 StreamTask 的
代码语言:javascript复制private final Object lock = new Object();
另外 lock 除了应用于 ontimer() 与 processElement() 方法外,还应用于
处理水印、处理 record、triggerCheckpoint、kafka 发送 msg、update offset