Understanding How Flink Stream Join Works in One Article

2020-12-02 14:55:26

Overview
Details

In most cases, we write code like the following:

DataStreamSource<Tuple2<Long, Long>> addSource = env.addSource(new WordSource());
		
		addSource.join(addSource).where(new KeySelector<Tuple2<Long, Long>, Long>() {
			@Override
			public Long getKey(Tuple2<Long, Long> value) throws Exception {
//				System.out.println("where " + value.f0);
				return value.f0;
			}
		}).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
			@Override
			public Long getKey(Tuple2<Long, Long> value) throws Exception {
				System.out.println("equalTo " + value.f0);
				return value.f0;
			}
		}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
			.apply(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
				@Override
				public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) throws Exception {
//						System.out.println("vvvvv " + first + second);
					return new Tuple2<>(first.f0, first.f1 + second.f1);
				}
			})
			.print("join====");

Stepping into join leads to its entry method:

// Entry point of join: otherStream is stream2; returns a JoinedStreams
	public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
		return new JoinedStreams<>(this, otherStream);
	}

Next comes the where method of JoinedStreams:

// Apply the keySelector to stream1
	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
		requireNonNull(keySelector);
		final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
		return where(keySelector, keyType);
	}

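Then the equalTo method of the Where class is called. Together with where, it ensures that records from stream1 and stream2 that share the same key end up in the same window.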

// Apply the keySelector to stream2, so that records from stream1 and stream2 with the same key (that is, the key to join on) land in the same window
		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
			requireNonNull(keySelector);
			final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
			return equalTo(keySelector, otherKey);
		}

Next, the window method of the EqualTo class is called:

@PublicEvolving
			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
			}
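
To make "same key, same window" concrete, here is a minimal plain-Java sketch of how a tumbling window can bucket records. It does not use Flink; the class and method names are hypothetical, and it assumes a zero window offset and non-negative timestamps (Flink's own assigner also supports an offset).

```java
// Plain-Java sketch, no Flink dependency. All names here are hypothetical.
final class TumblingWindowSketch {

    // Start of the tumbling window that contains the timestamp.
    // Assumes a zero offset and a non-negative timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Records are grouped by key AND window, so the bucket id combines both.
    static String bucketOf(long key, long timestampMs, long sizeMs) {
        return key + "@" + windowStart(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        long size = 5000L; // 5-second window, as in the example job
        System.out.println(bucketOf(1L, 1000L, size)); // prints 1@0
        System.out.println(bucketOf(1L, 4999L, size)); // prints 1@0
        System.out.println(bucketOf(1L, 5000L, size)); // prints 1@5000
        System.out.println(bucketOf(2L, 1000L, size)); // prints 2@0
    }
}
```

Two records can only be joined when they fall into the same bucket, that is, when both the key and the window start match.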

After that, the apply method of WithWindow is called:

// The apply method
		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
				function,
				JoinFunction.class,
				0,
				1,
				2,
				TypeExtractor.NO_INDEX,
				input1.getType(),
				input2.getType(),
				"Join",
				false);

			return apply(function, resultType);
		}

public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			coGroupedWindowedStream = input1.coGroup(input2)
				.where(keySelector1)
				.equalTo(keySelector2)
				.window(windowAssigner)
				.trigger(trigger)
				.evictor(evictor)
				.allowedLateness(allowedLateness);

			return coGroupedWindowedStream
					.apply(new JoinCoGroupFunction<>(function), resultType);
		}

At this point the key method, apply, has appeared. Its implementation shows that join is built on top of coGroup: it produces a coGroupedWindowedStream, and the function passed in is our user-defined JoinFunction.

The apply method of coGroupedWindowedStream eventually calls the apply method of WindowedStream:

// Convert into an operator
	private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

		final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
		KeySelector<T, K> keySel = input.getKeySelector();

		WindowOperator<K, T, Iterable<T>, R, W> operator;

		if (evictor != null) {
			@SuppressWarnings({"unchecked", "rawtypes"})
			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
			
			// State held in the window; the state TTL is Long.MAX_VALUE
			ListStateDescriptor<StreamRecord<T>> stateDesc =
					new ListStateDescriptor<>("window-contents", streamRecordSerializer);

			operator =
				new EvictingWindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					function,
					trigger,
					evictor,
					allowedLateness,
					lateDataOutputTag);

		} else {
			// State held in the window; the state TTL is Long.MAX_VALUE
			ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
				input.getType().createSerializer(getExecutionEnvironment().getConfig()));

			operator =
				new WindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					function,
					trigger,
					allowedLateness,
					lateDataOutputTag);
		}

		// Turn the StreamOperator into a DataStream
		return input.transform(opName, resultType, operator);
	}
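
The "window-contents" list state above is where elements wait until the window fires. The following plain-Java sketch illustrates that buffer-then-fire idea only. It is heavily simplified: the real WindowOperator keeps this in keyed state and relies on triggers and timers, none of which is modeled here, and every name in the sketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch, no Flink dependency. All names here are hypothetical.
final class WindowBufferSketch {
    private final long sizeMs;
    // window start -> buffered elements (a stand-in for the "window-contents" list state)
    private final Map<Long, List<String>> contents = new HashMap<>();

    WindowBufferSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Buffer the element in the window that contains its timestamp.
    // Assumes a zero offset and a non-negative timestamp.
    void add(long timestampMs, String element) {
        long start = timestampMs - (timestampMs % sizeMs);
        contents.computeIfAbsent(start, k -> new ArrayList<>()).add(element);
    }

    // "Fire" a window: hand back everything buffered for it and clear the buffer.
    List<String> fire(long windowStart) {
        List<String> buffered = contents.remove(windowStart);
        return buffered == null ? Collections.<String>emptyList() : buffered;
    }
}
```

In this sketch the list returned by fire plays the role of the Iterable that the window function receives when the window is evaluated.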

The pipeline has now been turned into a WindowOperator. When a window fires, the apply method of CoGroupWindowFunction is called:

@Override
		// Runs when the window fires, i.e. userFunction.process
		public void apply(KEY key,
				W window,
				Iterable<TaggedUnion<T1, T2>> values,
				Collector<T> out) throws Exception {
			// Put the data from the two streams into two separate lists
			List<T1> oneValues = new ArrayList<>();
			List<T2> twoValues = new ArrayList<>();

			for (TaggedUnion<T1, T2> val: values) {
				if (val.isOne()) {
					oneValues.add(val.getOne());
				} else {
					twoValues.add(val.getTwo());
				}
			}
			wrappedFunction.coGroup(oneValues, twoValues, out);
		}
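
The split performed by this apply method can be tried out without Flink. The sketch below uses a simplified stand-in for TaggedUnion; the class TaggedUnionSketch and its helper split are hypothetical and exist only to illustrate how one mixed collection becomes two typed lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch, no Flink dependency. All names here are hypothetical.
final class TaggedUnionSketch {

    // Simplified stand-in for a tagged union: exactly one side carries a value.
    static final class Tagged<A, B> {
        private final A one;
        private final B two;
        private final boolean isOne;

        private Tagged(A one, B two, boolean isOne) {
            this.one = one;
            this.two = two;
            this.isOne = isOne;
        }

        static <A, B> Tagged<A, B> one(A value) { return new Tagged<>(value, null, true); }
        static <A, B> Tagged<A, B> two(B value) { return new Tagged<>(null, value, false); }

        boolean isOne() { return isOne; }
        A getOne() { return one; }
        B getTwo() { return two; }
    }

    // Route each element to the list of the stream it came from.
    static <A, B> void split(Iterable<Tagged<A, B>> values, List<A> ones, List<B> twos) {
        for (Tagged<A, B> v : values) {
            if (v.isOne()) {
                ones.add(v.getOne());
            } else {
                twos.add(v.getTwo());
            }
        }
    }

    public static void main(String[] args) {
        List<Tagged<String, Integer>> window = Arrays.asList(
                Tagged.<String, Integer>one("a"),
                Tagged.<String, Integer>two(1),
                Tagged.<String, Integer>one("b"));
        List<String> ones = new ArrayList<>();
        List<Integer> twos = new ArrayList<>();
        split(window, ones, twos);
        System.out.println(ones + " " + twos); // prints [a, b] [1]
    }
}
```

Tagging each record with its origin is what lets a single window hold elements of both streams and still separate them again before the user function runs.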

wrappedFunction.coGroup in turn calls JoinCoGroupFunction.coGroup, which is what actually implements the two-stream join:

@Override
		// Where the join finally executes; first and second are both the data in the window
		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
			for (T1 val1: first) {
				for (T2 val2: second) {
					// The user-defined join method is executed here
					out.collect(wrappedFunction.join(val1, val2));
				}
			}
		}
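
The double loop above has a practical consequence: the output is the cross product of the two sides for one key and one window, so if either side is empty nothing is emitted (inner-join behavior). A minimal plain-Java sketch of the same loop, with hypothetical names and no Flink dependency:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java sketch, no Flink dependency. All names here are hypothetical.
final class NestedLoopJoinSketch {

    // Pair every element of first with every element of second
    // and apply the user-supplied join function to each pair.
    static <A, B, R> List<R> joinWindow(List<A> first, List<B> second, BiFunction<A, B, R> joinFn) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                out.add(joinFn.apply(a, b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> left = Arrays.asList(1L, 2L);
        List<Long> right = Arrays.asList(10L, 20L, 30L);
        // 2 x 3 = 6 results
        System.out.println(joinWindow(left, right, (a, b) -> a + b)); // prints [11, 21, 31, 12, 22, 32]
        // An empty side produces no output at all
        System.out.println(joinWindow(left, new ArrayList<Long>(), (a, b) -> a + b)); // prints []
    }
}
```

Because the cost grows with the product of the two list sizes, a window that collects many records for a single key on both sides can produce a very large number of join calls.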
