一文搞懂 flink state key 的设置方式

2020-05-29 15:49:10 浏览数 (1)

  • 1. 疑问
  • 2. 解释

1. 疑问

前一篇文章 一文搞懂 Flink window 元素的顺序问题 我们已经知道了,state 的获取、更新、清除等都与 key 相关。那么 key 是如何设置的呢?

2.解释

这需要从 StreamTask 的 run 方法说起。以 OneInputStreamTask 为例,当程序启动开始消费消息时,会进行 OneInputStreamTask 的 run 方法,

代码语言:javascript复制
@Override
	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
		//处理输入的消息
		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}

最终调用的是 inputProcessor.processInput() 方法,除了生成 watermark 之外,就是往下游发送记录

代码语言:javascript复制
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						synchronized (lock) {
							numRecordsIn.inc();
							//set KeyContext setCurrentKey
			//设置 keyContext (提供了用来查询和设置 keyed operation 的 current key 的接口)
							streamOperator.setKeyContextElement1(record);
			//这里开始调用用户自己的代码
							streamOperator.processElement(record);
						}

一路追踪下去,到 AbstractStreamOperator

代码语言:javascript复制
//自定义的 KeySelector 在此处起作用
	private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
		if (selector != null) {
			Object key = selector.getKey(record.getValue());
			setCurrentKey(key);
		}
	}

	@SuppressWarnings({"unchecked", "rawtypes"})
	public void setCurrentKey(Object key) {
		if (keyedStateBackend != null) {
			try {
				// need to work around type restrictions
				@SuppressWarnings("unchecked,rawtypes")
				AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
				
				//设置 keyedStateBackend currentKey
				rawBackend.setCurrentKey(key);
			} catch (Exception e) {
				throw new RuntimeException("Exception occurred while setting the current key context.", e);
			}
		}
	}

然后会调用 RocksDBKeyedStateBackend

代码语言:javascript复制
@Override
	public void setCurrentKey(K newKey) {
		super.setCurrentKey(newKey);
		// 每个 key 都会调用一次 将 key group and key 写入 byte[] 中,每次开始写入前都会清空,后续 state 的操作都会从这个 byte[] 中读
		sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
	}

当查询、更新以及清除 state 时,由 一文搞懂 Flink window 元素的顺序问题 我们可以知道 ,有一个 serializeCurrentKeyWithGroupAndNamespace() 方法,最终进入 buildCompositeKeyNamespace

代码语言:javascript复制
@Nonnull
	public <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
		try {
			// 每次真正操作时候,serializeNamespace
			serializeNamespace(namespace, namespaceSerializer);
			// 将已序列化的 key_group,key,namespace 作为一个整体 copy 出来,这也就是 state key
			final byte[] result = keyOutView.getCopyOfBuffer();
			// 重置,类似于重置游标,去除 namespace bytes
			resetToKey();
			return result;
		} catch (IOException shouldNeverHappen) {
			throw new FlinkRuntimeException(shouldNeverHappen);
		}
	}

至此 state key 就设置完成了,然后就可以按照新设置的 key 进行查询了。

0 人点赞