What happens when you call getRuntimeContext().getMapState in Flink?

2019-12-20 15:51:22

As we all know, a MapState is usually obtained in open() through getRuntimeContext().getMapState, like this:
public void open(Configuration cfg) {
	state = getRuntimeContext().getMapState(
		new MapStateDescriptor<>("sum", MyType.class, Long.class));
}

Stepping into the call, we arrive at DefaultKeyedStateStore:

@Override
	public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
		requireNonNull(stateProperties, "The state properties must not be null");
		try {
			stateProperties.initializeSerializerUnlessSet(executionConfig);
			// Key step: obtain the original (raw) state
			MapState<UK, UV> originalState = getPartitionedState(stateProperties);
			// Return a MapState that wraps the original one
			return new UserFacingMapState<>(originalState);
		} catch (Exception e) {
			throw new RuntimeException("Error while getting state", e);
		}
	}
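
The wrapping step can be pictured with a minimal plain-Java sketch. It does not use Flink's classes: `SimpleMapState`, `RawMapState`, and `UserFacingWrapper` are hypothetical names. The assumption that the raw state may return `null` for an empty map, which the wrapper turns into an empty iterable, is only an illustration of why such a wrapper can be useful, not a statement about Flink's exact behavior.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a map-state interface (not Flink's MapState).
interface SimpleMapState<K, V> {
    V get(K key);
    void put(K key, V value);
    Iterable<Map.Entry<K, V>> entries();
}

// Hypothetical "raw" state: returns null from entries() when nothing is stored.
class RawMapState<K, V> implements SimpleMapState<K, V> {
    private final Map<K, V> data = new HashMap<>();

    @Override public V get(K key) { return data.get(key); }
    @Override public void put(K key, V value) { data.put(key, value); }
    @Override public Iterable<Map.Entry<K, V>> entries() {
        return data.isEmpty() ? null : data.entrySet();
    }
}

// Wrapper handed to user code: delegates every call but never returns a null iterable.
class UserFacingWrapper<K, V> implements SimpleMapState<K, V> {
    private final SimpleMapState<K, V> original;

    UserFacingWrapper(SimpleMapState<K, V> original) { this.original = original; }

    @Override public V get(K key) { return original.get(key); }
    @Override public void put(K key, V value) { original.put(key, value); }
    @Override public Iterable<Map.Entry<K, V>> entries() {
        Iterable<Map.Entry<K, V>> raw = original.entries();
        return raw != null ? raw : Collections.<K, V>emptyMap().entrySet();
    }
}
```

With this shape, user code can iterate over `entries()` without a null check, while the backend-specific state object stays hidden behind the wrapper.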

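Let's look at how the original state is obtained. The call goes through DefaultKeyedStateStore's own getPartitionedState, which supplies VoidNamespace and its serializer, and ends up in AbstractKeyedStateBackend: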

@Override
	public <N, S extends State> S getPartitionedState(
			final N namespace,
			final TypeSerializer<N> namespaceSerializer,
			final StateDescriptor<S, ?> stateDescriptor) throws Exception {

		checkNotNull(namespace, "Namespace");

		/*
		If the stateDescriptor name equals lastName, return the most recently used state.
		On the very first access, lastName == null.
		 */
		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
			lastState.setCurrentNamespace(namespace);
			return (S) lastState;
		}

		/*
		On first access previous == null; later lookups are served directly from the cache.
		 */
		InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
		if (previous != null) {
			lastState = previous;
			lastState.setCurrentNamespace(namespace);
			lastName = stateDescriptor.getName();
			return (S) previous;
		}

		// On first access (with the RocksDB backend) this creates the corresponding column family
		// and returns the matching RocksDB state object, e.g. RocksDBListState
		final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
		final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;

		// Remember the name and state so the next call with the same name takes the fast path
		lastName = stateDescriptor.getName();
		lastState = kvState;
		kvState.setCurrentNamespace(namespace);

		return state;
	}
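
The lookup order above (last-used fast path, then the by-name cache, then creation) can be sketched in plain Java as follows. This is a simplified model rather than Flink code: `StateCache`, `lookup`, and the `Supplier` factory are hypothetical, and namespaces are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified model of the lookup order; not Flink code, namespaces omitted.
class StateCache<S> {
    private final Map<String, S> statesByName = new HashMap<>();
    private String lastName;   // name of the most recently accessed state
    private S lastState;       // the most recently accessed state
    int creations = 0;         // counts factory calls, for demonstration only

    S lookup(String name, Supplier<S> factory) {
        // 1. Fast path: same name as the previous call, no map lookup needed.
        if (lastName != null && lastName.equals(name)) {
            return lastState;
        }
        // 2. By-name cache: the state exists but was not the last one used.
        S state = statesByName.get(name);
        if (state == null) {
            // 3. First access: create the state and cache it.
            state = factory.get();
            statesByName.put(name, state);
            creations++;
        }
        lastName = name;
        lastState = state;
        return state;
    }
}
```

Repeated calls with the same name return the same object, and the factory runs only once per name, which is the behavior the two caches in the method above are there to provide.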

If this is not the first access, the state is taken straight from the cache; on first access it is created. Let's look at the creation method:

@Override
	@SuppressWarnings("unchecked")
	public <N, S extends State, V> S getOrCreateKeyedState(
			final TypeSerializer<N> namespaceSerializer,
			StateDescriptor<S, V> stateDescriptor) throws Exception {
		checkNotNull(namespaceSerializer, "Namespace serializer");
		checkNotNull(keySerializerProvider, "State key serializer has not been configured in the config. " +
				"This operation cannot use partitioned state.");

		InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
		if (kvState == null) {
			if (!stateDescriptor.isSerializerInitialized()) {
				stateDescriptor.initializeSerializerUnlessSet(executionConfig);
			}
			// Create the state
			kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
				namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
			// Add it to the cache
			keyValueStatesByName.put(stateDescriptor.getName(), kvState);
			// If the descriptor is marked queryable, register the state so that it can be served to queryable-state clients
			publishQueryableStateIfEnabled(stateDescriptor, kvState);
		}
		return (S) kvState;
	}

Next, let's see how kvState is actually created:

public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
		TypeSerializer<N> namespaceSerializer,
		StateDescriptor<S, SV> stateDesc,
		KeyedStateBackend<K> stateBackend,
		TtlTimeProvider timeProvider) throws Exception {
		Preconditions.checkNotNull(namespaceSerializer);
		Preconditions.checkNotNull(stateDesc);
		Preconditions.checkNotNull(stateBackend);
		Preconditions.checkNotNull(timeProvider);
		return  stateDesc.getTtlConfig().isEnabled() ?
			new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
				namespaceSerializer, stateDesc, stateBackend, timeProvider)
				.createState() :
			stateBackend.createInternalState(namespaceSerializer, stateDesc);
	}
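
The branch above is a conditional decorator: when TTL is enabled, the state is wrapped so that each value carries a timestamp and expired values are hidden. A minimal plain-Java sketch of that idea follows. `ValueStore`, `PlainStore`, `TtlStore`, and `StoreFactory` are hypothetical names, and the expiry rule used here (a value counts as expired once its age reaches the TTL) is an assumption for illustration, not a description of Flink's TTL internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical minimal store interface (not a Flink type).
interface ValueStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// Plain store: used when TTL is disabled.
class PlainStore<K, V> implements ValueStore<K, V> {
    private final Map<K, V> data = new HashMap<>();
    @Override public void put(K key, V value) { data.put(key, value); }
    @Override public V get(K key) { return data.get(key); }
}

// Decorator: keeps (value, last-update time) pairs in an inner store
// and hides values whose age has reached the TTL.
class TtlStore<K, V> implements ValueStore<K, V> {
    private static final class Stamped<T> {
        final T value;
        final long timestamp;
        Stamped(T value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    private final ValueStore<K, Stamped<V>> inner = new PlainStore<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    TtlStore(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    @Override public void put(K key, V value) {
        inner.put(key, new Stamped<>(value, clock.getAsLong()));
    }

    @Override public V get(K key) {
        Stamped<V> stamped = inner.get(key);
        if (stamped == null || clock.getAsLong() - stamped.timestamp >= ttlMillis) {
            return null; // missing or expired
        }
        return stamped.value;
    }
}

// Mirrors the ternary in the method above: wrap only when TTL is enabled.
class StoreFactory {
    static <K, V> ValueStore<K, V> create(boolean ttlEnabled, long ttlMillis, LongSupplier clock) {
        return ttlEnabled ? new TtlStore<K, V>(ttlMillis, clock) : new PlainStore<K, V>();
    }
}
```

Passing the clock in as a `LongSupplier` keeps the sketch testable, in the same spirit as the `timeProvider` argument in the method above.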

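Let's take stateBackend.createInternalState (the branch without TTL) as the example; the two branches share a lot of logic. With the RocksDB state backend, following the call leads to RocksDBMapState: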

@SuppressWarnings("unchecked")
	static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
		StateDescriptor<S, SV> stateDesc,
		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
		RocksDBKeyedStateBackend<K> backend) {
		return (IS) new RocksDBMapState<>(
			registerResult.f0,
			registerResult.f1.getNamespaceSerializer(),
			(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),
			(Map<UK, UV>) stateDesc.getDefaultValue(),
			backend);
	}
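
How the backend gets from `createInternalState` to a static `create` method like the one above is not shown in this walkthrough. One plausible mechanism is a lookup table from descriptor class to factory, and the sketch below models that idea in plain Java. Everything in it (`Descriptor`, `MapDescriptor`, `ListDescriptor`, `StateFactory`, `MapStateImpl`, `ListStateImpl`, `Backend`) is hypothetical and simplified, so treat it as an illustration of the dispatch pattern rather than the backend's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical descriptor hierarchy (not Flink's StateDescriptor).
abstract class Descriptor {
    final String name;
    Descriptor(String name) { this.name = name; }
}
class MapDescriptor extends Descriptor { MapDescriptor(String name) { super(name); } }
class ListDescriptor extends Descriptor { ListDescriptor(String name) { super(name); } }

// A factory turns a descriptor into a state object.
interface StateFactory {
    Object create(Descriptor descriptor);
}

// Hypothetical state implementations, each exposing a static create method.
class MapStateImpl {
    final String name;
    private MapStateImpl(String name) { this.name = name; }
    static Object create(Descriptor d) { return new MapStateImpl(d.name); }
}
class ListStateImpl {
    final String name;
    private ListStateImpl(String name) { this.name = name; }
    static Object create(Descriptor d) { return new ListStateImpl(d.name); }
}

class Backend {
    // Dispatch table: descriptor class -> factory (method references to the static create methods).
    private static final Map<Class<? extends Descriptor>, StateFactory> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put(MapDescriptor.class, MapStateImpl::create);
        FACTORIES.put(ListDescriptor.class, ListStateImpl::create);
    }

    static Object createInternalState(Descriptor descriptor) {
        StateFactory factory = FACTORIES.get(descriptor.getClass());
        if (factory == null) {
            throw new IllegalArgumentException(
                "State " + descriptor.getClass() + " is not supported");
        }
        return factory.create(descriptor);
    }
}
```

A table like this keeps the backend free of long `instanceof` chains: supporting a new state type means adding one entry and one static `create` method.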

At this point the RocksDBMapState has been created, which means the MapState produced by the first call is ready. In other words, for

state = getRuntimeContext().getMapState(
	new MapStateDescriptor<>("sum", MyType.class, Long.class));

the corresponding MapState has been created, and the call returns it to user code wrapped in a UserFacingMapState.
