前言
每个应用程序都有一个hello world代码,在flink里面这个hello world一般就是一段wordcount程序,我们来尝试通过一段wordcount代码来逐步剖析flink的执行过程。毫无疑问,这将是一个系列,而且笔者没办法保证能有足够的空闲时间完成这个系列。
WordCount
我们来看一段wordcount代码如下:
代码语言:javascript复制DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(value -> value.f0).sum(1);
counts.print();
env.execute("Streaming WordCount");
我们先来分析下这一段,看看上面几行代码执行的过程中,flink都做了哪些操作。
StreamExecutionEnvironment#fromElements方法
代码语言:javascript复制@SafeVarargs
public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
if (data.length == 0) {
throw new IllegalArgumentException("fromElements needs at least one element as argument");
}
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForObject(data[0]);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " data[0].getClass().getName()
"; please specify the TypeInformation manually via "
"StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
}
return fromCollection(Arrays.asList(data), typeInfo);
}
这里会通过TypeExtractor.getForObject方法从第一个元素中获取到数据的类型信息,其中包括属性信息和class信息(关于类型这部分,后面我们专门用一篇文章来介绍下)。fromCollection方法内部会将FromElementsFunction对象放到StreamSource内部,StreamSource就是用于数据处理的operator,对应的operatorFactory类型为SimpleUdfStreamOperatorFactory。然后利用operator对象生成LegacySourceTransformation对象,在LegacySourceTransformation内部是由StreamOperatorFactory、sourceName、outputType、并行度、数据源的格式(是否有界)等。最终会返回一个DataStreamSource对象,它的transformation引用指向的就是上面的LegacySourceTransformation对象。DataStreamSource也是DataStream类型的。
DataStream#flatMap(FlatMapFunction<T,R>)方法
代码语言:javascript复制public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return flatMap(flatMapper, outType);
}
会通过传入的flatMapper和当前数据流的类型获取这个flatmap算子的输出类型,然后在内部的flatMap方法中会将flatMapper包在StreamFlatMap这个operator中,该operator对应的operatorFactory为SimpleUdfStreamOperatorFactory类型的。进入到org.apache.flink.streaming.api.datastream.DataStream#doTransform方法中,我们来看:
代码语言:javascript复制protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// 相当于一步类型校验
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
•这里会创建一个OneInputTransformation,将当前DataStream的transformation(即上文中的LegacySourceTransformation)作为这个OneInputTransformation的input,由flatMap操作产生的SimpleUdfStreamOperatorFactory(内部是StreamFlatMap算子)作为该OneInputTransformation的operatorFactory。•创建SingleOutputStreamOperator,将OneInputTransformation和任务执行上下文传入其中。•将OneInputTransformation实例放到上下文StreamExecutionEnvironment#transformations列表中。•返回SingleOutputStreamOperator,需要注意的是SingleOutputStreamOperator也是DataStream的一个子类。
总结:在这一步中将Source节点的LegacySourceTransformation作为OneInputTransformation的输入,将flatMap操作的operator对应的SimpleUdfStreamOperatorFactory也在OneInputTransformation中维护。并最终将从Source部分产生的DataStreamSource对象转换成了SingleOutputStreamOperator。
DataStream#keyBy(org.apache.flink.api.java.functions.KeySelector<T,K>)方法
代码语言:javascript复制public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
Preconditions.checkNotNull(key);
return new KeyedStream<>(this, clean(key));
}
这个我们直接看它最终调用的重载的构造方法:
代码语言:javascript复制public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
this(
dataStream,
new PartitionTransformation<>(
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
keySelector,
keyType);
}
这里会将上面flatMap操作生成的OneInputTransformation作为PartitionTransformation的输入,而且需要注意的是PartitionTransformation和OneInputTransformation的一些区别:在PartitionTransformation内部主要做数据分区转换操作,所以不涉及operatorFactory和operator。同时,这里会将上文中返回的SingleOutputStreamOperator流转换成KeyedStream,而在上下文StreamExecutionEnvironment#transformations列表中维护的还是那个OneInputTransformation。
KeyedStream#sum(int)方法
代码语言:javascript复制public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}
protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
return reduce(aggregate).name("Keyed Aggregation");
}
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
ReduceTransformation<T, KEY> reduce = new ReduceTransformation<>(
"Keyed Reduce",
environment.getParallelism(),
transformation,
clean(reducer),
keySelector,
getKeyType()
);
getExecutionEnvironment().addOperator(reduce);
return new SingleOutputStreamOperator<>(getExecutionEnvironment(), reduce);
}
•上面的三个方法均是KeyedStream的方法,我们主要关注下KeyedStream#reduce方法,在这个方法里我们依旧和上面一样,关注三个内容:function、operator、Transformation。这里的function是指ReduceFunction,和PartitionTransformation一样,不涉及operatorFactory和operator。•这里是将上面KeyedStream中的PartitionTransformation作为ReduceTransformation的input来生成一个新的transformation,reducer作为当前ReduceTransformation的ReduceFunction。•通过getExecutionEnvironment().addOperator方法将新创建的ReduceTransformation放到上下文StreamExecutionEnvironment#transformations列表中。此时transformations中有两个transformation实例,一个是上面的OneInputTransformation,另一个是刚刚创建的ReduceTransformation实例。•最后,将reduce transformation实例放到SingleOutputStreamOperator新实例中返回。这时SingleOutputStreamOperator对象中的transformation指向的是这个ReduceTransformation。我们来看下这时候数据流中维护的上下文信息,具体信息如下:
可以看到,最终在SingleOutputStreamOperator对象中维护着ReduceTransformation,它的input为OneInputTransformation...在environment的transformations列表中维护着OneInputTransformation和ReduceTransformation。
DataStream#print()方法
代码语言:javascript复制@PublicEvolving
public DataStreamSink<T> print() {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
return addSink(printFunction).name("Print to Std. Out");
}
这个主要需要关注下DataStream#addSink方法:
代码语言:javascript复制public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// configure the type if needed
if (sinkFunction instanceof InputTypeConfigurable) {
((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
}
StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
getExecutionEnvironment().addOperator(sink.getTransformation());
return sink;
}
protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
this.transformation = (PhysicalTransformation<T>) new LegacySinkTransformation<>(
inputStream.getTransformation(),
"Unnamed",
operator,
inputStream.getExecutionEnvironment().getParallelism());
}
这里我们依然关注以下几个内容:
•function:算子内部使用的userFunction为SinkFunction;•operator:StreamSink内容包裹着sinkFunction;•transformation:在DataStreamSink构造方法内部会新建一个LegacySinkTransformation对象,它的input是对应的inputStream中获取到的Transformation,即ReduceTransformation。它的operator是StreamSink;•会调用getExecutionEnvironment().addOperator方法将LegacySinkTransformation放到上下文StreamExecutionEnvironment#transformations列表中去;•最终返回的是DataStreamSink,它也是DataStream类型的。
StreamExecutionEnvironment#execute(java.lang.String)方法
这里进行的是依次生成streamGraph、jobGraph、executionGraph到任务创建和最终任务提交执行的过程,后续逐篇开展,本篇就先介绍到这里了。
总结
可以看到整个wordcount的过程就是将所有的operator操作包装在transformation中,根据transformation在DataStream中的顺序将transformation串联起来。不同类型的操作对应不同的function,不同的operator、不同的operatorFactory和transformation,最终返回不同类型的DataStream。这些操作都是为接下来的streamGraph的生成做好了准备。关于streamGraph的生成,请关注下一篇。