这篇文章主要介绍sparksql中Partitioning的源码体系,和上篇 sparksql源码系列 | 一文搞懂Distribution源码体系(spark3.2)一样, Partitioning也是我们理解Physical Plan、executed Plan、shuffle、SparkSQL的AQE机制等的一个比较基础的知识点。
Partitioning定义了一个物理算子输出数据的分区方式,具体包括子Partitioning之间、目标Partitioning和Distribution之间的关系。
它用在什么地方呢?
每个physical operatior实现了outputPartitioning接口,以获得一个Partitioning的实例,用于表示 operator输出数据满足的分布情况。
类的依赖关系图
- UnknownPartitioning:不进行分区
- SinglePartition:单分区
- RoundRobinPartitioning:在1-numPartitions范围内轮询式分区
- BroadcastPartitioning:广播分区
- HashPartitioning:基于哈希的分区方式
- RangePartitioning:基于范围的分区方式
- PartitioningCollection:分区方式的集合,描述物理算子的输出
- DataSourcePartitioning:V2 DataSource的分区方式
Partitioning接口定义如下:
代码语言:javascript复制trait Partitioning {
//该sparkPlan输出RDD的分区数目
val numPartitions: Int
//当前的partitioning操作能否得到所需的数据分布,当不满足时返回false,对数据进行重新组织
/** 需满足两个条件:
* 1、分区数numPartitions要相等
* 2、satisfies0方法返回true,satisfies0方法中写了和Distribution的关系
**/
final def satisfies(required: Distribution): Boolean = {
required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)
}
/**
* 1、如果requiredChildDistribution为UnspecifiedDistribution,则说明对子节点的分布没有要求,返回true
* 2、如果requiredChildDistribution为AllTuples,则只要numPartitions == 1,返回true
* 3、其他情况,返回false
**/
protected def satisfies0(required: Distribution): Boolean = required match {
case UnspecifiedDistribution => true
case AllTuples => numPartitions == 1
case _ => false
}
}
numPartitions:指定该sparkplan输出的rdd分区数目。
satisfies&satisfies0:当前的partitioning操作能否得到所需的数据分布(required)。当不满足时,一般需要进行repartition操作,对数据进行重组织。做法就是添加exchange节点
Partitioning与Distribution关系理解
1、sparkplan定义了requiredChildDistribution接口,以获得一个Distribution的实例,用于表示 operator对其input数据(child节点的输出数据)分布情况的要求。
2、sparkplan定义了outputPartitioning接口,以获得一个Partitioning的实例,用于表示 operator输出数据满足的分布情况。
3、Distribution定义了createPartitioning接口,用来定义该distribution对应哪种Partitioning。
代码语言:javascript复制sealed trait Distribution {
//分区数
def requiredNumPartitions: Option[Int]
//为Distribution创建默认分区,该分区可以满足此分布,同时匹配给定数量的分区。
def createPartitioning(numPartitions: Int): Partitioning
}
代码语言:javascript复制
4、Partitioning定义了satisfies接口,用来判断当前的partitioning操作能否得到所需的数据分布,当不满足时返回false。
总结一下
SparkPlan对输入数据的分布(Distribution)情况有着一定的要求,比如HashAggregateExec类型,要求输入数据key值按照hash方式分区,如果输入数据的分布无法满足(child.outputPartitioning.satisfies(requiredChildDistributions) )当前节点的处理逻辑时,就需要添加一些shuffle操作来达到要求,体现在物理算子树上就是加Exchange节点。
决定要不要添加Exchange节点,主要是靠子节点的outputPartitioning(Partitioning)是否satisfies当前节点requiredChildDistributions(Distribution)来决定。
以上