【Flink】第十篇:join 之 regular join

2022-03-31 11:06:00 浏览数 (1)

往期看点:

【Flink】第五篇:checkpoint【1】

【Flink】第五篇:checkpoint【2】

【Flink】第六篇:记一次Flink状态(State Size)增大不收敛,最终引起OOM问题排查

【Flink】第八篇:Flink 内存管理

【Flink】第九篇:Flink SQL 性能优化实战

从本篇开启一个关于 Flink SQL 中 join 的小专题。

每篇会讨论一种Flink SQL的join方式,其实已经在之前写过两篇用upsert-kafka做temporal join的文章,但是限于当时对于Flink SQL、CDC、撤回语义等的认知水平有限,并且时间仓促,写的不尽如人意。

SQL Join

join是SQL标准语法,本意是对关系表进行关联、打宽。在传统关系数据库中经过范式化建模后,在没有熵增的情况下,一定程度上减少了数据冗余,但是它将客观世界的混沌的信息分割为了若干关系实体,而join本质则是还原关系实体间原本存在的联系,以复原客观世间中那份混沌的信息以便于阅读。

所以,在数仓中打宽之后的表恢复了范式建模之前的数据冗余,这本质上也是以空间换时间的方式以便捷的存取接口给服数据务层(DWS)使用。

Flink SQL 中的 Join

Flink SQL中的join的真正挑战在于以数据流的形式实现标准SQL中的join语义。主要包含三大类 join:

  • Regular Join(常规双流Join)
  • Interval Join(时间区间Join)
  • Temporal Join(时态表Join):和 Lookup DB 关联、和版本表关联

概念

澄清几个概念:

1. 动态表(Dynamic Table) / 时态表(Temporal Table):

表数据随时间变化

2. 版本

数据表中的每行数据都有其生命周期,例如,新插入一行数据,这行数据以此为生命周期的开始。更新这行数据,将开启这行数据下一个版本,生命周期也将重新计算,直至删除,生命周期结束。

例如,上面这张表在时间维度上先后有两个版本:v1、v2。

3. 版本表

如果时态表中的记录可以追踪和访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。

4. 普通表

如果时态表中的记录仅仅可以追踪它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。

5. 连续查询

对于一条SQL查询语句来说,Flink SQL 与传统数据库查询的不同之处在于,Flink SQL 持续消费到达的行并对其结果进行更新。一个连续查询永远不会终止,并会产生一个动态表作为结果。

6. 物化视图(Materialized Views)/ 虚拟视图

物化视图被定义为一条SQL查询,就像常规虚拟视图一样。但是,不同的是,物化视图缓存查询结果,因此在访问视图时不需要对查询进行计算。缓存的一个常见难题是缓存过期。当这个查询视图的基本表被修改时,物化视图的某些值将会过期,此时便需要根据基本表的变化来对缓存的视图数据进行维护,以符合视图查询的SQL查询逻辑。

7. 流批一体

对于普通表和版本表的解读,Flink SQL采用了统一的理解和处理方式:流。这也符合Flink 的 native stream的定位。

Regular Join

本篇就先从regular join开始说起。截止到1.12版本,

regular join是最普通、最通用的join。主要有以下特点:

1. 支持INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN

2. 语法, 语义 均和传统批 SQL 一致

3. 左右流都会触发结果更新

4. 状态持续增长,一般结合 state TTL 使用

5. 只支持相等联接,即至少有一个连接条件是相等谓词的联接。

Regular Join 验证思路

验证环境:

  1. source: kafka
    1. 左流:json kafka、upsert-kafka
    2. 右流:json kafka、upsert-kafka
  2. sink: console
  3. join: join(即 inner join)、left join

验证策略:

注意:

  • json kafka的表不能定义主键,因为json kafka的表无法保证主键的语义,而upsert-kafka则相反,必须定义主键。

1. 第一组:【json kafka】 join/left join 【json kafka】分别在是否定义水位线下的验证

2. 第二组:【upsert-kafka】 join/left join 【upsert-kafka】分别在是否定义水位线下的验证

3. 第三组:【json kafka】 join/left join 【upsert-kafka】分别在是否定义水位线下的验证

4. 第四组:测试table.exec.state.ttl

我们的四个source flink table如下,

left_json:

代码语言:javascript复制
--左表, 测试水位线时加上 watermark for op_ts as op_ts
create table left_json(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
) with (
'connecotr' = 'kafka'
    ,'topic' = 'left-json'
    ,'value.format' = 'json'
    ,'scan.startup.mode' = 'latest-offset'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
    ,'properties.group.id' = 'left-json-group'
)

right_json:

代码语言:javascript复制
--右表, 测试水位线时加上 watermark for op_ts as op_ts
create table right_json(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
) with (
'connecotr' = 'kafka'
    ,'topic' = 'right-json'
    ,'value.format' = 'json'
    ,'scan.startup.mode' = 'latest-offset'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
    ,'properties.group.id' = 'right-json-group'
)

left_upsert:

代码语言:javascript复制
--左表, 测试水位线时加上 watermark for op_ts as op_ts
create table left_upsert(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
    ,primary key(appl_seq) not enforced
) with (
'connecotr' = 'upsert-kafka'
    ,'topic' = 'left-upsert'
    ,'key.format' = 'json'
    ,'value.format' = 'json'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
    ,'properties.group.id' = 'left-upsert-group'
)

right_upsert:

代码语言:javascript复制
--右表, 测试水位线时加上 watermark for op_ts as op_ts
create table right_upsert(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
    ,primary key(appl_seq) not enforced
) with (
'connecotr' = 'upsert-kafka'
    ,'topic' = 'right-upsert'
    ,'key.format' = 'json'
    ,'value.format' = 'json'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
    ,'properties.group.id' = 'right-upsert-group'
)

第一组验证结果

1. 第一组:【json kafka】 join/left join 【json kafka】分别在是否定义水位线下的验证

(1) 任何一侧表定义水位线

代码语言:javascript复制
select * 
from left_json as l inner join right_json as r 
on l.appl_seq = r.appl_seq

报错:

  • Rowtime attributes must not be in the input rows of a regular join.
  • As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

这个错误的原因是regular join定义水位线对于regular join的计算过程是没有任何实质影响的,因为regular join的join触发机制就是数据来了就马上join出去,而存在水位线时间意义的join是在temporal join中的。

那我们是否应该在regular join中的flink table中定义水位线呢?如果直接对source表regular join,那么就不用定义了,如果regular join之前有其他处理就不一定了,需要根据实际场景而定。

还有一点就是regular join后如果需要select出时间属性字段,只能在flink table中将时间属性字段转换成另外一个计算列,select这个计算列来替代时间属性字段。

修改后的表和sql如下,

代码语言:javascript复制
create table left_json(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
    ,ts    as    op_ts
    ,watermark for op_ts as op_ts
) with (
'connecotr' = 'kafka'
    ,'topic' = 'left-json'
    ,'value.format' = 'json'
    ,'scan.startup.mode' = 'latest-offset'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
    ,'properties.group.id' = 'left-json-group'
)
代码语言:javascript复制
create table right_json(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
    ,ts    as    op_ts    
    ,watermark for op_ts as op_ts
) with (
'connecotr' = 'kafka'
    ,'topic' = 'right-json'
    ,'value.format' = 'json'
    ,'scan.startup.mode' = 'latest-offset'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
    ,'properties.group.id' = 'right-json-group'
)
代码语言:javascript复制
select l.appl_seq, l.ts, l.amount, r.appl_seq, r.ts, r.amount
from left_json as l inner join right_json as r 
on l.appl_seq = r.appl_seq

(2) 两侧表均不定义水位线

这种情况下,因为没有主键,会缓存所有收到的source消息记录,即使全部字段的值完全一样,也不会当做相同的数据进行覆盖!这样一来join的结果全部是 I,即append-only。

2. 第二组:【upsert-kafka】 join/left join 【upsert-kafka】

代码语言:javascript复制
select l.appl_seq, l.ts, l.amount, r.appl_seq, r.ts, r.amount
from left_upsert as l inner join right_upsert as r 
on l.appl_seq = r.appl_seq

必须定义主键,否则报错:

‘upsert-kafka’ tables require to define a PRIMARY KEY constraint. The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.

这种情况下,因为upsert-kafka必须定义主键,会按照主键缓存收到的source消息记录的最后一条来的数据!这样一来join的结果就会有撤回逻辑了。

由于没有时间属性语义,所以,仅仅是按照接收到的消息顺序来进行判断谁是最新的数据。

另外,join的on条件可以不用主键,但是必须有至少一个相等谓词的条件。

join的结果的语义和两侧的流所形成的【普通表】按照标准SQL的join语义是一致的,只不过这是一个【持续查询】,会消费流数据并产生一个持续更新的结果。

3. 第三组:【json kafka】 join/left join 【upsert-kafka】

代码语言:javascript复制
select l.appl_seq, l.ts, l.amount, r.appl_seq, r.ts, r.amount
from left_upsert as l inner join right_json as r 
on l.appl_seq = r.appl_seq

join的撤回语义:

upsert一侧的表继承了2中的验证结果,json一侧的表继承了1中的验证结果:当upsert一侧来了相同主键的消息后会对之前的join结果进行撤回,但是json一侧的表由于每条数据都是唯一的,所以只会触发和upsert一侧的join后的新的结果。

4. 测试table.exec.state.ttl

代码语言:javascript复制
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(1);
streamEnv.enableCheckpointing(1000 * 5);
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
streamEnv.getCheckpointConfig().setCheckpointTimeout(1000 * 4);
streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 2);
streamEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.seconds(15)));

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

Configuration conf = tableEnv.getConfig().getConfiguration();

conf.setString("table.exec.state.ttl","15 s");

state会按照proctime处理ttl的状态时间。

总结


1. 支持INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN

2. 语法, 语义 均和传统批 SQL 一致

3. 左右流都会触发结果更新

4. 状态持续增长,一般结合 state TTL 使用

5. 只支持相等联接,即至少有一个连接条件是相等谓词的联接。

6. 定义水位线对于regular join的计算过程是没有任何实质影响的。

因为regular join的join触发机制就是数据来了就马上join出去。如果直接对source表regular join,那么就不用定义了,如果regular join之前有其他处理就不一定了,需要根据实际场景而定。如果定义了水位线,并且需要select出时间属性字段,只能在flink table中将时间属性字段转换成另外一个计算列,select这个计算列来替代时间属性字段。

7. kafka-json不能定义主键,upsert-kafka必须定义主键

8. kafka-json任何两条记录都是不同的记录,都会存在state里,而upsert-kafka按pk与state已有的统一pk下的state进行覆盖。

9. join的on条件可以不用pk,但是必须有至少一个相等谓词的条件。

regular join的flink逻辑设计猜想

Flink SQL regular join 的流处理底层逻辑:

两侧流顺序进入flink 的 join计算单元,在状态state中维护最新进入的主键下的row数据,两侧任意一侧来数据后都会去state里找是否存在符合join on条件的row,如果不存在发出一条op为 I 的join结果,如果已存在,先发出与旧row的-D/-U,再发出与新row的 I/ U。

  • 只维护表的各个PK下最新(不是时间最新而是顺序最新,因为没有时间属性的语义)版本的数据,注意,不定义主键的表,就算是完全相同的两条row也会被flink认为是不同的row
  • 两个流都可以触发结果的更新
  • (左、右、全)外连接后的结果的更新是-D再 I,内连接后的结果的更新是-U再 U

验证过程中的表DDL及验证数据请参考:

https://github.com/yanchenyun/wechat-docs.git

0 人点赞