WHAT
CacheManager 是 Spark SQL 中内存缓存的管理者,在 Spark SQL 中提供对缓存查询结果的支持,并在执行后续查询时自动使用这些缓存结果。
数据使用 InMemoryRelation 中存储的字节缓冲区进行缓存。
这个关系是自动替换的查询计划,逻辑计划返回与最初缓存的查询相同的结果。
CacheManager 只能在 Spark SQL 内部使用。
CacheManager 通过 SharedState 在 SparkSessions 之间共享。
代码语言:javascript复制val spark: SparkSession = ...
spark.sharedState.cacheManager
CacheManager 可以是空的。
通过在 Spark 的 conf/log4j.properties 添加下面的配置可以查看 CacheManager 内部发生了什么?
代码语言:javascript复制log4j.logger.org.apache.spark.sql.execution.CacheManager=ALL
在触发缓存并且日志打印级别符合的情况下,会出现下面的打印日志:
代码语言:javascript复制Asked to cache already cached data.
怎么触发 CacheManager 管理缓存?
- Spark 开发人员可以使用 Spark SQL 的 cache 或者 persist 算子 或者 SQL 的cache table 来通过 CacheManager 管理缓存。
- Spark Core 的cache 或者 persist 算子和 CacheManager 没有关系。
缓存怎么卸载?
- 使用 Dataset.unpersist 算子。
- 执行 DropTableCommand 和 TruncateTableCommand 逻辑命令。
- CatalogImpl 请求 uncache 和 refresh 表或视图,dropTempView/dropGlobalTempView
缓存到底长啥样?
CacheManager 使用 CachedData 数据结构使用 LogicalPlan(结构化查询)和相应的 InMemoryRelation 逻辑运算符管理缓存结构化查询。
代码语言:javascript复制@transient @volatile
private var cachedData = IndexedSeq[CachedData]()
可以看到缓存本质上是一个 IndexedSeq。
IndexedSeq
IndexedSeq表示保证不可变的索引序列。
索引序列支持恒定时间或接近恒定时间的元素访问和长度计算。
它们是根据用于索引和长度的抽象方法定义的。
索引序列不会给Seq添加任何新方法,但可以有效实现随机访问模式
IndexedSeq 的默认实现是一个 scala.Vector
CachedData
如果说IndexedSeq是一个容器的话,那么CachedData就是容器里面存放的数据。
我们看看CachedData的类定义。
代码语言:javascript复制case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
可以看到CachedData底层就是一个LogicalPlan 和InMemoryRelation。
InMemoryRelation 封装了一个缓存构建器,使用它,当我们使用缓存数据的时候,就不会触发 job,并且可以实现缓存 RDD 的懒加载。
InMemoryRelation 还缓存了哪些配置?
- spark.sql.inMemoryColumnarStorage.compressed (默认 enabled)
- spark.sql.inMemoryColumnarStorage.batchSize (默认 10000)
- 输入数据的存储级别 (默认 MEMORY_AND_DISK)。
- 优化过的物理查询计划 (在请求 SessionState 执行 analyzed logical plan 之后)。
- 输入的表名。
- analyzed 查询计划的统计信息。
怎么判断查询是否已缓存?
代码语言:javascript复制final def sameResult(other: PlanType): Boolean = this.canonicalized == other.canonicalized
可以看到,Spark 通过比较两个查询计划的canonicalized 是否相等来决定是否启用缓存。
那么,canonicalized 到底是什么呢?
canonicalized
我们知道实现同一种功能,不同开发人员使用的 SQL 语法都可能存在差异,此时,为了保证能够充分利用到已有的查询计划,我们需要针对不同的查询计划做一个规范化的处理,这就是canonicalized存在的意义。
canonicalized 是在 QueryPlan.scala 中被定义的
代码语言:javascript复制/**
* 返回一个计划,在该计划中,已尽最大努力以一种保留
* 结果但消除表面变化(区分大小写、交换操作顺序、表
* 达式id等)的方式对此进行转换。
* 计划`this.canonicalized == other.canonicalized` 总是会得到相同的结果。
* 需要特殊规范化的计划节点应覆盖 [[doCanonicalize()]] 方法。
* 他们应该自己消除表达式表面变化。
*/
@transient final lazy val canonicalized: PlanType = {
var plan = doCanonicalize()
// 如果计划没有因规范化而更改,请复制一份,这样我们就不会更改原始计划的_isCanonicalizedPlan标志。
if (plan eq this) {
plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
}
plan._isCanonicalizedPlan = true
plan
}
代码语言:javascript复制/**
* 定义规范化如何适用于当前计划。
*/
protected def doCanonicalize(): PlanType = {
val canonicalizedChildren = children.map(_.canonicalized)
var id = -1
mapExpressions {
case a: Alias =>
id = 1
// 作为表达式的根,Alias将始终采用任意的exprId,我们需要递增地从 0 开始分配 exprId,将其规范化以进行相等性测试。这个别名无关紧要,应该删除。
val normalizedChild = QueryPlan.normalizeExpressions(a.child, allAttributes)
Alias(normalizedChild, "")(ExprId(id), a.qualifier)
case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
// 顶层的“AttributeReference”也可以用于像“Alias”这样的输出,我们应该也要使 exprId 正常化。
id = 1
ar.withExprId(ExprId(id)).canonicalized
case other => QueryPlan.normalizeExpressions(other, allAttributes)
}.withNewChildren(canonicalizedChildren)
}
代码语言:javascript复制/**
* 通过使用输入属性中引用的序号更新AttributeReference中的exprId,规范化给定表达式中的exprId。
* 它类似于BindReferences,但我们在这里不使用BindReferences,因为计划可能会将表达式作为带有type属性的参数,并用BoundReference替换它将导致错误。
*/
def normalizeExpressions[T <: Expression](e: T, input: AttributeSeq): T = {
e.transformUp {
case s: PlanExpression[QueryPlan[_] @unchecked] =>
// 规范化子查询计划中的外部引用。
val normalizedPlan = s.plan.transformAllExpressionsWithPruning(
_.containsPattern(OUTER_REFERENCE)) {
case OuterReference(r) => OuterReference(QueryPlan.normalizeExpressions(r, input))
}
s.withNewPlan(normalizedPlan)
case ar: AttributeReference =>
val ordinal = input.indexOf(ar.exprId)
if (ordinal == -1) {
ar
} else {
ar.withExprId(ExprId(ordinal))
}
}.canonicalized.asInstanceOf[T]
}
通过上面的源码阅读可以得到以下的结论:
- 规范化重点在于消除表面变化(区分大小写、交换操作顺序、ExprId 等)
- 默认情况下规范化主要处理的是 ExprId。
- 特殊情况下规范化需要重写 QueryPlan.doCanonicalize 方法。
Spark 3.3.0 版本总共有 21 个特殊的 QueryPlan 重写了QueryPlan.doCanonicalize 方法。
代码语言:javascript复制HiveTableScanExec
RowDataSourceScanExec
SubqueryExec
ReusedExchangeExec
FileSourceScanExec
InMemoryTableScanExec
AdaptiveSparkPlanExec
ReusedSubqueryExec
SubqueryAlias
SubqueryAdaptiveBroadcastExec
SubqueryBroadcastExec
InMemoryRelation
HiveTableRelation
View
RangeExec
QueryStageExec
BroadcastExchangeExec
Join
LogicalRelation
ResolvedHint
BatchScanExec
遍历了上面 21 种特殊查询计划的源码后,可以很明显的得出下面的结论:
- 规范化更多的是对当前查询计划的副本进行操作
- 规范化在不同的场景下只会关注某些特定属性,即这些属性一致我们也会认为这些查询计划是同一个,在 CacheManager 中将会得到重用。