spark sql解析过程中对tree的遍历(源码详解)

2020-10-29 09:54:14 浏览数 (1)

静下心来读源码,给想要了解spark sql底层解析原理的小伙伴们!

【本文大纲】1、执行计划回顾2、遍历过程概述3、遍历过程详解4、思考小问题

执行计划回顾

Spark sql执行计划的生成过程:

  1. 接收 sql 语句,初步解析成 logical plan
  2. 分析上步生成的 logical plan,生成验证后的 logical plan
  3. 对分析过后的 logical plan,进行优化
  4. 对优化过后的 logical plan,生成 physical plan
  5. 根据 physical plan,生成 rdd 的程序,并且提交运行
代码语言:javascript复制
SELECT A,B FROM TESTDATA2 WHERE A>2

结合上图,写测试用例,每一步生成的执行计划如下:

Spark sql解析会生成四种plan:

Parsed Logical Plan, Analyzed Logical Plan, Optimized Logical Plan, Physical Plan

上面这四种plan,无论是 LogicalPlan 还是 PhysicalPlan,都是通过树的形式表示。每一步都是对树进行操作,生成新的树。在这个过程中,对树的遍历非常重要。

遍历过程概述

最常用到的有 后序遍历 和 前序遍历 两种

后序遍历

TreeNode 中的 transformUp方法以及AnalysisHelper 中的 resolveOperatorsUp方法 等

这两个方法类似,以TreeNode 中的 transformUp为例:

代码语言:javascript复制
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  // 先遍历子节点,得到叶子节点
  val afterRuleOnChildren = mapChildren(_.transformUp(rule))
 
 //对节点执行规则
  val newNode = if (this fastEquals afterRuleOnChildren) {
    CurrentOrigin.withOrigin(origin) {
      //这里用到了PartialFunction的applyOrElse方法,用来避免undefined的情况发生。如果当前节点应用rule没有匹配的话,则返回默认的当前节点本身
      rule.applyOrElse(this, identity[BaseType])
    }
  } else {
    CurrentOrigin.withOrigin(origin) {
      rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
    }
  }
  // If the transform function replaces this node with a new one, carry over the tags.
  newNode.copyTagsFrom(this)
  newNode
}

递归逻辑:

  • 递归结束条件:如果是子节点,那么使用该规则执行该节点,并且返回执行规则后的节点
  • 递归继续条件:如果有子节点,那么先根据遍历子节点的结果,生成新节点。最后在使用该规则执行新节点

前序遍历

TreeNode 中的 transformDown方法以及AnalysisHelper 中的 resolveOperatorsDown方法 等

TreeNode 中的 transformDown为例:

代码语言:javascript复制
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
      // 对当前节点,调用rule函数。
    val afterRule = CurrentOrigin.withOrigin(origin) {
    // 这里rule函数有可能会生成新的节点,新节点的子节点可能不一样
    rule.applyOrElse(this, identity[BaseType])
  }

  // Check if unchanged and then possibly return old copy to avoid gc churn.
 //再遍历子节点 
 if (this fastEquals afterRule) {
// 如果当前节点没有变化,则继续遍历它的子节点
    mapChildren(_.transformDown(rule))
  } else {
  
// 如果当前节点发生改变,需要对改变后的节点进行遍历
    afterRule.copyTagsFrom(this)
    afterRule.mapChildren(_.transformDown(rule))
  }
}

递归逻辑:

  • 递归结束条件:如果是叶子节点,那么使用规则对该节点操作,并且返回操作后的节点。
  • 递归继续条件:如果不是叶子节点,那么先使用该规则对该节点操作。对操作后的该节点,继续遍历其子节点,用子节点的返回结果,来构建成新的节点。

遍历中的通用方法

上面几种方法中,都用到了TreeNode中的mapChildren、mapProductIterator方法

mapChildren

mapChildren 会依次调用函数对子节点操作,根据返回的结果生成一个新的节点。

代码语言:javascript复制
def mapChildren(f: BaseType => BaseType): BaseType = {
//如果不是叶子节点,那么会执行mapChildren(f, forceCopy = false)方法,遍历构造函数的参数。如果参数是子节点,那么递归遍历
if (containsChild.nonEmpty) {
    mapChildren(f, forceCopy = false)
  } else {
//如果是叶子节点,则返回自身节点 
    this
  }
}


private def mapChildren(
    f: BaseType => BaseType,
    forceCopy: Boolean): BaseType = {
  var changed = false

  def mapChild(child: Any): Any = child match {
    case arg: TreeNode[_] if containsChild(arg) =>
      val newChild = f(arg.asInstanceOf[BaseType])
      if (forceCopy || !(newChild fastEquals arg)) {
        changed = true
        newChild
      } else {
        arg
      }
    case tuple @ (arg1: TreeNode[_], arg2: TreeNode[_]) =>
      val newChild1 = if (containsChild(arg1)) {
        f(arg1.asInstanceOf[BaseType])
      } else {
        arg1.asInstanceOf[BaseType]
      }

      val newChild2 = if (containsChild(arg2)) {
        f(arg2.asInstanceOf[BaseType])
      } else {
        arg2.asInstanceOf[BaseType]
      }

      if (forceCopy || !(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
        changed = true
        (newChild1, newChild2)
      } else {
        tuple
      }
    case other => other
  }

// 调用了mapProductIterator方法,遍历构造函数的参数,返回新的构造参数
  val newArgs = mapProductIterator {
// 如果参数是TreeNode子类,并且是当前节点的子节点
    case arg: TreeNode[_] if containsChild(arg) =>
// 递归调用函数遍历 这里的f可能是 transformUp or transformDown
      val newChild = f(arg.asInstanceOf[BaseType])
// 如果子节点发生变化了,则更改changed标识
      if (forceCopy || !(newChild fastEquals arg)) {
        changed = true
        newChild
      } else {
        arg
      }
    case Some(arg: TreeNode[_]) if containsChild(arg) =>
      val newChild = f(arg.asInstanceOf[BaseType])
      if (forceCopy || !(newChild fastEquals arg)) {
        changed = true
        Some(newChild)
      } else {
        Some(arg)
      }
    case m: Map[_, _] => m.mapValues {
      case arg: TreeNode[_] if containsChild(arg) =>
        val newChild = f(arg.asInstanceOf[BaseType])
        if (forceCopy || !(newChild fastEquals arg)) {
          changed = true
          newChild
        } else {
          arg
        }
      case other => other
    }.view.force // `mapValues` is lazy and we need to force it to materialize
    case d: DataType => d // Avoid unpacking Structs
    case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
    case args: Iterable[_] => args.map(mapChild)
    case nonChild: AnyRef => nonChild
    case null => null
  }
 // 如果子节点发生变化,则利用新的构造参数,实例化新的节点
  if (forceCopy || changed) makeCopy(newArgs, forceCopy) else this
}

mapProductIterator

TreeNode 继承了 Product 接口,TreeNode 的子类实现了 Product 接口,所以支持访问构造方法的参数。TreeNode 的 mapProductIterator 方法,接收一个函数用来遍历当前节点的构造参数

这里有一个知识点(ClassTag用法):https://dzone.com/articles/scala-classtag-a-simple-use-case

代码语言:javascript复制
//ClassTag用法
 def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
// protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
  val arr = Array.ofDim[B](productArity)
  var i = 0
  while (i < arr.length) {
    arr(i) = f(productElement(i))
    i  = 1
  }
  arr
}

遍历过程详解

下面以Parsed Logical Plan --> Analyzed Logical Plan的过程中 ,某个规则为例,详细跟踪一下这两种遍历方式。

分析一下当前的Parsed Logical Plan

当前sql

代码语言:javascript复制
SELECT A,B FROM TESTDATA2 WHERE A>2

生成的Parsed Logical Plan:

代码语言:javascript复制
== Parsed Logical Plan ==
'Project ['A, 'B]
 - 'Filter ('A > 2)
    - 'UnresolvedRelation [TESTDATA2]

上面执行计划涉及到 三个类(Project、Filter、UnresolvedRelation):

case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)

两个参数:

  • Project-projectList: Seq['A, 'B]
  • Project-child(LogicalPlan):
  • 'Filter ('A > 2)
  • - 'UnresolvedRelation [TESTDATA2]

Project有一个子节点Filter

case class Filter(condition: Expression, child: LogicalPlan)

两个参数:

  • Filter-condition:('A > 2)
  • Filter--child(LogicalPlan):

'UnresolvedRelation [TESTDATA2]

Filter有一个子节点UnresolvedRelation

case class UnresolvedRelation( multipartIdentifier: Seq[String])

一个参数:

  • UnresolvedRelation-multipartIdentifier: Seq[TESTDATA2]

UnresolvedRelation无子节点

Project、Filter、UnresolvedRelation与 LogicalPlan、 TreeNode的继承关系如下:

Project、Filter、UnresolvedRelation本身是Logical Plan 、TreeNode。

后序遍历(AnalysisHelper.resolveOperatorsUp)

Parsed Logical Plan 需要 通过Analyzer类中的一系列rule 转换生成Analyzed Logical Plan。

下图是Analyzer类中rule,会提前初始化在batches里:

这里的rule通过apply方法遍历Parsed Logical Plan 的每个节点,依据定好的规则生成Analyzed Logical Plan,以 ResolveHints.ResolveJoinStrategyHint为例:

主要通过AnalysisHelper 中的 resolveOperatorsUp(后序遍历的)方法:

代码语言:javascript复制
// 入参为rule,偏函数
def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
  if (!analyzed) {
    AnalysisHelper.allowInvokingTransformsInAnalyzer {
     // 1、先遍历子节点,得到叶子节点
      val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
      
    //2、为节点执行规则
    if (self fastEquals afterRuleOnChildren) {
        CurrentOrigin.withOrigin(origin) {
    // 如果遍历后当前节点没有发生变化,对当前的plan执行rule规则
          rule.applyOrElse(self, identity[LogicalPlan])
        }
      } else {
        CurrentOrigin.withOrigin(origin) {
    // 如果遍历后 当前 节点发 生了变化,则新负值的afterRuleOnChildren执行rule规则
          rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
        }
      }
    }
  } else {
    self
  }
}

当前的节点为Project,执行Project 的 resolveOperatorsUp 方法,该方法会先遍历Project的子节点。

第一层遍历:

执行Project 的 mapChildren方法

第二层遍历:

执行Filter 的 mapChildren方法

第三层遍历:

执行UnresolvedRelation 的 mapChildren方法

由 于 UnresolvedRelation为子节点,返回节点本 身,为UnresolvedRelation执行rule。

为UnresolvedRelation节点执行ResolveJoinStrategyHint的apply方法:

代码语言:javascript复制
// 该规则主要是针对Hint节点起作用 ,目前是UnresolvedRelation节点
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
  case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
    if (h.parameters.isEmpty) {
      // If there is no table alias specified, apply the hint on the entire subtree.
      ResolvedHint(h.child, createHintInfo(h.name))
    } else {
      // Otherwise, find within the subtree query plans to apply the hint.
      val relationNamesInHint = h.parameters.map {
        case tableName: String => UnresolvedAttribute.parseAttributeName(tableName)
        case tableId: UnresolvedAttribute => tableId.nameParts
        case unsupported => throw new AnalysisException("Join strategy hint parameter "  
          s"should be an identifier or string but was $unsupported (${unsupported.getClass}")
      }.toSet
      val relationsInHintWithMatch = new mutable.HashSet[Seq[String]]
      val applied = applyJoinStrategyHint(
        h.child, relationNamesInHint, relationsInHintWithMatch, h.name)

      // Filters unmatched relation identifiers in the hint
      val unmatchedIdents = relationNamesInHint -- relationsInHintWithMatch
      hintErrorHandler.hintRelationsNotFound(h.name, h.parameters, unmatchedIdents)
      applied
    }
}

这个规则主要是 对Hint节点起作用,但目前是UnresolvedRelation节点,不能匹配的上。因此通过

代码语言:javascript复制
  rule.applyOrElse(self, identity[LogicalPlan])

之后,返回UnresolvedRelation本身。

UnresolvedRelation返回后,就会接着先后为Filter-->Project执行ResolveJoinStrategyHint规则,最后返回Project本身。

到此,整个ResolveJoinStrategyHint对Logical plan的 遍历及执行规则的 过 程 就结束了。

前序遍历(AnalysisHelper.resolveOperatorsDown)

Analyzer 中的 ExtractWindowExpressions规则

主要通过AnalysisHelper 中的 resolveOperatorsDown方法:

代码语言:javascript复制
/** Similar to [[resolveOperatorsUp]], but does it top-down. */
def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
  if (!analyzed) {
    AnalysisHelper.allowInvokingTransformsInAnalyzer {   
      val afterRule = CurrentOrigin.withOrigin(origin) {
       // 1、为当前节点执行规则
        rule.applyOrElse(self, identity[LogicalPlan])
      }

      //  2、对执行完规则后的新节点遍历迭代
      if (self fastEquals afterRule) {
        //如果执行完规则后的节点没有变化(即规则没有起到作用),则对节点遍历迭代
        mapChildren(_.resolveOperatorsDown(rule))
      } else {
       //如果执行完规则后的节点发生变化,则对新节点遍历迭代
        afterRule.mapChildren(_.resolveOperatorsDown(rule))
      }
    }
  } else {
    self
  }
}

先为Project节点执行ExtractWindowExpressions.apply方法:

Project节点模式匹配case p: LogicalPlan if !p.childrenResolved => p 返回Project节点本身

第一层遍历:

得到Project的子节点Filter,执行Filter.resolveOperatorsDown方法,先对Filter节点执行ExtractWindowExpressions.apply方法,跑一遍规则,最后由于sql没有用到window相关函数,返回Filter节点本身,开始对Filter节点进行遍历

第二层遍历:

得到Filter的子节点UnresolvedRelation,执行UnresolvedRelation.resolveOperatorsDown方法,先对UnresolvedRelation节点执行ExtractWindowExpressions.apply方法,跑一遍规则,返回UnresolvedRelation节点本身,开始对UnresolvedRelation节点进行遍历

第三层遍历:

UnresolvedRelation没有子节点,在mapChildren方法被返回。

最终 返回Project节点,ExtractWindowExpressions执行完成。

思考

什么rule适合用后序遍历?什么rule适合前序遍历?

当我们自己开发规则时,该怎么选呢?

0 人点赞