【Flink】第二十九篇:源码分析 Blink Planner

2022-03-31 11:23:27 浏览数 (1)

源码分析系列推荐:

【Flink】第四篇:【迷思】对update语义拆解D-、I 后造成update原子性丢失

【Flink】第十五篇:Redis Connector 数据保序思考

【Flink】第十六篇:源码角度分析 sink 端的数据一致性

【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

【Flink】第二十五篇:源码角度分析作业提交逻辑

【Flink】第二十六篇:源码角度分析Task执行过程

【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法

【Flink】第二十八篇:Flink SQL 与 Apache Calcite

接上篇 【Flink】第二十八篇:Flink SQL 与 Apache Calcite 我们介绍了Calcite的前世今生,并从Flink源码工程的角度简单分析了Calcite在Flink工程中codegen机理。

本文主要内容:

  1. 进一步介绍Calcite:框架、SQL解析流程。
  2. Blink Planner:框架、Flink SQL 解析流程、源码分析。

Calcite 框架及SQL解析流程

Calcite的本质属性是SQL解释器,所以其具备:SQL解析、SQL校验、SQL查询优化、SQL生成及数据连接查询这些功能,

但是它又省略了一些关键的功能:

  • 不存储相关元数据和基本数据
  • 不完全包含相关处理数据的算法等

由于舍弃了这些功能,Calcite可以在应用和数据存储、数据处理引擎之间很好地扮演中介的角色。如下,

即我们上一篇提过的"one size fits all"(一种查询引擎,连接多种前端和后端)。而Calcite重点抽象出来的组件就是查询优化部分,进行了用户自定义可扩展性方面的设计。

1. Query Optimizer:优化器,也称Planner, 如HepPlanner, VolcanoPlanner。它是Calcite的心脏和大脑,它接受查询计划,输出优化后的查询计划

2. 蓝框是优化器的输入、输出,及各种适配器,包括:

(1) Opeator Expressions:输入的原始计划,中间结果,最后输出的计划

Opeator Expressions是一种用于表示关系代数表达式的树状数据结构。解释器将SQL 查询解释成关系代数表达式, 之后优化器调用规则将其修改为最优表达式。优化规则会根据有关系代数的等价原理将表达式变形从而使表达式的代价降低。

但如何判断代价是否降低?办法有两种:一种是根据经验,一种是根据代价模型。

根据经验的叫启发式(Heuristic )模型, 根据代价模型估计的叫火山(Volcano)模型。相应的优化器:HepPlanner, VocanoPlanner。

代价模型的量化计算是根据从metadata provider获取关系及关系运算的元数据,再辅以量化模型的计算。元数据通常指的是一个关系(table)和关系运算(投影projection, 过滤filter, 关联join, 聚合aggregation 等)产生关系的统计数据:如关系的row count, 某个分量的 distinct count, min, max 等 。Data Processing System 会扩展Calcite 逻辑关系表达式产生“物理”关系表达式, 而这些扩展的表达式也会输入给优化器, 利用规则继续优化 。

在Calcite中,有两种主要的树型结构:

  • AST抽象语法树(SqlNode Tree)
  • 关系树(RelNode Tree)

(2) Metadata Provider:提供元数据的组件,比如对优化规则用的统计信息(RowCount of table, Distinct RowCout/min/max of a column , etc )

(3) Pluggable Rules:优化规则,利用关系代数或关系演算的等价关系,优化执行计划,使之更够最快速的执行

这三个组件是calcite 可扩展部分,因此与外部系统有连接。

3. Data Processing System:与蓝框有虚线连接,是Data Processing System对calcite 的扩展部分。 这里的Data Processing System所指的就是查询引擎。

它通过扩展metadata provider 和 pluggable rules , 向优化器提供更准确的元数据信息,更适合的代价模型, 更高效的优化规则,利用calcite 优化器产生最优化查询计划。SQL parser and validator, 是Calcite的SQL 语言的解释器, 它将用用户用SQL语言编写的查询解析称Opeator Expressions , 并验证它的合法性 。

总结:

关系代数是关系型数据库操作的理论基础,关系代数支持并、差、笛卡尔积、投影和选择等基本运算。关系代数也是 Calcite 的核心,任何一个查询都可以表示成由关系运算符组成的树。在 Calcite 中,它会先将 SQL 转换成关系表达式(relational expression),然后通过规则匹配(rules match)进行相应的优化,优化会有一个成本(cost)模型为参考。

Calcite的解析流程:

1 解析 SQL, 通过JavaCC实现,使用JavaCC按照语法描述文件 jj 文件,codegen生成的程序,把 SQL 转换成为 AST (抽象语法树),在 Calcite 中用 SqlNode 来表示:

  • 词法分析 Lexer
  • 语法分析 Parser

2 语法检查,根据数据库的元数据信息进行语法验证,验证之后还是用 SqlNode 表示 AST 语法树;

  1. 有状态校验:是否符合SQL规范
  2. 无状态校验:通过与元数据结合验证SQL中的Schema、Field、Function是否存在,输入输出类型是否匹配

3 语义分析,根据 SqlNode 及元信息构建 RelNode 树,也就是最初版本的逻辑计划(Logical Plan);

4 逻辑计划优化,优化器的核心,根据前面生成的逻辑计划按照相应的规则(Rule)进行优化:

  • 基于规则的优化
  • 基于代价的优化

5 执行逻辑,生成物理计划,并将物理执行计划转换为特定平台/引擎的可执行程序。

6 连接数据源,执行程序

Blink Planner

Planner定义了两个关键的行为:

  • SQL解析:将SQL字符串解析为对Table API调用的Operation 树;
  • 关系代数到Flink执行计划:将Operation树转换为Transformation;

其层次关系:

BatchPlanner 和 StreamPlanner 是 Blink Planner 的核心。核心的优化、转换流程定义在 PlannerBase#translate 中。BatchPlanner 和 StreamPlanner 的主要区别是使用了不同的优化逻辑。

Blink SQL执行过程:

在Blink Planner中,Tabel API和SQL语句两者在Operation层面进行了统一。

SQL的执行过程分为两个大的阶段:

  1. 从SQL语句到Operation
  2. 从Operation到Transformation

源码中的入口在:TableEnvironmentImpl

从SQL语句到Operation:

代码语言:javascript复制
List<Operation> operations = parser.parse(statement);

从Operation到Transformation,再到执行:

代码语言:javascript复制
return executeOperation(operations.get(0));

从SQL到Operation

主要步骤:

  1. 解析 SQL 字符串为 SqlNode
  2. 校验 SqlNode(AST)
  3. SqlNode 转换为 RelNode(Logical Plan)
  4. RelNode 转换为 Operation

1. 解析 SQL 字符串为 SqlNode

沿着TableEnvironmentImpl的调用,我们来分析源码,在ParserImpl#parse:

其中,主要逻辑: 1. 构造parser解析器CalciteParser

2. 构造验证器计划器FlinkPlannerImpl

CalciteParser#parse:

调用Calcite的SqlParser解析SQL,继续深入会发现本质上是调用了Calcite的SqlAbstractParserImpl#parseSqlStmtEof,而具体用的类正是上篇文章中阐述的利用JavaCC结合Flink的SQL语法配置文件parserImpls.ftl生成的FlinkSqlParserImpl,如下,

总体概览该类,有6万行,并且关系抽象和我们之前理解的一致,即完成

  • 词法解析器 Lexer:词法分析是指在计算机科学中,将字符序列转换为单词(Token)的过程。
  • 语法解析器 Parser:语法解析器通常作为 编译器 或 解释器 出现。它的作用是进行语法检查,并构建由输入单词(Token)组成的数据结构(AST)。

至此,便得到了我们这一步骤的输出:SqlNode Tree(AST)。

2. 校验 SqlNode(AST)

上一步骤中,在TableEnvironmentImpl中的第二步是构造验证器计划器FlinkPlannerImpl,沿着这个线索,继续深入验证逻辑,进入FlinkPlannerImpl#parse的SqlToOperationConverter#convert逻辑,

这个方法执行各种 DDL/DML SqlNode的主要入口,不同的 SqlNode 将在 #convert(type) 方法中实现它,其 'type' 参数是SqlNode子类。在第一步操作中就执行了验证逻辑FlinkPlannerImpl#validate

继续追入FlinkCalciteSqlValidator#validate

主要的实现是在#validateScopedExpression:

它的处理逻辑主要分为三步:

  1. rewrite expression,将其标准化,便于后面的逻辑计划优化;
  2. 注册这个 relational expression 的 scopes 和 namespaces(这两个对象代表了其元信息);
  3. 进行相应的catalog验证,这里会依赖第二步注册的 scopes 和 namespaces 信息。

后面便进入了Calcite的验证逻辑,在此不做更深入分析了。主要就是做两方面校验:

  • 有状态校验,是否符合SQL规范
  • 无状态校验,通过与元数据结合验证SQL中的Schema、Field、Function是否存在,输入输出类型是否匹配

3. SqlNode 转换为 RelNode(Logical Plan)

在上一步的SqlToOperationConverter中,分析完validate验证逻辑后,我们接着分析后面的convert逻辑,这里我们假设执行的SQL是一个DQL,那么会进入SqlToOperationConverter#convertSqlQuery,它又经过一层重构方法到了SqlToOperationConverter#toQueryOperation,

代码的关系位置已经给出注释:将经过验证后的qlNode转换为relational tree(关系树)即我们理解的RelNode(根节点为RelRoot)。

继续深入,最终到Calcite的SqlToRelConverter#convertQuery

这里其实是一个递归构建树型结构的过程,如下,

递归的大致思路就是:根据SqlNode的类型调用相应的convertXXX方法。

4. RelNode 转换为 Operation

在上一步的SqlToOperationConverter#toQueryOperation操作中,将SqlNode转换为RelRoot后,紧接着返回的是一个Operatrion的包装类型,

我们深入一探究竟,

第一步,将刚生成的RelNode Tree传给成员变量calciteTre;

第二步,构造一个TableSchema

另外我们来看看这个QueryOperation的层次关系:

综上,每一种SQL都有其对应的Operation。

至此我们完成了对Blink Planner的两大步骤的第一步的四个小步骤的全部源码分析,

1. 从SQL语句到Operation

(1) 解析 SQL 字符串为 SqlNode

(2) 校验 SqlNode(AST)

(3) SqlNode 转换为 RelNode(Logical Plan)

(4) RelNode 转换为 Operation

接着分析第二步:

2. 从Operation到Transformation

对象转换过程:

  1. Query/Modify Operation
  2. ->RelNode (Logical Plan)优化逻辑执行计划
  3. ->FlinkPhysicalRel 物理执行关系树
  4. ->ExecNode 物理执行关系树的执行信息(Physical Plan)
  5. ->Transformation 形成Transformation pipeline

转换流程:

  1. 从Operation转换为Calcite RelNode,使用Calcite提供的优化器。
  2. 使用Flink定制的Calcite优化器优化,对于流使用StreamPlanner,实际使用的是StreamCommoonSubGraphBasedOptimizer。

接着TableEnvironmentImpl#executeSql

深入此方法TableEnvironmentImpl#executeOperation,

因为我们是DQL语句,所以进入第一个if分支:executeInternal

在这里我们找到了转换的入口:translate,继续深入到PlannerBase#translate,

在这里,列出了Blink Planner第二步的所有关键逻辑:

  1. 首先是将Operation剥离外层包装,得到内层的Calcite Tree即RelNode
  2. 再对其进行优化(基于规则、基于代价)
  3. 将优化后的RelNode转换为物理执行计划(ExecNode)
  4. 将物理执行计划转换为Transformation pipeline

我们主要对优化阶段进行分析,其他分支与本次的主题——字段血缘 关系不大,所以,以后再说,

我们深入CommonSubGraphBasedOptimizer#optimize,

从原始RelNode Tree生成优化的RelNode DAG。注意:结果 DAG 中重用的节点将转换为相同的 RelNode,并且结果不包含IntermediateRelTable 。

所以,在优化之前是先将RelNode Tree转换成了RelNode DAG,那为什么要这样做呢?

因为一些子计划可以重用的,重用的子计划和原计划会使用相同的节点实例,就像一个可以重入的函数可以被调用多次,但返回的数据是相同的。而一个节点有多个parent, 就变成了不在是一棵树了。

优化类的层次结构:

这里只介绍一些优化理论,具体优化源码以后再详细介绍。

Calcite的优化流程:

对关系代数树(RelNode Tree)重复应用planner rules, cost model 指导整个查询计划优化过程,planner engine产生多个具有相同语义的关系表达式,但是cost更低。

在整个查询计划优化过程中,我们可以添加自定义的:

  1. relational operators 关系操作
  2. planner rules 计划规则
  3. cost model 代价模型
  4. statistics 统计数据

1. RBO:Pluggable Rules

优化规则, 利用关系代数或关系演算的等价关系,优化执行计划,使之更够最快速的执行,实现类:HepPlanner,根据经验:启发式(Heuristic )模型。

2. CBO:MetadataProvider

提供元数据的组件,比如对优化规则用的统计信息(RowCount of table, Distinct RowCout/min/max of a column , etc 。实现类:VolcanoPlanner,根据代价:火山(Volcano)模型。代价模型的量化计算是根据从metadata provider获取关系及关系运算的元数据,再辅以量化模型的计算。

元数据通常指的是一个关系(table)和关系运算(projection, filter, join, aggregation, etc)产生关系的统计数据:如关系的row count, 某个分量的 distinct count, min, max 等 。

常见的优化:列裁剪、谓词下推、常量折叠等。

而在Flink中,Blink自定义的优化主要有:

minibatch、local/global agg、chain、复用公共子图。

0 人点赞