1.序篇-本文结构
全网第一个 flink sql 实战,本文主要介绍 flink sql 与 calcite 之间的关系。flink sql 的解析主要依赖 calcite。
而博主通过此文抛砖引玉帮助大家理解 flink sql 在解析中是怎样依赖 calcite 的,以及 flink sql 解析的流程,sql parser 相关内容。希望对大家有所帮助。
本文通过以下几节进行介绍,对某个章节感兴趣的可以直接划到对应章节。
- 背景篇-一条 flink sql 的执行过程
- 发挥自己的想象力
- 看看 flink 的实现
- 简介篇-calcite 扮演的角色
- calcite 是啥?
- flink sql 为啥选择 calcite?
- 案例篇-calcite 的能力、案例
- 先用用 calcite
- 关系代数
- calcite 必知的基础 model
- calcite 的处理流程(以 flink sql 为例)
- calcite 怎么做到这么通用?
- 原理剖析篇-calcite 在 flink sql 中大展身手
- FlinkSqlParserImpl
- FlinkSqlParserImpl 的生成
- 总结与展望篇
2.背景篇-一条 flink sql 的执行过程
本节先给大家大致描述一条 flink sql 的执行过程
,不了解详细内容不要紧,主要先了解整个流程,有了全局视角之后,后续会详述细节。
在介绍一条 flink sql 的执行过程之前,先来看看 flink datastream 任务的执行过程,这对理解一条 flink sql 的执行过程有很大的帮助。
- datastream:datastream 在使用时要在 flink datastream api 提供的各种 udf(比如 flatmap,keyedprocessfunction 等)中自定义处理逻辑,具体的业务执行逻辑都是敲代码、 java 文件写的,然后编译在 jvm 中执行,就和一个普通的 main 函数应用一模一样的流程。因为代码执行逻辑都是自己写的,所以这一部分相对好理解。
- sql:java 编译器不能识别和编译一条 sql 进行执行,那么一条 SQL 是咋执行的呢?
2.1.先发挥自己的想象力
我们逆向思维进行考虑,如果想让一条 flink sql 按照我们的预期在 jvm 中执行,需要哪些过程。
- 整体来说:参考 datastream,如果 jvm 能执行 datastream java code 编译后的 class 文件,那么加一个 sql 解析层,能将 sql 逻辑解析为 datastream 的各种算子,然后编译执行不就 vans 了。
- sql parser:首先得有一个 sql parser 吧,得先能识别 sql 语法,将 sql 语法转化为 AST、具体的关系代数。
- 关系代数到 datastream 算子的映射:sql 逻辑解析为 datastream,需要有一个解析的映射逻辑吧。sql 是基于关系代数的,可以维护一个 sql 中的每个关系代数到具体 datastream 接口的映射关系,有了这些映射关系我们就可以将 sql 映射成一段可执行的 datastream 代码。举个例子:其可以将:
- sql select xxx 解析为类似 datastream 中的 map
- where xxx 解析为 filter
- group by 解析成 keyby
- sum(xx),count(xxx)可以解析为 datastream 中的 aggregate function
- etc...
- 代码生成:有了 sql AST,sql 到 datasretam 算子的映射关系之后,就要进行具体的代码生成了。比如去解析 sql AST 中具体哪些字段用作 where 逻辑,哪些字段用作 group by,都需要生成对应具体的 datastream 代码。
- 运行:经过上述流程之后,就可以将一个 sql 翻译成一个 datastream 作业了,happy 的执行。
如下图所示,描绘了上述逻辑:
12
那么这个和 flink 实际实现有啥异同呢?
flink 大致是这样做的,虽在 flink 本身的中间还有一些其他的流程,后来的版本也不是基于 datastream,但是整体的处理逻辑还是和上述一致的。
所以不了解整体流程的同学可以先按照上述流程进行理解。
按照 博主的脑洞
来总结一条 sql 的使命就是:sql -> AST -> codegen(java code) -> 让我们 run 起来好吗
2.2.看看 flink 的实现
26
上面手绘可能看不清,下面这张图更清楚。
28
标准的一条 flink sql 运行起来的流程如下:
Notes:刚开始对其中的 SqlNode,RelNode 概念可能比较模糊。先理解整个流程,后续会详细介绍这些概念。
- sql 解析阶段:calcite parser 解析(sql -> AST,AST 即 SqlNode Tree)
- SqlNode 验证阶段:calcite validator 校验(SqlNode -> SqlNode,语法、表达式、表信息)
- 语义分析阶段:SqlNode 转换为 RelNode,RelNode 即 Logical Plan(SqlNode -> RelNode)
- 优化阶段:calcite optimizer 优化(RelNode -> RelNode,剪枝、谓词下推等)
- 物理计划生成阶段:Logical Plan 转换为 Physical Plan(等同于 RelNode 转换成 DataSetDataStream API)
- 后续的运行逻辑与 datastream 一致
可以发现 flink 的实现
比 博主的脑洞
整体主要框架上面是一致的。多出来的部分主要是 SqlNode 验证阶段,优化阶段。
3.简介篇-calcite 在 flink sql 中的角色
大致了解了 一条 flink sql 的运行流程
之后,我们来看看 calcite 这玩意到底在 flink 里干了些啥。
根据上文总结来说 calcite 在 flink sql 中担当了 sql 解析、验证、优化
功能。
30
看着 calcite 干了这么多事,那 calcite 是个啥东东,它的定位是啥?
3.1.calcite 是啥?
calcite 是一个动态数据的管理框架,它可以用来构建数据库系统的不同的解析的模块,但是它不包含数据存储数据处理等功能。
calcite 的目标是一种方案,适应所有的需求场景,希望能为不同计算平台和数据源提供统一的 sql 解析引擎,但是它只是提供查询引擎,而没有真正的去存储这些数据。
61
下图是目前使用了 calcite 能力的其他组件,也可见官网 https://calcite.apache.org/docs/powered_by.html 。
4
简单来说的话,可以先理解为 calcite 具有这几个功能(当然还有其他很牛逼的功能,感兴趣可以自查官网)。
- 自定义 sql 解析器:比如说我们新发明了一个引擎,然后我们要在这个引擎上来创造一套基于 sql 的接口,那么我们就可以使用直接 calcite,不用自己去写一套专门的 sql 的解析器,以及执行以及优化引擎,calcite 人都有。
- sql parser(extends SqlAbstractParserImpl):将 sql 的各种关系代数解析为具体的 AST,这些 AST 都能对应到具体的 java model,在 java 的世界里面,对象很重要,有了这些对象(
SqlSelect
、SqlNode
),就可以根据这些对象做具体逻辑处理了。举个例子,如下图,一条简单的select c,d from source where a = '6'
sql,经过 calcite 的解析之后,就可以得到 AST model(SqlNode)。可以看到有SqlSelect
、SqlIdentifier
、SqlIdentifier
、SqlCharStringLiteral
。 - sql validator(extends SqlValidatorImpl):根据语法、表达式、表信息进行 SqlNode 正确性校验。
- sql optimizer:剪枝、谓词下推等优化
上面的这些能力整体组成如下图所示:
29
实际使用 calcite 解析一条 sql,跑起来看看。
2
3.2.flink sql 为什么选择 calcite?
- 不用重复造轮子。有限的精力应该放在有价值的事情上。
- calcite 有针对 stream 表的解决方案。具体可见 https://calcite.apache.org/docs/stream.html。
4.案例篇-calcite 的能力、案例
4.1.先用用 calcite
重中之重,在了解原理之前,先跑起来是王道,也会帮助我们逐步理解。
官网已经有一个 csv 的案例了。感兴趣的可以直达 https://calcite.apache.org/docs/tutorial.html 。
跑完一个 csv demo,在详细了解 calcite 之前还需要了解下 sql,calcite 的支柱:关系代数。
4.2.关系代数
sql 是基于关系代数的查询语言,是关系代数在工程上的一种很好的实现方案。在工程中,关系代数难表达,但是 sql 就易于理解。关系代数和 sql 的关系如下。
- 可以将一条 sql 解析为一个关系代数表达式的组合。在 sql 中的操作都可以转化成关系代数的表达式。
- sql 的执行优化(所有的优化的前提都是优化前和优化后最终执行结果相同,即等价交换)是基于关系代数运算的。
4.2.1.常用关系代数
总结下,有哪些常用的关系代数:
50
4.2.2.sql 优化支柱之关系代数等价变换
关系代数等价变换是 calcite optimizer 的基础理论。
下面是一些等价变换的例子。
1.连接(⋈
),笛卡尔积(×
)的交换律
51
2.连接(⋈
),笛卡尔积(×
)的结合律
3.投影(Π
)的串接定律
4.选择(σ
)的串接定律
5.选择(σ
)与投影(Π
)的交换
6.选择(σ
)与笛卡尔积(×
)的交换
7.选择(σ
)与并(∪
)的交换
8.选择(σ
)与差(-
)的交换
9.投影(Π
)与笛卡尔积(×
)的交换
10.投影(Π
)与并(∪
)的交换
然后看一个基于关系代数优化的实际 sql 案例:
有三个关系 A(a1,a2,a3,…)
、B(b1,b2,b3, … )
、C(a1,b1,c1,c2, … )
有一个查询请求如下:
SELECT A.a1 FROM A,B,C WHERE A.a1 = C.a1 AND B.b1 = C.b1 AND f(c1)
1.首先将 sql 转为关系代数的语法树。
36
2.优化:选择(σ
)的串接定律。
47
37
3.优化:选择(σ
)与笛卡尔积(×
)的交换。
48
38
4.优化:投影(π
)与笛卡尔积(×
)的交换。
49
关于关系代数我们就有了大致的了解。
除此之外,对于更深入了解 flink sql,calcite 而言,我们还需要了解一下在 calcite 代码体系中有哪些重要 model。
4.3.calcite 必知的基础 model
calcite 中有两个最最基础、重要的 model 在我们理解 flink sql 解析流程时需要知道的。
- SqlNode:sql 转化而成,可以理解为直观表达 sql 层次结构的的 model
- RelNode:SqlNode 转化而成,可以理解为将 SqlNode 转化为关系代数,表达关系代数层次结构的 model
举个例子来说明下,下面这条 flink sql,经过解析之后的 SqlNode,RelNode 如下图:
代码语言:javascript复制SELECT
sum(part_pv) as pv,
window_start
FROM (
SELECT
count(1) as part_pv,
cast(tumble_start(rowtime, INTERVAL '60' SECOND) as bigint) * 1000 as window_start
FROM
source_db.source_table
GROUP BY
tumble(rowtime, INTERVAL '60' SECOND)
, mod(id, 1024)
)
GROUP BY
window_start
62
可以看到 SqlNode 包含的内容是 sql 的层次结构,包括 selectList
,from
,where
,group by
等。
RelNode 包含的是关系代数的层次结构,每一层都有一个 input 来承接。结合上面优化案例的树状结构一样。
63
4.4.calcite 的处理流程(以 flink sql 为例)
29
如上图所示,此处我们结合上节介绍的 calcite 的 model,以及 flink sql 的实现来走一遍其处理流程:
- sql 解析阶段(sql –> SqlNode)
- SqlNode 验证(SqlNode –> SqlNode)
- 语义分析(SqlNode –> RelNode)
- 优化阶段(RelNode –> RelNode)
4.4.1.flink sql demo
代码语言:javascript复制SELECT
sum(part_pv) as pv,
window_start
FROM (
SELECT
count(1) as part_pv,
cast(tumble_start(rowtime, INTERVAL '60' SECOND) as bigint) * 1000 as window_start
FROM
source_db.source_table
GROUP BY
tumble(rowtime, INTERVAL '60' SECOND)
, mod(id, 1024)
)
GROUP BY
window_start
其中前三步解析和转化,都在 在执行 TableEnvironment#sqlQuery
进行。
最后一步优化,在执行 sink 操作时进行,即在这个例子中是 tEnv.toRetractStream(result, Row.class)
。
源码公众号后台回复flink sql 知其所以然(六)| flink sql 约会 calcite获取。
4.4.2.sql 解析阶段(sql –> SqlNode)
sql 解析阶段使用 Sql Parser 将 sql 解析为 SqlNode
。这一步在执行 TableEnvironment#sqlQuery
进行。
可以从上图看到 flink sql 具体实现类是 FlinkSqlParserImpl
。
68
具体 parse 得到 SqlNode 如上图。
4.4.3.SqlNode 验证(SqlNode –> SqlNode)
上面的第一步生产的 SqlNode 对象是一个未经验证的,这一步就是语法检查阶段,语法检查前需要知道元数据信息,这个检查会包括表名、字段名、函数名、数据类型的检查。进行语法检查的实现如下:
可以从上图看到 flink sql 校验器的具体实现类是 FlinkCalciteSqlValidator
,其中包含了元数据信息,从而可以进行元数据信息检查。
4.4.4.语义分析(SqlNode –> RelNode)
这一步就是将 SqlNode 转换成 RelNode,也就是生成相应的关系代数层面的逻辑(这里一般都叫做逻辑计划:Logical Plan)。
4.4.5.优化阶段(RelNode –> RelNode)
这一步就是优化阶段。详细内容可以自己 debug 代码查看,此处不赘述。
4.5.calcite 怎么做到这么通用?
此处以 calcite parser 举例说明,其模块为什么这通用?其他的模块都是类似的方式。
先说结论
:因为 calcite parser 模块提供了接口,具体的 parse 逻辑、规则是可以根据用户自定义进行配置的。大家可以看下图,博主画出了一张图进行详述。
5
如上图,引擎 sql 解析器的生成是有一个输入的,就是 用户自定义语法分析规则变量
,具体引擎的 sql 解析器其实也是根据用户自定义的 解析规则
去生成的 解析器
。其 解析器
的动态生成依赖 javacc
这样的组件。calcite 提供的是统一的 sql AST 模型、优化模型接口等,而具体的解析实现交给了用户自己去决定。
javacc
会根据 calcite 中定义的 Parser.jj
文件,生成具体的 sql parser 代码(如上图),这个 sql parser 的能力就是将 sql 转换成 AST (SqlNode)。关于 calcite 能力的更详细内容见 https://matt33.com/2019/03/07/apache-calcite-process-flow/ 。
上图涉及到的文件大家可以下载 calcite 源码
https://github.com/apache/calcite.git 之后,切换到 core
module 之后查看。
31
4.5.1.javacc 是啥?
javacc
是一个用 java 开发的最受欢迎的语法分析生成器。这个分析生成器工具可以读取上下文无关且有着特殊意义的语法并把它转换成可以识别且匹配该语法的 java 程序。它是 100% 的纯 java 代码,可以在多种平台上运行。
简单解释 javacc
就是它是一个通用的语法分析生产器,用户可以使用 javacc
任意定义一套 DSL 及解析器。
举个例子,如果哪天你觉得 sql 也不够简洁通用,你可以使用 javacc
自己定义一套更简洁的 user-define-ql
。然后使用 javacc 作为你的 user-define-ql
的解析器。是不是很流批,可以自己去搞编译器了。
4.5.2.跑跑 javacc
这里不介绍具体的 javacc 语法,直接以官网的 Simple1.jj
为案例。详细语法和功能可以参考官网(https://javacc.github.io/javacc/) 或者一下博客。
- https://www.cnblogs.com/Gavin_Liu/archive/2009/03/07/1405029.html
- https://www.yangguo.info/2014/12/13/编译原理-Javacc使用/
- https://www.engr.mun.ca/~theo/JavaCC-Tutorial/javacc-tutorial.pdf
Simple1.jj
是用于识别一系列的 {相同数量的花括号}
,之后跟着 0 个或多个行终结符。
7
下面是合法的字符串例子:
{}
,{{{{{}}}}}
,etc.
下面是不合法的字符串例子:
{{{{
,{}{}
,{}}
,{{}{}}
,etc.
接下来让我们实际将 Simple1.jj
编译生成具体的规则代码。
在 pom 中加入 javacc build 插件:
代码语言:javascript复制<plugin>
<!-- This must be run AFTER the fmpp-maven-plugin -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Simple1.jj</include>
</includes>
<!-- This must be kept synced with Apache Calcite. -->
<lookAhead>1</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
在 compile 之后,就会在 generated-sources
下生成代码:
8
然后把代码 copy 到 Sources 路径下:
33
执行下代码,可以看到 {}
,{{}}
都可以校验通过,一旦出现不符合规则的 {{
输入,就会抛出异常。
关于 javacc 基本上就了解个大概了。
感兴趣的可以尝试自定义一个编译器。
4.5.3.fmpp 是啥?
5
fmpp 就是一个基于 freemarker 的模板生产器。用户可以统一管理自己的变量,然后用 ftl 模板 变量
生成对应的最终文件。在 calcite 中使用 fmpp 作为变量 模板的统一管理器。然后基于 fmpp 来生成对应的 Parser.jj
文件。
5.原理剖析篇-calcite 在 flink sql 中大展身手
博主画了一张图,包含了其中重要组件之间的依赖关系。
3
你没猜错,还是上面那些流程,fmpp(Parser.jj 模板生成)
-> javacc(Parser 生成)
-> calcite
。
在介绍 Parser 生成流程之前,先看看 flink 最终生成的 Parser:FlinkSqlParserImpl
(此处使用 Blink Planner)。
5.1.FlinkSqlParserImpl
以下面这个案例出发(代码基于 flink 1.13.1 版本):
代码语言:javascript复制public class ParserTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
DataStream<Tuple3<String, Long, Long>> tuple3DataStream =
env.fromCollection(Arrays.asList(
Tuple3.of("2", 1L, 1627254000000L),
Tuple3.of("2", 1L, 1627218000000L 5000L),
Tuple3.of("2", 101L, 1627218000000L 6000L),
Tuple3.of("2", 201L, 1627218000000L 7000L),
Tuple3.of("2", 301L, 1627218000000L 7000L),
Tuple3.of("2", 301L, 1627218000000L 7000L),
Tuple3.of("2", 301L, 1627218000000L 7000L),
Tuple3.of("2", 301L, 1627218000000L 7000L),
Tuple3.of("2", 301L, 1627218000000L 7000L),
Tuple3.of("2", 301L, 1627218000000L 86400000 7000L)))
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(0L)) {
@Override
public long extractTimestamp(Tuple3<String, Long, Long> element) {
return element.f2;
}
});
tEnv.registerFunction("mod", new Mod_UDF());
tEnv.registerFunction("status_mapper", new StatusMapper_UDF());
tEnv.createTemporaryView("source_db.source_table", tuple3DataStream,
"status, id, timestamp, rowtime.rowtime");
String sql = "SELECTn"
" count(1),n"
" cast(tumble_start(rowtime, INTERVAL '1' DAY) as string)n"
"FROMn"
" source_db.source_tablen"
"GROUP BYn"
" tumble(rowtime, INTERVAL '1' DAY)";
Table result = tEnv.sqlQuery(sql);
tEnv.toAppendStream(result, Row.class).print();
env.execute();
}
}
debug 过程如之前分析 sql -> SqlNode 过程所示,如下图直接定位到 SqlParser:
21
如上图可以看到具体的 Parser 就是 FlinkSqlParserImpl
。
定位到具体的代码如下图所示(flink-table-palnner-blink-2.11-1.13.1.jar
)。
34
最终 parse 的结果 SqlNode 如下图。
22
再来看看 FlinkSqlParserImpl
是怎么使用 calcite 生成的。
具体到 flink 中的实现,位于源码中的 flink-table
.flink-sql-parser
模块(源码基于 flink 1.13.1)。
flink 是依赖 maven 插件实现的上面的整体流程。
5.2.FlinkSqlParserImpl 的生成
14
接下来看看整个 Parser 生成流程。
5.2.1.flink 引入 calcite
使用 maven-dependency-plugin
将 calcite 解压到 flink 项目 build 目录下。
<plugin>
<!-- Extract parser grammar template from calcite-core.jar and put
it under ${project.build.directory} where all freemarker templates are. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
15
5.2.2.fmpp 生成 Parser.jj
使用 maven-resources-plugin
将 Parser.jj
代码生成。
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>target/generated-sources</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
</execution>
</executions>
</plugin>
16
5.2.3.javacc 生成 parser
使用 javacc 将根据 Parser.jj
文件生成 Parser。
<plugin>
<!-- This must be run AFTER the fmpp-maven-plugin -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<!-- This must be kept synced with Apache Calcite. -->
<lookAhead>1</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
17
5.2.4.看看 Parser
最终生成的 Parser
就是 FlinkSqlParserImpl
。
18
5.2.5.blink planner 引入 flink-sql-parser
blink planner(flink-table-planner-blink
) 在打包时将 flink-sql-parser
、flink-sql-parser-hive
打包进去。
35
6.总结与展望篇
本文主要介绍了 flink sql 与 calcite 之间的依赖关系,以及 flink sql parser 的生成过程。
7.参考文献
https://www.slideshare.net/JordanHalterman/introduction-to-apache-calcite
https://arxiv.org/pdf/1802.10233.pdf
https://changbo.tech/blog/7dec2e4.html
http://www.liaojiayi.com/calcite/
https://www.zhihu.com/column/c_1110245426124554240
https://blog.csdn.net/QuinnNorris/article/details/70739094
https://www.pianshen.com/article/72171186489/
https://matt33.com/2019/03/07/apache-calcite-process-flow/
https://www.jianshu.com/p/edf503a2a1e7
https://blog.csdn.net/u013007900/article/details/78978271
https://blog.csdn.net/u013007900/article/details/78993101
http://www.ptbird.cn/optimization-of-relational-algebraic-expression.html
https://book.51cto.com/art/201306/400084.htm
https://book.51cto.com/art/201306/400085.htm
https://miaowenting.site/2019/11/10/Flink-SQL-with-Calcite/