sparksql源码系列 | 一文搞懂Partitioning源码体系(spark3.2)

2022-06-09 21:36:10 浏览数 (1)

这篇文章主要介绍sparksql中Partitioning的源码体系,和上篇 sparksql源码系列 | 一文搞懂Distribution源码体系(spark3.2)一样, Partitioning也是我们理解Physical Plan、executed Plan、shuffle、SparkSQL的AQE机制等的一个比较基础的知识点。

Partitioning定义了一个物理算子输出数据的分区方式,具体包括子Partitioning之间、目标Partitioning和Distribution之间的关系。

它用在什么地方呢?

每个physical operatior实现了outputPartitioning接口,以获得一个Partitioning的实例,用于表示 operator输出数据满足的分布情况

类的依赖关系图

  • UnknownPartitioning:不进行分区
  • SinglePartition:单分区
  • RoundRobinPartitioning:在1-numPartitions范围内轮询式分区
  • BroadcastPartitioning:广播分区
  • HashPartitioning:基于哈希的分区方式
  • RangePartitioning:基于范围的分区方式
  • PartitioningCollection:分区方式的集合,描述物理算子的输出
  • DataSourcePartitioning:V2 DataSource的分区方式

Partitioning接口定义如下:

代码语言:javascript复制
trait Partitioning {
 //该sparkPlan输出RDD的分区数目
  val numPartitions: Int
 //当前的partitioning操作能否得到所需的数据分布,当不满足时返回false,对数据进行重新组织
 /** 需满足两个条件:
  * 1、分区数numPartitions要相等 
  * 2、satisfies0方法返回true,satisfies0方法中写了和Distribution的关系
  **/
  final def satisfies(required: Distribution): Boolean = {
    required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)
  }

  /**
   * 1、如果requiredChildDistribution为UnspecifiedDistribution,则说明对子节点的分布没有要求,返回true
   * 2、如果requiredChildDistribution为AllTuples,则只要numPartitions == 1,返回true
   * 3、其他情况,返回false
   **/
  protected def satisfies0(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case AllTuples => numPartitions == 1
    case _ => false
  }
}

numPartitions:指定该sparkplan输出的rdd分区数目。

satisfies&satisfies0:当前的partitioning操作能否得到所需的数据分布(required)。当不满足时,一般需要进行repartition操作,对数据进行重组织。做法就是添加exchange节点

Partitioning与Distribution关系理解

1、sparkplan定义了requiredChildDistribution接口,以获得一个Distribution的实例,用于表示 operator对其input数据(child节点的输出数据)分布情况的要求

2、sparkplan定义了outputPartitioning接口,以获得一个Partitioning的实例,用于表示 operator输出数据满足的分布情况

3、Distribution定义了createPartitioning接口,用来定义该distribution对应哪种Partitioning。

代码语言:javascript复制
sealed trait Distribution {
 //分区数
  def requiredNumPartitions: Option[Int]

 //为Distribution创建默认分区,该分区可以满足此分布,同时匹配给定数量的分区。
  def createPartitioning(numPartitions: Int): Partitioning
}
代码语言:javascript复制

4、Partitioning定义了satisfies接口,用来判断当前的partitioning操作能否得到所需的数据分布,当不满足时返回false。

总结一下

SparkPlan对输入数据的分布(Distribution)情况有着一定的要求,比如HashAggregateExec类型,要求输入数据key值按照hash方式分区,如果输入数据的分布无法满足(child.outputPartitioning.satisfies(requiredChildDistributions) )当前节点的处理逻辑时,就需要添加一些shuffle操作来达到要求,体现在物理算子树上就是加Exchange节点。

决定要不要添加Exchange节点,主要是靠子节点的outputPartitioning(Partitioning)是否satisfies当前节点requiredChildDistributions(Distribution)来决定。

以上

0 人点赞