Spark sql规则执行器RuleExecutor(源码解析)

2020-10-29 09:54:52 浏览数 (1)

静下心来读源码,给想要了解spark sql底层解析原理的小伙伴们!

【本文大纲】1、前言2、Strategy3、Batch(包含一个或多个Rule及一个策略)4、batches: Seq[Batch](Batch队列)5、execute(核心方法)

前言

Spark sql通过Analyzer中 定义的rule把Parsed Logical Plan解析成 Analyzed Logical Plan;通过Optimizer定义的rule把 Analyzed Logical Plan 优化成 Optimized Logical Plan 。

下图是RuleExecutor类 的继承关系,Analyzer、Optimizer都继承了RuleExecutor。

Analyzer、Optimizer定义了一系列 rule,而RuleExecutor 定义了一个 rules 执行框架,即怎么把一批批规则应用在一个 plan 上得到一个新的 plan。

规则是怎么执行的 ?

RuleExecutor包含以下主要对象

代码语言:javascript复制
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {

abstract class Strategy {
  def maxIterations: Int
  def errorOnExceed: Boolean = false
  def maxIterationsSetting: String = null
}

case object Once extends Strategy { val maxIterations = 1 }

case class FixedPoint(
  override val maxIterations: Int,
  override val errorOnExceed: Boolean = false,
  override val maxIterationsSetting: String = null) extends Strategy

protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

protected def batches: Seq[Batch]

Strategy

Strategy 定义了Rule处理的迭代策略,有些Rule只用执行一次,有些需要多次直到达到某种效果。

代码语言:javascript复制
abstract class Strategy {

  //表示 Batch 最大执行次数,有可能是一次或多次。如果执行了 maxIterations 次之前达到收敛点,就停止,不再继续执行 Batch
  def maxIterations: Int

  //决定如果执行次超过了最大迭代次数是否抛出异常
  def errorOnExceed: Boolean = false

  //是大执行次数可以在配置文件里配制,这里是获取配制文件中key。
  def maxIterationsSetting: String = null
}

Strategy有两个实现类 :Once、FixedPoint

Once

once定义了只运行一次的规则,即maxIterations = 1的 Strategy

代码语言:javascript复制
case object Once extends Strategy { val maxIterations = 1 }

FixedPoint

fixedPoint 定义多于1次的迭代策略,maxIterations 通过配置文件获取。

代码语言:javascript复制
case class FixedPoint(
  override val maxIterations: Int,
  override val errorOnExceed: Boolean = false,
  override val maxIterationsSetting: String = null) extends Strategy

Analyzer 和 Optimizer 中分 别定义自己的fixedPoint,最大迭代次数,分别从

spark.sql.analyzer.maxIterations or spark.sql.optimizer.maxIterations 这两个配置参数里获取,默认100

Batch(包含一个或多个Rule及一个策略)

Batch 用来表示一组同类的规则。

每个Batch的Rule使用相同的策略(执行一次 or 达到fixedPoint),便于管理

代码语言:javascript复制
/** A batch of rules. */
protected case class Batch(
name: String, //这组同类规则的名称
strategy: Strategy,//策略
rules: Rule[TreeType]* //具体的规则 
)
 

Analyzer 和 Optimizer 中分 别定义自己的batch,比如Analyzer中 定义的【Hints】,策略用的是FixedPoint,【Hints】中包含了两个与处理【hint】相关的rule:

ResolveHints.ResolveJoinStrategyHints

ResolveHints.ResolveCoalesceHints

batches: Seq[Batch](Batch队列)

RuleExecutor 包含了一个 protected def batches: Seq[Batch] 方法,用来获取一系列 Batch(Batch队列),这些 Batch 都会在 execute 中执行。所有继承 RuleExecutor(Analyzer 和 Optimizer)都必须实现该方法,提供自己的 Seq[Batch]。

Analyzer 和 Optimizer 中 提供各自己的 batches:

Optimizer 中的batches略显复杂,Optimizer定义了 三种batches:defaultBatches、excludedRules 、 nonExcludableRules

最终要被执行的batches为:defaultBatches - (excludedRules - nonExcludableRules)

execute(核心方法)

execute方法遍历batches中的每个Batch,再用Batch中的每个Rule处理plan。

while (continue) 的终止条件:

达到最大迭代次数maxIterations 或者 当前plan和last plan相比没有变化

执行流程

源码详解

代码语言:javascript复制
//传入参数plan是当前的执行计划
def execute(plan: TreeType): TreeType = {
  var curPlan = plan
  val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
  val planChangeLogger = new PlanChangeLogger()
  val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
  val beforeMetrics = RuleExecutor.getCurrentMetrics()

  // Run the structural integrity checker against the initial input
  if (!isPlanIntegral(plan)) {
    val message = "The structural integrity of the input plan is broken in "  
      s"${this.getClass.getName.stripSuffix("$")}."
    throw new TreeNodeException(plan, message, null)
  }

  //遍历batches,取出batch
  batches.foreach { batch =>
    //针对每个batch处理
    val batchStartPlan = curPlan
    var iteration = 1 
    var lastPlan = curPlan
    var continue = true
    while (continue) {
      //foldLeft语法,形如A.foldLeft(B){(z,i)=>{dosomething() return C}}
      //curPlan是batch迭代的初始值
      //case(plan,rule) 中plan是batch中某个rule执行后返回的plan,继续投入batch中下个rule;rule是指batch中的某个rule对象
      //整个foldLeft最后返回当前batch执行完之后生成的plan
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          val result = rule(plan)  //调用rule的apply方法,执行规则       
          val runTime = System.nanoTime() - startTime
          val effective = !result.fastEquals(plan)

          if (effective) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            planChangeLogger.logRule(rule.ruleName, plan, result)
          }
          queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
          queryExecutionMetrics.incNumExecution(rule.ruleName)

          // Record timing information using QueryPlanningTracker
          tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))

          // Run the structural integrity checker against the plan after each rule.
          if (!isPlanIntegral(result)) {
            val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, "  
              "the structural integrity of the plan is broken."
            throw new TreeNodeException(result, message, null)
          }

          result
      }
      iteration  = 1 //迭代次数 1
      if (iteration > batch.strategy.maxIterations) {
         //处理迭代次数大于策略中最大迭代次的情况
        
        if (iteration != 2) {
         //这个处理策略最大迭代次数不是一次,并且迭代次数超过了maxIterations的情况
          val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
            "."
          } else {
            s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
          }
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"  
            s"$endingMsg"
          if (Utils.isTesting || batch.strategy.errorOnExceed) {
           //如果errorOnExceed为true则抛出异常 
            throw new TreeNodeException(curPlan, message, null)
          } else {
            //只给警告日志
            logWarning(message)
          }
        }
        // Check idempotence for Once batches.
        if (batch.strategy == Once &&
          Utils.isTesting && !blacklistedOnceBatches.contains(batch.name)) {
          checkBatchIdempotence(batch, curPlan)
        }
        //对于达到策略设置数量,continue置false,迭代将会结束
        continue = false
      }
    //如果迭代次数没达到maxIterations,但是当前plan和上次plan完全一样的话,也会把 continue 置为 false,停止迭代
      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
  }
  planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)

  curPlan
}

0 人点赞