一篇文章搞懂 Spark 3.x 的 CacheManager

2023-04-07 18:51:15 浏览数 (3)

WHAT

CacheManager 是 Spark SQL 中内存缓存的管理者,在 Spark SQL 中提供对缓存查询结果的支持,并在执行后续查询时自动使用这些缓存结果。

数据使用 InMemoryRelation 中存储的字节缓冲区进行缓存。

这个关系是自动替换的查询计划,逻辑计划返回与最初缓存的查询相同的结果。

CacheManager 只能在 Spark SQL 内部使用。

CacheManager 通过 SharedState 在 SparkSessions 之间共享。

代码语言:javascript复制
val spark: SparkSession = ...
spark.sharedState.cacheManager

CacheManager 可以是空的。

通过在 Spark 的 conf/log4j.properties 添加下面的配置可以查看 CacheManager 内部发生了什么?

代码语言:javascript复制
log4j.logger.org.apache.spark.sql.execution.CacheManager=ALL

在触发缓存并且日志打印级别符合的情况下,会出现下面的打印日志:

代码语言:javascript复制
Asked to cache already cached data.

怎么触发 CacheManager 管理缓存?

  1. Spark 开发人员可以使用 Spark SQL 的 cache 或者 persist 算子 或者 SQL 的cache table 来通过 CacheManager 管理缓存。
  2. Spark Core 的cache 或者 persist 算子和 CacheManager 没有关系。

缓存怎么卸载?

  1. 使用 Dataset.unpersist 算子。
  2. 执行 DropTableCommand 和 TruncateTableCommand 逻辑命令。
  3. CatalogImpl 请求 uncache 和 refresh 表或视图,dropTempView/dropGlobalTempView

缓存到底长啥样?

CacheManager 使用 CachedData 数据结构使用 LogicalPlan(结构化查询)和相应的 InMemoryRelation 逻辑运算符管理缓存结构化查询。

代码语言:javascript复制
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()

可以看到缓存本质上是一个 IndexedSeq。

IndexedSeq

IndexedSeq表示保证不可变的索引序列。

索引序列支持恒定时间或接近恒定时间的元素访问和长度计算。

它们是根据用于索引和长度的抽象方法定义的。

索引序列不会给Seq添加任何新方法,但可以有效实现随机访问模式

IndexedSeq 的默认实现是一个 scala.Vector

CachedData

如果说IndexedSeq是一个容器的话,那么CachedData就是容器里面存放的数据。

我们看看CachedData的类定义。

代码语言:javascript复制
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)

可以看到CachedData底层就是一个LogicalPlan 和InMemoryRelation。

InMemoryRelation 封装了一个缓存构建器,使用它,当我们使用缓存数据的时候,就不会触发 job,并且可以实现缓存 RDD 的懒加载。

InMemoryRelation 还缓存了哪些配置?

  • spark.sql.inMemoryColumnarStorage.compressed (默认 enabled)
  • spark.sql.inMemoryColumnarStorage.batchSize (默认 10000)
  • 输入数据的存储级别 (默认 MEMORY_AND_DISK)。
  • 优化过的物理查询计划 (在请求 SessionState 执行 analyzed logical plan 之后)。
  • 输入的表名。
  • analyzed 查询计划的统计信息。

怎么判断查询是否已缓存?

代码语言:javascript复制
final def sameResult(other: PlanType): Boolean = this.canonicalized == other.canonicalized

可以看到,Spark 通过比较两个查询计划的canonicalized 是否相等来决定是否启用缓存。

那么,canonicalized 到底是什么呢?

canonicalized

我们知道实现同一种功能,不同开发人员使用的 SQL 语法都可能存在差异,此时,为了保证能够充分利用到已有的查询计划,我们需要针对不同的查询计划做一个规范化的处理,这就是canonicalized存在的意义。

canonicalized 是在 QueryPlan.scala 中被定义的

代码语言:javascript复制
/**
 * 返回一个计划,在该计划中,已尽最大努力以一种保留
 * 结果但消除表面变化(区分大小写、交换操作顺序、表
 * 达式id等)的方式对此进行转换。
 * 计划`this.canonicalized == other.canonicalized` 总是会得到相同的结果。
 * 需要特殊规范化的计划节点应覆盖 [[doCanonicalize()]] 方法。
 * 他们应该自己消除表达式表面变化。
 */
@transient final lazy val canonicalized: PlanType = {
    var plan = doCanonicalize()
    // 如果计划没有因规范化而更改,请复制一份,这样我们就不会更改原始计划的_isCanonicalizedPlan标志。  
    if (plan eq this) {
      plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
    }
    plan._isCanonicalizedPlan = true
    plan
  }
代码语言:javascript复制
/**
 * 定义规范化如何适用于当前计划。
 */
protected def doCanonicalize(): PlanType = {
    val canonicalizedChildren = children.map(_.canonicalized)
    var id = -1
    mapExpressions {
      case a: Alias =>
        id  = 1
        // 作为表达式的根,Alias将始终采用任意的exprId,我们需要递增地从 0 开始分配 exprId,将其规范化以进行相等性测试。这个别名无关紧要,应该删除。
        val normalizedChild = QueryPlan.normalizeExpressions(a.child, allAttributes)
        Alias(normalizedChild, "")(ExprId(id), a.qualifier)

      case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
        // 顶层的“AttributeReference”也可以用于像“Alias”这样的输出,我们应该也要使 exprId 正常化。
        id  = 1
        ar.withExprId(ExprId(id)).canonicalized

      case other => QueryPlan.normalizeExpressions(other, allAttributes)
    }.withNewChildren(canonicalizedChildren)
  }
代码语言:javascript复制
/**
 * 通过使用输入属性中引用的序号更新AttributeReference中的exprId,规范化给定表达式中的exprId。
 * 它类似于BindReferences,但我们在这里不使用BindReferences,因为计划可能会将表达式作为带有type属性的参数,并用BoundReference替换它将导致错误。
 */
def normalizeExpressions[T <: Expression](e: T, input: AttributeSeq): T = {
    e.transformUp {
      case s: PlanExpression[QueryPlan[_] @unchecked] =>
        // 规范化子查询计划中的外部引用。
        val normalizedPlan = s.plan.transformAllExpressionsWithPruning(
          _.containsPattern(OUTER_REFERENCE)) {
          case OuterReference(r) => OuterReference(QueryPlan.normalizeExpressions(r, input))
        }
        s.withNewPlan(normalizedPlan)

      case ar: AttributeReference =>
        val ordinal = input.indexOf(ar.exprId)
        if (ordinal == -1) {
          ar
        } else {
          ar.withExprId(ExprId(ordinal))
        }
    }.canonicalized.asInstanceOf[T]
  }

通过上面的源码阅读可以得到以下的结论:

  1. 规范化重点在于消除表面变化(区分大小写、交换操作顺序、ExprId 等)
  2. 默认情况下规范化主要处理的是 ExprId。
  3. 特殊情况下规范化需要重写 QueryPlan.doCanonicalize 方法。

Spark 3.3.0 版本总共有 21 个特殊的 QueryPlan 重写了QueryPlan.doCanonicalize 方法。

代码语言:javascript复制
HiveTableScanExec
RowDataSourceScanExec
SubqueryExec
ReusedExchangeExec
FileSourceScanExec
InMemoryTableScanExec
AdaptiveSparkPlanExec
ReusedSubqueryExec
SubqueryAlias
SubqueryAdaptiveBroadcastExec
SubqueryBroadcastExec
InMemoryRelation
HiveTableRelation
View
RangeExec
QueryStageExec
BroadcastExchangeExec
Join
LogicalRelation
ResolvedHint
BatchScanExec

遍历了上面 21 种特殊查询计划的源码后,可以很明显的得出下面的结论:

  1. 规范化更多的是对当前查询计划的副本进行操作
  2. 规范化在不同的场景下只会关注某些特定属性,即这些属性一致我们也会认为这些查询计划是同一个,在 CacheManager 中将会得到重用。

0 人点赞