从头分析flink源码第四篇之channel selector

2021-08-18 10:49:39 浏览数 (1)

selector 是做什么用的呢?我们来看下它里面的几个方法:

•void setup(int numberOfChannels):用输出通道的数量来对channel selector进行初始化操作,主要用于数据输出时使用;•selectChannel(T record):返回逻辑channel index,给定记录应写入该索引。broadcast模式的 channel selectors对应的这个方法不应该被调用,在实现时可以抛出UnsupportedOperationException。•isBroadcast() 方法:在broadcast模式下会选择所有的channel,这个方法用来标识是否是broadcast方法。

在flink的output操作时,由于可能会存在网络传输,而每个节点可能有多个partition,所以在做数据输出时需要知道往哪个分区中写入数据供下游consumer消费。channel selector的核心目标是用来解决这个问题,setup方法会使用输出通道的数量进行一些初始化操作(主要是路由算法的初始化操作),selectChannel方法用于为一条记录选择合适的channel index,而isBroadcast()方法用于判断是否为广播模式,因为广播模式下会为每个通道都发送数据。

channel selector的类继承关系如下图所示:

下面我们从when 和how两个方面来进行分析,一方面是这些selector都在哪些地方有使用到也就是when,另一方面是这些selector是怎么在flink中发挥作用的也就是how。

when

我们根据partitioner的分类来进行分析,主要分为四种大类型,即RoundRobinChannelSelector、StreamPartitioner、DataSkewChannelSelector、OutputEmitter四种,接下来我们一一分析。

RoundRobinChannelSelector

这是{@link ChannelSelector}接口的默认实现。它表示一个简单的轮循策略,即无论记录是什么,每次只选择一个输出通道。

可以看到默认使用的ChannelSelector是RoundRobinChannelSelector,当然用户也可以通过setChannelSelector方法来自定义channel selector。这个RecordWriter使用的地方在StreamTask#createRecordWriter:

关于这个RecordWriter后续我们再用专门的篇幅来介绍。

DataSkewChannelSelector

一个{@link ChannelSelector},它为几乎所有记录选择通道0。除信道0外的所有其他信道最多只能被选择一次。我们来看下它的selectChannel实现:

代码语言:javascript复制
@Override
public int selectChannel(IOReadableWritable record) {
    if (channelIndex >= numberOfChannels) {
        return 0;
    }
    return channelIndex  ;
}

该channel selector会将大多数压力都引到某个channel上,它主要用于压测channel的吞吐量情况。

OutputEmitter

主要用于之前流批api分离的时候旧的批处理的BatchTask中的RecordWriter使用的。

StreamPartitioner

在流处理任务中使用channel selector,它有很多个子类,结构如下图:

它相对于父类ChannelSelector新增了两个方法:

代码语言:javascript复制
    /**
     * Defines the behavior of this partitioner, when upstream rescaled during recovery of in-flight data.
     */
    public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
        return SubtaskStateMapper.ARBITRARY;
    }

    /**
     * Defines the behavior of this partitioner, when downstream rescaled during recovery of in-flight data.
     */
    public abstract SubtaskStateMapper getDownstreamSubtaskStateMapper();

    public abstract StreamPartitioner<T> copy();

•getUpstreamSubtaskStateMapper()方法:对正在运行的数据进行恢复时会导致上游的重新伸缩,该方法用于定义该行为的partitioner。•getDownstreamSubtaskStateMapper()方法:对正在运行的数据进行恢复时会导致下游的重新伸缩,该方法用于定义该行为的partitioner。•copy方法,返回的是当前StreamPartitioner。

关于StateMapper的使用可以参考深入理解flink可缩放状态一文。

BinaryHashPartitioner

对记录的二进制数据取hash,以hash的方式来将记录均匀分散到各个channel中。我们来简单地看一下selectChannel方法:

代码语言:javascript复制
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
        return MathUtils.murmurHash(
                getHashFunc().hashCode(record.getInstance().getValue())) % numberOfChannels;
    }

输入的是携带二进制数据的RowData,数据路由的方式是使用二进制记录值的hash来进行的。它的使用场景是在BatchExecExchange#translateToPlanInternal方法中如果RelDistribution的类型是RelDistribution.Type.HASH_DISTRIBUTED时会使用BinaryHashPartitioner来生成PartitionTransformation。

BroadcastPartitioner

直接来看下这个partitioner的实现:

代码语言:javascript复制
    /**
     * Note: Broadcast mode could be handled directly for all the output channels
     * in record writer, so it is no need to select channels via this method.
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }

    @Override
    public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
        return SubtaskStateMapper.DISCARD_EXTRA_STATE;
    }

    @Override
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.ROUND_ROBIN;
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

•广播模式可以在record writer中为所有输出channel写入数据,它不需要选择channel;•上游stream的subtaskStateMapper为SubtaskStateMapper.DISCARD_EXTRA_STATE,只使用已经存在的subtask,丢弃多余的部分;•下游stream的subtaskStateMapper为SubtaskStateMapper.ROUND_ROBIN,它会以轮循方式重新分配子任务state;•isBroadcast方法返回为true,copy方法返回的是当前对象。

应用场景:可以在DataStream的broadcast()方法来指定使用BroadcastPartitioner,在sql中会在解析得到RelNode的 RelDistribution.Type为 RelDistribution.Type.BROADCAST_DISTRIBUTED类型时使用BroadcastPartitioner,详细代码在BatchExecExchange#translateToPlanInternal方法中。

ForwardPartitioner

仅将元素转发给本地运行的下游的partitioner。

代码语言:javascript复制
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.ROUND_ROBIN;
    }

我们来看下它的主要的几个方法:

•selectChannel方法返回的是0,它与GlobalPartitioner的slectChannel是一样的实现,关于区别见下文分解;•copy方法返回的当前实例;•getDownstreamSubtaskStateMapper()方法返回的是SubtaskStateMapper.ROUND_ROBIN,它会以轮循方式重新分配子任务state。

在使用ForwardPartitioner时要求上下游节点的并行度相同,在没有指定partitioner且上下游的并行度相同时会默认使用ForwardPartitioner,关于这一点我们可以过一下StreamGraph#addEdgeInternal中的一段逻辑:

代码语言:javascript复制
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);
            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            if (partitioner == null
                && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                // 默认partitioner
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }
            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow "  
                        "change of parallelism. Upstream operation: "   upstreamNode
                          " parallelism: "   upstreamNode.getParallelism()  
                        ", downstream operation: "   downstreamNode   " parallelism: "
                          downstreamNode.getParallelism()  
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

在构建StreamGraph生成StreamNode时会有这样一段逻辑,代码比较清晰了,有兴趣的同学可自行分析一下。

GlobalPartitioner

将所有元素发送到子任务ID=0的下游操作符的partitioner,代码如下:

代码语言:javascript复制
@Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.FIRST;
    }

•selectChannel方法返回的是0;•copy方法返回的是当前实例;•getDownstreamSubtaskStateMapper()方法返回的是SubtaskStateMapper.FIRST,它会将所有的子任务数据都分配到下游第一个子任务。

可以通过使用DataStream的global方法来指定使用GlobalPartitioner,也会在批流处理sql任务时在转换时会在某些条件下使用GlobalPartitioner。

流处理模式下参考代码StreamExecExchange#translateToPlanInternal:

代码语言:javascript复制
 case RelDistribution.Type.SINGLETON =>
        val partitioner = new GlobalPartitioner[RowData]
        val transformation = new PartitionTransformation(
          inputTransform,
          partitioner.asInstanceOf[StreamPartitioner[RowData]])
        transformation.setOutputType(outputTypeInfo)
        transformation.setParallelism(1)
        transformation

批处理模式下参考代码BatchExecExchange#translateToPlanInternal:

代码语言:javascript复制
 case RelDistribution.Type.SINGLETON =>
        val transformation = new PartitionTransformation(
          input,
          new GlobalPartitioner[RowData],
          shuffleMode)
        transformation.setOutputType(outputRowType)
        transformation.setParallelism(1)
        transformation
KeyGroupStreamPartitioner

通过KeySelector从record中获取key,然后根据key进行hash打散再按并行度分散到不同的subTask中去。

代码语言:javascript复制
public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
        Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");
        this.keySelector = Preconditions.checkNotNull(keySelector);
        this.maxParallelism = maxParallelism;
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from "   record.getInstance().getValue(), e);
        }
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }

    @Override
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.RANGE;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "HASH";
    }

    @Override
    public void configure(int maxParallelism) {
        KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
        this.maxParallelism = maxParallelism;
    }

•构造时需要传入KeySelector和maxParallelism,也可以通过configure方法配置maxParallelism的值;•selectChannel方法中会使用keySelector去record中提取key值,然后根据key进行hash打散再按并行度分散到不同的subTask中去。大致过程为:根据key的hashCode值利用MathUtils.murmurHash方法再打散对maxParallelism取余获取keyGroupId,然后利用keyGroupId * parallelism / maxParallelism获取最终的key group index。由于keyGroupId 和parallelism 都是int值,所以maxParallelism的值必须是小于或等于 Short.MAX_VALUE,不然计算时会有精度问题。

使用场景主要是在流式任务中的KeyedStream中和sql任务CommonPhysicalSink#createSinkTransformation方法中以及StreamExecExchange#translateToPlanInternal中会使用到,具体的细节后续专门来介绍。

RebalancePartitioner

随机开始然后循环轮询的方式来分配channel,代码如下:

代码语言:javascript复制
@Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);

        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        nextChannelToSendTo = (nextChannelToSendTo   1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    @Override
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.ROUND_ROBIN;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

•初始化时会设置随机开始的位置,注意这里使用的是比较高性能的ThreadLocalRandom;•selectChannel方法中为具体的逻辑,以循环使用0~numberOfChannels区间中的索引的方式来确定channel index;•getDownstreamSubtaskStateMapper方法返回的是SubtaskStateMapper.ROUND_ROBIN,会以轮循方式重新分配子任务state;•copy方法返回的是当前实例。

使用场景:

•可以在DataStream中调用rebalance方法指定RebalancePartitioner;•在StreamGraph#addEdgeInternal方法中如果没有指定partitioner且节点上下游并行度不相等时会默认使用RebalancePartitioner作为分区器;•BatchExecExchange#translateToPlanInternal中进行sql任务解析时如果RelNode的RelDistribution.Type为RANDOM_DISTRIBUTED类型时会使用RebalancePartitioner。

RescalePartitioner

通过轮询的方式平均分配数据的partitioner,具体实现如下:

代码语言:javascript复制
private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (  nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    @Override
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.ROUND_ROBIN;
    }

•selectChannel方法中为核心逻辑,会以自增nextChannelToSendTo的方式来平均分配数据到每个channel中去;•getDownstreamSubtaskStateMapper方法返回的是SubtaskStateMapper.ROUND_ROBIN。

使用场景:

•DataStream的rescale方法来指定使用;•在StreamingJobGraphGenerator#connect方法中会将RescalePartitioner使用POINTWISE模式:

pointwise模式下,上游操作向其下游操作子集发送元素取决于上游和下游操作的并行度。例如,如果上游操作具有并行度2,而下游操作具有并行度4,那么一个上游操作将向两个下游操作分发元素,而另一个上游操作将向另外两个下游操作分发元素。另一方面,如果下游操作具有并行度2,而上游操作具有并行度4,则两个上游操作将分配给一个下游操作,而另外两个上游操作将分配给另一个下游操作。在上下游有不同的并行度而且不是彼此的倍数的情况下,一个或多个下游操作将具有不同数量的来自上游操作的输入。

ShufflePartitioner

为数据随机选择channel,代码逻辑如下:

代码语言:javascript复制
private Random random = new Random();

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return random.nextInt(numberOfChannels);
}

@Override
public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
    return SubtaskStateMapper.ROUND_ROBIN;
}

•selectChannel方法会从channel列表中随机选择一个;•getDownstreamSubtaskStateMapper()方法返回的是SubtaskStateMapper.ROUND_ROBIN,会以轮询的方式为下游子任务分配state。

使用场景:通过DataStream的shuffle方法来指定使用ShufflePartitioner。

问题梳理

ForwardPartitioner与GlobalPartitioner的selectChannel方法实现中都是返回为0,那么它们之间的区别是什么?

区别1:如果一个节点的输出partitioner为ForwardPartitioner或RescalePartitioner,那么会在生成JobGraph建立当前节点与下游节点的连接边时指定DistributionPattern为DistributionPattern.POINTWISE,其他的partitioner对应的DistributionPattern为ALL_TO_ALL。这两种DistributionPattern的区别可以参考这篇文章[1],这里我就不再浪费时间去画图解释了。

DistributionPattern.ALL_TO_ALL 就是简单的全连接,DistributionPattern.POINTWISE会根据上下游节点的并行度来调整,当上游分区与下游ExecutionJobVertex节点的并行度相同时会一对一连接;当上游分区并行度小于下游ExecutionJobVertex节点的并行度时,下游子 task 只会连接一个上游分区;当上游分区并行度大于下游子task并行度时,子 task 会连接多个上游分区。GlobalPartitioner是all_to_all模式的,一个上游会与下游子任务全连接,所以能全局控制channel为0,而ForwardPartitioner是pointwise的。二者partition的区别代码位于ResultPartitionFactory#create方法中,由于篇幅问题,后面再专门来分析了。

区别2:getDownstreamSubtaskStateMapper()方法的实现不同,GlobalPartitioner中返回的是SubtaskStateMapper.FIRST,也就是会只恢复index为0的那个subTask;ForwardPartitioner中返回的是SubtaskStateMapper.ROUND_ROBIN,仍会考虑所有的subTask。

partitioner与DistributionPattern的区别

1.partitioner的类型会决定DistributionPattern的类型,ForwardPartitioner或RescalePartitioner对应的DistributionPattern为DistributionPattern.POINTWISE,其他的partitioner对应的DistributionPattern为ALL_TO_ALL。2.partitioner的使用场景是在RecordWriter中决定数据往哪个partition发送时:

3.DistributionPattern的使用场景是在创建ExecutionEdge来连接上游分区和下游节点时:

References

[1] 这篇文章: http://chenyuzhao.me/2017/02/06/flink物理计划生成/

0 人点赞