总括
详解
一般情况下,我们会写如下的代码
代码语言:javascript复制DataStreamSource<Tuple2<Long, Long>> addSource = env.addSource(new WordSource());
addSource.join(addSource).where(new KeySelector<Tuple2<Long, Long>, Long>() {
@Override
public Long getKey(Tuple2<Long, Long> value) throws Exception {
// System.out.println("where " value.f0);
return value.f0;
}
}).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
@Override
public Long getKey(Tuple2<Long, Long> value) throws Exception {
System.out.println("equalTo " value.f0);
return value.f0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) throws Exception {
// System.out.println("vvvvv " first second);
return new Tuple2<>(first.f0,first.f1 second.f1);
}
})
.print("join====");
点进去可以得到 join 的入口方法
代码语言:javascript复制//join 的入口方法 otherStream 为 stream2,生成 joinedStream
public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
return new JoinedStreams<>(this, otherStream);
}
然后
代码语言:javascript复制//对 stream1 应用 keySelector
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
requireNonNull(keySelector);
final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
return where(keySelector, keyType);
}
然后调用 Where 类的 equalTo 方法,保证了 stream1 stream2 相同的 key 进入到同一个窗口
代码语言:javascript复制//对 stream2 应用 keySelector 保证 stream1 和 stream2 相同的 key 或者说要关联的 key 在同一个窗口内
public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
requireNonNull(keySelector);
final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
return equalTo(keySelector, otherKey);
}
再往下调用 EqualTo 类的 window 方法
代码语言:javascript复制@PublicEvolving
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
}
然后会调用 WithWindow 的 apply 方法
代码语言:javascript复制//应用 apply 方法
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
function,
JoinFunction.class,
0,
1,
2,
TypeExtractor.NO_INDEX,
input1.getType(),
input2.getType(),
"Join",
false);
return apply(function, resultType);
}
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);
coGroupedWindowedStream = input1.coGroup(input2)
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.allowedLateness(allowedLateness);
return coGroupedWindowedStream
.apply(new JoinCoGroupFunction<>(function), resultType);
}
至此为止,关键性的方法 apply 出现了,通过 apply 的实现,我们可以知道,join 底层是通过 coGroup 实现的,得到 coGroupedWindowedStream,其中的 function 即为我们自定义的 function.
coGroupedWindowedStream 的 apply 方法最终调用了 WindowStream 的 apply 方法
代码语言:javascript复制// 转化为 operator
private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
KeySelector<T, K> keySel = input.getKeySelector();
WindowOperator<K, T, Iterable<T>, R, W> operator;
if (evictor != null) {
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
// 窗口中 state ttl long_max_value
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger,
evictor,
allowedLateness,
lateDataOutputTag);
} else {
// 窗口中 state ttl long_max_value
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger,
allowedLateness,
lateDataOutputTag);
}
// StreamOperator 转化为 dataStream
return input.transform(opName, resultType, operator);
}
转化为了 windowOperator。当 window 执行的时候,调用了 CoGroupWindowFunction 的 apply 方法
代码语言:javascript复制@Override
// window 在执行的时候,即 userFunction.process
public void apply(KEY key,
W window,
Iterable<TaggedUnion<T1, T2>> values,
Collector<T> out) throws Exception {
//会将两个 stream 的数据,添加到 list 当中
List<T1> oneValues = new ArrayList<>();
List<T2> twoValues = new ArrayList<>();
for (TaggedUnion<T1, T2> val: values) {
if (val.isOne()) {
oneValues.add(val.getOne());
} else {
twoValues.add(val.getTwo());
}
}
wrappedFunction.coGroup(oneValues, twoValues, out);
}
而 wrappedFunction.coGroup 调用了 JoinCoGroupFunction.coGroup,从而实现双流 join
代码语言:javascript复制@Override
// join 最终执行的地方,其中 first、second 都是窗口中的数据
public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
for (T1 val1: first) {
for (T2 val2: second) {
//这里执行用户定义的 join 方法
out.collect(wrappedFunction.join(val1, val2));
}
}
}