这篇文章主要介绍sparksql中Distribution的源码体系,Distribution是我们理解Physical Plan、executed Plan、shuffle、SparkSQL的AQE机制等的一个比较基础的知识点。
Distribution定义了查询执行时,同一个表达式下的不同数据元组(Tuple)在集群各个节点上的分布情况。
它用在什么地方呢?
每个physical operator都实现了requiredChildDistribution方法,以获得一个Distribution的实例,用于表示 operator对其input数据分布情况的要求。
类的依赖关系图
我们挨个来解释
1、UnspecifiedDistribution
未指定分布,无需确定数据无组之间的位置关系 。
代码语言:javascript复制case object UnspecifiedDistribution extends Distribution {
override def requiredNumPartitions: Option[Int] = None
override def createPartitioning(numPartitions: Int): Partitioning = {
throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
}
}
这个是指啥?
我们知道Distribution是physical operator 用于表示operator对其input数据(child节点的输出数据)分布情况的要求,那UnspecifiedDistribution的意思就是对Child的分区规则没有要求,无所谓,你啥样都行
比如:
代码语言:javascript复制select a,count(b) from testdata2 group by a
== Physical Plan ==
HashAggregate(keys=[a#3], functions=[count(1)], output=[a#3, count(b)#11L])
- HashAggregate(keys=[a#3], functions=[partial_count(1)], output=[a#3, count#15L])
- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3]
- Scan[obj#2]
SerializeFromObject节点,
它的requiredChildDistribution就是UnspecifiedDistribution:
代码语言:javascript复制SerializeFromObject-SerializeFromObjectExec
requiredChildDistribution:List(UnspecifiedDistribution)
requiredChildOrdering:List(List())
outputPartitioning:UnknownPartitioning(0)
2、BroadcastDistribution
广播分布,数据会被广播到所有节点上。构造参数mode为广播模式BroadcastMode,广播模式可以为原始数据IdentityBroadcastMode或转换为HashedRelation对象HashedRelationBroadcastMode。
代码语言:javascript复制case class BroadcastDistribution(mode: BroadcastMode) extends Distribution {
override def requiredNumPartitions: Option[Int] = Some(1)
override def createPartitioning(numPartitions: Int): Partitioning = {
assert(numPartitions == 1,
"The default partitioning of BroadcastDistribution can only have 1 partition.")
BroadcastPartitioning(mode)
}
}
以 BroadcastHashJoinExec 为例:
如果是Broadcast类型的Join操作假设左表做广播,那么requiredChildDistribution得到的列表就是[BroadcastDistribution(mode),UnspecifiedDistribution],表示左表为广播分布;
如果是Broadcast类型的Join操作假设右表做广播,那么requiredChildDistribution得到的列表就是[UnspecifiedDistribution,BroadcastDistribution(mode)],表示右表为广播分布;
3、OrderedDistribution
构造参数ordering是seq[SortOrder]类型该,分布意味着数据元组会根据ordering计算后的结果排序。
代码语言:javascript复制case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
require(
ordering != Nil,
"The ordering expressions of an OrderedDistribution should not be Nil. "
"An AllTuples should be used to represent a distribution that only has "
"a single partition.")
override def requiredNumPartitions: Option[Int] = None
override def createPartitioning(numPartitions: Int): Partitioning = {
RangePartitioning(ordering, numPartitions)
}
}
以 SortExec 为例:
在全局排序的sort算子中,requiredChildDistribution得到的列表是[OrderedDistribution(sortOrder)],其中sortOrder是排序表达式,相同的数据ordering计算结果相同因此能够保持连续性并被划分到相同分区中
4、AllTuples
只有一个分区,所有的数据元组存放在一起
代码语言:javascript复制case object AllTuples extends Distribution {
override def requiredNumPartitions: Option[Int] = Some(1)
override def createPartitioning(numPartitions: Int): Partitioning = {
assert(numPartitions == 1, "The default partitioning of AllTuples can only have 1 partition.")
SinglePartition
}
}
以 GlobalLimitExec 为例:
选取全局前K条数据的GlobalLimit算子,requiredChildDistribution得到的列表就是AllTuples,表示执行该算子需要全部的数据参与
5、ClusteredDistribution
构造参数clustering是Seq[Expression]类型,起到了hash函数的效果,数据经过clustering计算后,相同value的数据元组会被存放在一起。如果有多个分区的情况,则相同的数据会被存放在同一个分区中;如果只能是单个分区,则相同的数据会在分区内连续存放。
代码语言:javascript复制case class ClusteredDistribution(
clustering: Seq[Expression],
requiredNumPartitions: Option[Int] = None) extends Distribution {
require(
clustering != Nil,
"The clustering expressions of a ClusteredDistribution should not be Nil. "
"An AllTuples should be used to represent a distribution that only has "
"a single partition.")
override def createPartitioning(numPartitions: Int): Partitioning = {
assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but "
s"the actual number of partitions is $numPartitions.")
HashPartitioning(clustering, numPartitions)
}
}
以 HashAggregateExec 为例:
HashAggregateExec 沿用父类BaseAggregateExec 的requiredChildDistribution 方法 ,其执行的前提是“所有具有相同aggregation key的record放到同一个处理单元中”。
在Spark中,这样的处理单元就是RDD的一个partition,因此也就是要满足“所有group by 的column具有相同value的record被分配到RDD的同一个partition中”。
HashAggregateExec的requiredChildDistribution就是ClusteredDistribution。
6、HashClusteredDistribution
HashClusteredDistribution与ClusteredDistribution类似,构造参数expressions是Seq[Expression]类型,起到了hash函数的效果。
但是比ClusteredDistribution更严格,不仅保证具有相同key的record被分配到同一个partition内,而且保证了对每一个key分配到的partition id也都是确定的。
代码语言:javascript复制case class HashClusteredDistribution(
expressions: Seq[Expression],
requiredNumPartitions: Option[Int] = None) extends Distribution {
require(
expressions != Nil,
"The expressions for hash of a HashClusteredDistribution should not be Nil. "
"An AllTuples should be used to represent a distribution that only has "
"a single partition.")
override def createPartitioning(numPartitions: Int): Partitioning = {
assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
s"This HashClusteredDistribution requires ${requiredNumPartitions.get} partitions, but "
s"the actual number of partitions is $numPartitions.")
HashPartitioning(expressions, numPartitions)
}
}
以 SortMergeJoinExec 为例:
在Spark的实现里,SortMergeJoinExec的实现简单来说就是把join两边的RDD中具有相同id的partition zip到一起进行关联。
ClusteredDistribution保证的是具有相同key的record能聚集到同一个partition中,但对join来说这样还不够。
在RDD1中假设join key为1的record分配到了partition 0,那么如果RDD1和RDD2要进行join,则RDD2中所有join key为1的record也必须分配到partition 0中。Spark通过在左右两边的shuffle中使用相同的hash函数和shuffle partition number来保证这一点。
SortMergeJoinExec对join两边的requiredChildDistribution列表是
[HashClusteredDistribution(leftKeys),HashClusteredDistribution(rightKeys)],表示左表数据根据leftKey表达式计算分区,右表数据根据rightKeys表达式计算分区。
以上