【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法

2022-03-31 11:20:37 浏览数 (1)

源码分析系列推荐:

【Flink】第四篇:【迷思】对update语义拆解D-、I 后造成update原子性丢失

【Flink】第十五篇:Redis Connector 数据保序思考

【Flink】第十六篇:源码角度分析 sink 端的数据一致性

【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

【Flink】第二十五篇:源码角度分析作业提交逻辑

【Flink】第二十六篇:源码角度分析Task执行过程

最近,笔者进行自研基于Flink SQL的源数据字段血缘。于是,开始了一段之前没有的关于SQL解析引擎的探索之路。

从认识Flink SQL的解析流程,再到认识Calcite、Antlr4,接着,从源码分析Flink是如何借助Calcite完成Flink SQL的词法语法解析、正确性验证、语义分析、查询优化、生成物理执行计划。最终,完成了一个初步的血缘效果的代码编写。

本文先阐述算法代码的思路,后续几篇文章再详细记录笔者的Flink SQL Planner的探索过程。

程序运行结果

直接上算法程序的运行结果:

测试用例:

代码语言:javascript复制
select l.id, l.name, r.id, r.ts 
from (select name, id from leftT) as l inner join (select ts, id from rightT) as r 
on l.id = r.id

这个测试用例中,最外层是一个select语句,它的from条件为一个Flink典型的双流regular join,而左流是一个子查询,右流也是一个子查询,join条件是左流的id=右流的id。

左流这个子查询是对左表的查询,表结构如下,

代码语言:javascript复制
CREATE TABLE leftT ( 
  id STRING, 
  name STRING, 
  op_ts TIMESTAMP(3), 
  ts as op_ts, 
  PRIMARY KEY(id) NOT ENFORCED, 
  watermark for op_ts as op_ts - interval '60' second 
) WITH ( 
  'connector' = 'upsert-kafka', 
  'properties.bootstrap.servers' = '...', 
  'topic' = 'main-upsert-join-20', 
  'key.format' = 'json', 
  'value.format' = 'json', 
  'properties.group.id' = 'main-04' 
)

右流这个子查询是对右表的查询,表结构如下,

代码语言:javascript复制
CREATE TABLE rightT ( 
  id STRING, 
  op_ts TIMESTAMP(3), 
  name STRING, 
  ts as op_ts, 
  PRIMARY KEY(id) NOT ENFORCED, 
  watermark for op_ts as op_ts - interval '60' second 
) WITH ( 
  'connector' = 'upsert-kafka', 
  'properties.bootstrap.servers' = '...', 
  'topic' = 'main-upsert-join-21', 
  'key.format' = 'json', 
  'value.format' = 'json', 
  'properties.group.id' = 'main-04' 
)

更直观理解,将SQL画成树状结构:

程序运行结果的解释

程序的打印结果其实是对Flink SQL解析后的语法树的一个递归遍历的过程:

depth: 1,表示根节点的遍历操作结果,如下,

代码语言:javascript复制
depth: 1   field<1↓:0↑>   field<0↓:1↑>   field<3↓:2↑>   field<2↓:3↑>

对应的SQL中的语句操作:

代码语言:javascript复制
select l.id, l.name, r.id, r.ts

depth: 2,有两行输出结果,分别表示join操作中的左右流子查询的查询结果:

代码语言:javascript复制
// (select name, id from leftT) as l                    
depth: 2   field<1↓:0↑>   field<0↓:1↑>
代码语言:javascript复制
// (select ts, id from rightT) as r                                    
depth: 2   field<3↓:2↑>   field<0↓:3↑>

将depth=1层的字段中向下箭头和depth=2层的字段中向上箭头相对连接,并且要索引数字相等:

这样,便得到了最外层select查询到内层join两边的子查询的字段的血缘关系!

depth=3,以此类推,将输出中的depth=2和depth=3层按照以上方式再次连接,就得到了两边各自的子查询和各自的Flink源表字段的连接:

depth=4,最后,将两边子查询中的depth=3和depth=4层字段对应连接,就得到了Flink SQL中的字段到最终源表的血缘.

注意,因为我们这里的create table中用了计算列字段:

代码语言:javascript复制
ts AS op_ts

所以,最终将ts字段指向了op_ts,这也符合血缘溯源的最终目的,将计算列也指向了它的本源的字段:

经过以上层层血缘关系的字段溯源,我们就可以将测试用例的SQL中的根节点查询语句的字段与最终来源表的字段对应起来了!

算法思想

1. 通过TableEnvironmentImpl得到解析器对SQL进行解析,获取operations

2. 分为PlannerQueryOperation、CatalogSinkModifyOperation进行,得到CalciteTree

3. CalciteTree本质上就是Calcite的一颗RelNode Tree,对这个RelNode Tree进行递归先序遍历

4. 在遍历处理函数中对当前RelNode包含的字段进行打印,打印操作包含两个重要的索引:字段继承自上游索引的索引值,重新编排本字段在当前遍历操作中的顺序索引值。注意:Join操作还需要对当前遍历操作中的顺序索引值进行相同层次内的继承递增。

0 人点赞