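Settle in and read the source: this post is for anyone who wants to understand how Spark SQL resolves and optimizes queries under the hood.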
Outline: 1. Preface; 2. Strategy; 3. Batch (one or more Rules plus a strategy); 4. batches: Seq[Batch] (the Batch queue); 5. execute (the core method)
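Preface
Spark SQL uses the rules defined in Analyzer to resolve a Parsed Logical Plan into an Analyzed Logical Plan, and the rules defined in Optimizer to turn the Analyzed Logical Plan into an Optimized Logical Plan.
[Figure: class hierarchy of RuleExecutor] Both Analyzer and Optimizer extend RuleExecutor.
Analyzer and Optimizer define the rules themselves, while RuleExecutor defines the framework that runs them: how to apply batch after batch of rules to a plan and obtain a new plan.
How are the rules executed?
RuleExecutor contains the following main members: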
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
  abstract class Strategy {
    def maxIterations: Int
    def errorOnExceed: Boolean = false
    def maxIterationsSetting: String = null
  }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(
    override val maxIterations: Int,
    override val errorOnExceed: Boolean = false,
    override val maxIterationsSetting: String = null) extends Strategy
  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
  protected def batches: Seq[Batch]
  // ... execute and the remaining members are omitted here
}
Strategy
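Strategy defines how the rules are iterated: some rules only need to run once, others have to run repeatedly until the plan stops changing.
abstract class Strategy {
  // Maximum number of times the Batch is executed (one or many). If the plan
  // converges before maxIterations is reached, the Batch stops early.
  def maxIterations: Int
  // Whether to throw an exception when the iteration count exceeds the maximum.
  def errorOnExceed: Boolean = false
  // The maximum can be set in configuration; this is the configuration key it is read from.
  def maxIterationsSetting: String = null
}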
Strategy has two implementations: Once and FixedPoint.
Once
Once defines rules that run exactly once, that is, a Strategy with maxIterations = 1.
case object Once extends Strategy { val maxIterations = 1 }
FixedPoint
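FixedPoint defines a strategy that may iterate more than once: the batch is re-run until the plan stops changing or maxIterations is reached. The value of maxIterations comes from configuration.
case class FixedPoint(
  override val maxIterations: Int,
  override val errorOnExceed: Boolean = false,
  override val maxIterationsSetting: String = null) extends Strategy
Analyzer and Optimizer each define their own fixedPoint. The maximum iteration counts are read from
spark.sql.analyzer.maxIterations and spark.sql.optimizer.maxIterations respectively, and both default to 100.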
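To make the link between those configuration keys and FixedPoint concrete, here is a minimal sketch. It re-declares Strategy, Once and FixedPoint locally so it runs without Spark, and it uses a plain Map in place of Spark's configuration. The object name StrategySketch and the helper fixedPointFrom are hypothetical and are not Spark APIs.

```scala
object StrategySketch {
  // Local copies of the shapes quoted above, so the sketch runs without Spark.
  abstract class Strategy {
    def maxIterations: Int
    def errorOnExceed: Boolean = false
    def maxIterationsSetting: String = null
  }

  case object Once extends Strategy { val maxIterations = 1 }

  case class FixedPoint(
      override val maxIterations: Int,
      override val errorOnExceed: Boolean = false,
      override val maxIterationsSetting: String = null) extends Strategy

  // Hypothetical helper: a plain Map stands in for Spark's configuration.
  // Reads the iteration cap from `key`, falling back to the default of 100,
  // and remembers the key so an error message could name it later.
  def fixedPointFrom(conf: Map[String, String], key: String): FixedPoint =
    FixedPoint(
      maxIterations = conf.get(key).map(_.toInt).getOrElse(100),
      maxIterationsSetting = key)
}
```

Calling fixedPointFrom with an empty Map yields a cap of 100, while a Map that sets spark.sql.optimizer.maxIterations to "50" yields a cap of 50.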
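Batch (one or more Rules plus a strategy)
A Batch represents a group of related rules.
Every Rule in a Batch runs under the same strategy (run once, or run until a fixed point is reached), which keeps the rules easy to manage.
/** A batch of rules. */
protected case class Batch(
  name: String,          // name of this group of related rules
  strategy: Strategy,    // iteration strategy
  rules: Rule[TreeType]* // the rules themselves
)
Analyzer and Optimizer each define their own batches. For example, Analyzer defines a batch named "Hints" that uses the FixedPoint strategy and contains two rules for handling hints: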
ResolveHints.ResolveJoinStrategyHints
ResolveHints.ResolveCoalesceHints
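As an illustration of how rules are grouped into a Batch, the following sketch models a "plan" as a plain String instead of a Catalyst tree. Everything in it (BatchSketch, the simplified Rule and Strategy types, the TrimWhitespace and CollapseSpaces rules, and the "Cleanup" batch) is a hypothetical stand-in, not Spark code.

```scala
object BatchSketch {
  // Toy stand-in for Spark's Rule[TreeType]: here a plan is just a String.
  abstract class Rule(val ruleName: String) {
    def apply(plan: String): String
  }

  // Simplified strategies, keeping only what this sketch needs.
  sealed trait Strategy
  case object Once extends Strategy
  case class FixedPoint(maxIterations: Int) extends Strategy

  // Same shape as the Batch quoted above: a name, a strategy, and varargs rules.
  case class Batch(name: String, strategy: Strategy, rules: Rule*)

  object TrimWhitespace extends Rule("TrimWhitespace") {
    def apply(plan: String): String = plan.trim
  }

  object CollapseSpaces extends Rule("CollapseSpaces") {
    def apply(plan: String): String = plan.replace("  ", " ")
  }

  // Two related rules share one strategy by living in the same batch.
  val cleanup: Batch = Batch("Cleanup", FixedPoint(100), TrimWhitespace, CollapseSpaces)
}
```

Grouping this way means the strategy is chosen once per batch rather than once per rule, which is the management benefit described above.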
batches: Seq[Batch] (the Batch queue)
RuleExecutor declares a protected def batches: Seq[Batch] method that returns the sequence of Batch objects (the Batch queue) that execute will run. Every class that extends RuleExecutor (Analyzer and Optimizer) must implement it and supply its own Seq[Batch].
Analyzer and Optimizer each provide their own batches.
The batches in Optimizer are somewhat more involved. Optimizer works with three pieces: defaultBatches, excludedRules, and nonExcludableRules.
The batches that finally run are: defaultBatches - (excludedRules - nonExcludableRules)
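The formula above can be read as set arithmetic over rule names. The sketch below is one way to express it, with a batch reduced to a name plus the names of its rules. BatchFilterSketch and effectiveBatches are hypothetical names, and dropping a batch that ends up with no rules is an assumption of this sketch.

```scala
object BatchFilterSketch {
  // Toy model: a batch is a name plus the names of its rules.
  case class Batch(name: String, rules: Seq[String])

  // Hypothetical helper implementing
  //   defaultBatches - (excludedRules - nonExcludableRules)
  def effectiveBatches(
      defaultBatches: Seq[Batch],
      excludedRules: Set[String],
      nonExcludableRules: Set[String]): Seq[Batch] = {
    // Rules marked non-excludable survive even if exclusion was requested.
    val toRemove = excludedRules -- nonExcludableRules
    defaultBatches
      .map(b => b.copy(rules = b.rules.filterNot(toRemove.contains)))
      // Assumption of this sketch: a batch left with no rules is dropped.
      .filter(_.rules.nonEmpty)
  }
}
```

For example, with default batches A = (r1, r2) and B = (r3), excluding r1 and r3 while r1 is non-excludable leaves only batch A with both of its rules.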
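execute (the core method)
The execute method iterates over every Batch in batches and applies each Rule of the Batch to the plan in turn.
The while (continue) loop stops when either of the following holds:
the maximum number of iterations, maxIterations, has been reached, or the current plan is unchanged compared with the last plan.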
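The loop and its two stop conditions can be reproduced in a few lines. The sketch below is a simplified model, not Spark's implementation: a plan is any value compared with ==, metrics, logging and integrity checks are left out, and all names (MiniRuleExecutor, Rule, Batch, execute) are hypothetical stand-ins.

```scala
object MiniRuleExecutor {
  // Toy stand-ins: a plan is any value of type T, compared with ==.
  case class Rule[T](ruleName: String, transform: T => T)

  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  case class Batch[T](name: String, strategy: Strategy, rules: Rule[T]*)

  def execute[T](batches: Seq[Batch[T]], plan: T): T = {
    var curPlan = plan
    batches.foreach { batch =>
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
      while (continue) {
        // One pass: thread the plan through every rule of the batch in order.
        curPlan = batch.rules.foldLeft(curPlan) { (p, rule) => rule.transform(p) }
        iteration += 1
        // Stop condition 1: the strategy's iteration budget is used up.
        if (iteration > batch.strategy.maxIterations) continue = false
        // Stop condition 2: the pass changed nothing (fixed point reached).
        if (curPlan == lastPlan) continue = false
        lastPlan = curPlan
      }
    }
    curPlan
  }
}
```

With a Trim rule and a CollapseSpaces rule (replace two spaces with one) in a FixedPoint(100) batch, a string with a run of eight spaces between two letters takes several passes to shrink to a single space and then stops because a pass produces no change. Under Once, the same batch stops after a single pass even if the string could still be simplified.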
Execution flow
Source walkthrough
// The plan argument is the current execution plan
def execute(plan: TreeType): TreeType = {
  var curPlan = plan
  val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
  val planChangeLogger = new PlanChangeLogger()
  val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
  val beforeMetrics = RuleExecutor.getCurrentMetrics()
  // Run the structural integrity checker against the initial input
  if (!isPlanIntegral(plan)) {
    val message = "The structural integrity of the input plan is broken in " +
      s"${this.getClass.getName.stripSuffix("$")}."
    throw new TreeNodeException(plan, message, null)
  }
  // Iterate over batches, taking one batch at a time
  batches.foreach { batch =>
    // Process this batch
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true
    while (continue) {
      // foldLeft has the form A.foldLeft(B) { (z, i) => ...; C }
      // curPlan is the initial value for this pass over the batch
      // In case (plan, rule), plan is the plan returned by the previous rule and fed to the next one; rule is one of the batch's Rule objects
      // The foldLeft as a whole returns the plan produced once every rule in the batch has run
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          val result = rule(plan) // calls the rule's apply method to run the rule
          val runTime = System.nanoTime() - startTime
          val effective = !result.fastEquals(plan)
          if (effective) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            planChangeLogger.logRule(rule.ruleName, plan, result)
          }
          queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
          queryExecutionMetrics.incNumExecution(rule.ruleName)
          // Record timing information using QueryPlanningTracker
          tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
          // Run the structural integrity checker against the plan after each rule.
          if (!isPlanIntegral(result)) {
            val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
              "the structural integrity of the plan is broken."
            throw new TreeNodeException(result, message, null)
          }
          result
      }
      iteration += 1 // increment the iteration count
      if (iteration > batch.strategy.maxIterations) {
        // The iteration count has exceeded the strategy's maximum
        if (iteration != 2) {
          // The strategy allows more than one iteration (it is not Once) and maxIterations has been reached
          val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
            "."
          } else {
            s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
          }
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
            s"$endingMsg"
          if (Utils.isTesting || batch.strategy.errorOnExceed) {
            // Throw when running under tests or when errorOnExceed is true
            throw new TreeNodeException(curPlan, message, null)
          } else {
            // Otherwise only log a warning
            logWarning(message)
          }
        }
        // Check idempotence for Once batches.
        if (batch.strategy == Once &&
          Utils.isTesting && !blacklistedOnceBatches.contains(batch.name)) {
          checkBatchIdempotence(batch, curPlan)
        }
        // The limit set by the strategy is reached: set continue to false so the loop ends
        continue = false
      }
      // Even if maxIterations has not been reached, stop iterating when the current plan is identical to the previous one
      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }
    planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
  }
  planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)
  curPlan
}