flink源码从头分析第一篇之WordCount DataStream操作

2021-07-01 15:33:31 浏览数 (1)

前言

每个应用程序都有一个hello world代码,在flink里面这个hello world一般就是一段wordcount程序,我们来尝试通过一段wordcount代码来逐步剖析flink的执行过程。毫无疑问,这将是一个系列,而且笔者没办法保证能有足够的空闲时间完成这个系列。

WordCount

我们来看一段wordcount代码如下:

代码语言:javascript复制
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(value -> value.f0).sum(1);
counts.print();
env.execute("Streaming WordCount");

我们先来分析下这一段,看看上面几行代码执行的过程中,flink都做了哪些操作。

StreamExecutionEnvironment#fromElements方法
代码语言:javascript复制
@SafeVarargs
    public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("fromElements needs at least one element as argument");
        }

        TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getForObject(data[0]);
        }
        catch (Exception e) {
            throw new RuntimeException("Could not create TypeInformation for type "   data[0].getClass().getName()
                      "; please specify the TypeInformation manually via "
                      "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
        }
        return fromCollection(Arrays.asList(data), typeInfo);
    }

这里会通过TypeExtractor.getForObject方法从第一个元素中获取到数据的类型信息,其中包括属性信息和class信息(关于类型这部分,后面我们专门用一篇文章来介绍下)。fromCollection方法内部会将FromElementsFunction对象放到StreamSource内部,StreamSource就是用于数据处理的operator,对应的operatorFactory类型为SimpleUdfStreamOperatorFactory。然后利用operator对象生成LegacySourceTransformation对象,在LegacySourceTransformation内部是由StreamOperatorFactory、sourceName、outputType、并行度、数据源的格式(是否有界)等。最终会返回一个DataStreamSource对象,它的transformation引用指向的就是上面的LegacySourceTransformation对象。DataStreamSource也是DataStream类型的。

DataStream#flatMap(FlatMapFunction<T,R>)方法

代码语言:javascript复制
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
                getType(), Utils.getCallLocationName(), true);

        return flatMap(flatMapper, outType);
    }

会通过传入的flatMapper和当前数据流的类型获取这个flatmap算子的输出类型,然后在内部的flatMap方法中会将flatMapper包在StreamFlatMap这个operator中,该operator对应的operatorFactory为SimpleUdfStreamOperatorFactory类型的。进入到org.apache.flink.streaming.api.datastream.DataStream#doTransform方法中,我们来看:

代码语言:javascript复制
protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // 相当于一步类型校验
        transformation.getOutputType();
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operatorFactory,
                outTypeInfo,
                environment.getParallelism());
        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
        getExecutionEnvironment().addOperator(resultTransform);
        return returnStream;
    }

•这里会创建一个OneInputTransformation,将当前DataStream的transformation(即上文中的LegacySourceTransformation)作为这个OneInputTransformation的input,由flatMap操作产生的SimpleUdfStreamOperatorFactory(内部是StreamFlatMap算子)作为该OneInputTransformation的operatorFactory。•创建SingleOutputStreamOperator,将OneInputTransformation和任务执行上下文传入其中。•将OneInputTransformation实例放到上下文StreamExecutionEnvironment#transformations列表中。•返回SingleOutputStreamOperator,需要注意的是SingleOutputStreamOperator也是DataStream的一个子类。

总结:在这一步中将Source节点的LegacySourceTransformation作为OneInputTransformation的输入,将flatMap操作的operator对应的SimpleUdfStreamOperatorFactory也在OneInputTransformation中维护。并最终将从Source部分产生的DataStreamSource对象转换成了SingleOutputStreamOperator。

DataStream#keyBy(org.apache.flink.api.java.functions.KeySelector<T,K>)方法

代码语言:javascript复制
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
        Preconditions.checkNotNull(key);
        return new KeyedStream<>(this, clean(key));
    }

这个我们直接看它最终调用的重载的构造方法:

代码语言:javascript复制
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        this(
            dataStream,
            new PartitionTransformation<>(
                dataStream.getTransformation(),
                new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
            keySelector,
            keyType);
    }

这里会将上面flatMap操作生成的OneInputTransformation作为PartitionTransformation的输入,而且需要注意的是PartitionTransformation和OneInputTransformation的一些区别:在PartitionTransformation内部主要做数据分区转换操作,所以不涉及operatorFactory和operator。同时,这里会将上文中返回的SingleOutputStreamOperator流转换成KeyedStream,而在上下文StreamExecutionEnvironment#transformations列表中维护的还是那个OneInputTransformation。

KeyedStream#sum(int)方法

代码语言:javascript复制
public SingleOutputStreamOperator<T> sum(int positionToSum) {
        return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
    }

protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
        return reduce(aggregate).name("Keyed Aggregation");
    }

public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
        ReduceTransformation<T, KEY> reduce = new ReduceTransformation<>(
            "Keyed Reduce",
            environment.getParallelism(),
            transformation,
            clean(reducer),
            keySelector,
            getKeyType()
        );

        getExecutionEnvironment().addOperator(reduce);

        return new SingleOutputStreamOperator<>(getExecutionEnvironment(), reduce);
    }

•上面的三个方法均是KeyedStream的方法,我们主要关注下KeyedStream#reduce方法,在这个方法里我们依旧和上面一样,关注三个内容:function、operator、Transformation。这里的function是指ReduceFunction,和PartitionTransformation一样,不涉及operatorFactory和operator。•这里是将上面KeyedStream中的PartitionTransformation作为ReduceTransformation的input来生成一个新的transformation,reducer作为当前ReduceTransformation的ReduceFunction。•通过getExecutionEnvironment().addOperator方法将新创建的ReduceTransformation放到上下文StreamExecutionEnvironment#transformations列表中。此时transformations中有两个transformation实例,一个是上面的OneInputTransformation,另一个是刚刚创建的ReduceTransformation实例。•最后,将reduce transformation实例放到SingleOutputStreamOperator新实例中返回。这时SingleOutputStreamOperator对象中的transformation指向的是这个ReduceTransformation。我们来看下这时候数据流中维护的上下文信息,具体信息如下:

可以看到,最终在SingleOutputStreamOperator对象中维护着ReduceTransformation,它的input为OneInputTransformation...在environment的transformations列表中维护着OneInputTransformation和ReduceTransformation。

DataStream#print()方法

代码语言:javascript复制
@PublicEvolving
    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
        return addSink(printFunction).name("Print to Std. Out");
    }

这个主要需要关注下DataStream#addSink方法:

代码语言:javascript复制
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();
        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }
        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }

    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = (PhysicalTransformation<T>) new LegacySinkTransformation<>(
                inputStream.getTransformation(),
                "Unnamed",
                operator,
                inputStream.getExecutionEnvironment().getParallelism());
    }

这里我们依然关注以下几个内容:

•function:算子内部使用的userFunction为SinkFunction;•operator:StreamSink内容包裹着sinkFunction;•transformation:在DataStreamSink构造方法内部会新建一个LegacySinkTransformation对象,它的input是对应的inputStream中获取到的Transformation,即ReduceTransformation。它的operator是StreamSink;•会调用getExecutionEnvironment().addOperator方法将LegacySinkTransformation放到上下文StreamExecutionEnvironment#transformations列表中去;•最终返回的是DataStreamSink,它也是DataStream类型的。

StreamExecutionEnvironment#execute(java.lang.String)方法

这里进行的是依次生成streamGraph、jobGraph、executionGraph到任务创建和最终任务提交执行的过程,后续逐篇开展,本篇就先介绍到这里了。

总结

可以看到整个wordcount的过程就是将所有的operator操作包装在transformation中,根据transformation在DataStream中的顺序将transformation串联起来。不同类型的操作对应不同的function,不同的operator、不同的operatorFactory和transformation,最终返回不同类型的DataStream。这些操作都是为接下来的streamGraph的生成做好了准备。关于streamGraph的生成,请关注下一篇。

0 人点赞