flink sql 知其所以然(九):window tvf tumble window 的奇思妙解

2022-04-04 10:41:46 浏览数 (1)

1.序篇-本文结构

针对 datastream api 大家都比较熟悉了,还是那句话,在 datastream 中,你写的代码逻辑是什么样的,它最终的执行方式就是什么样的。

但是对于 flink sql 的执行过程,大家还是不熟悉的。

此节就是窗口聚合章节的第二篇,上节介绍了 1.13 之前的 tumble window 实现,本节介绍 window tvf 下的 tumble window 案例给大家介绍其使用方式和原理。

本节依然从以下几个章节给大家详细介绍 flink sql 的能力。

  1. 目标篇-本文能帮助大家了解 flink sql 什么?
    • 回顾上节的 flink sql 适用场景的结论
  2. 概念篇-先聊聊常见的窗口聚合
    • 窗口竟然拖慢数据产出?
    • 常用的窗口
  3. 实战篇-简单的 tumble window 案例和运行原理
    • 先看一个 datastream 窗口案例
    • flink sql tumble window 的语义
    • tumble window 实际案例
    • GeneratedWatermarkGenerator - flink 1.12.1
    • BinaryRowDataKeySelector - flink 1.12.1
    • AggregateWindowOperator - flink 1.12.1
  4. 总结与展望篇

先说说结论,以下这些结论已经在上节说过了,此处附上上节文章:

  1. 场景问题:flink sql 很适合简单 ETL,以及基本全部场景下的聚合类指标(本节要介绍的 tumble window 就在聚合类指标的范畴之内)。
  2. 语法问题:flink sql 语法其实是和其他 sql 语法基本一致的。基本不会产生语法问题阻碍使用 flink sql。但是本节要介绍的 tumble window 的语法就是略有不同的那部分。下面详细介绍。
  3. 运行问题:查看 flink sql 任务时的一些技巧,以及其中一些可能会碰到的坑:
    • 去 flink webui 就能看到这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑。
    • sql 的 watermark 类型必须要设置为 TIMESTAMP(3)。如果你的数据源时间戳类型是 13 位 bigint 类型时间戳,可以用 ts AS TO_TIMESTAMP_LTZ(row_time, 3) 将其转换为 TIMESTAMP(3) 类型。
    • 事件时间逻辑中,sql api 和 datastream api 对于数据记录时间戳存储逻辑是不一样的。datastream api:每条记录的 rowtime 是放在 StreamRecord 中的时间戳字段中的。sql api:时间戳是每次都从数据中进行获取的。算子中会维护一个下标。可以按照下标从数据中获取时间戳。

2.目标篇-本文能帮助大家了解 flink sql tumble window 什么?

关于 flink sql tumble window 一般都会有以下问题。本文的目标也是为大家解答这些问题:

  1. 场景问题:场景问题就不必多说,datastream 在 tumble window 场景下的应用很多了,分钟级别聚合等常用场景
  2. 语法问题:1.13.2 flink sql 写 tumble window 语法已经没有之前的特殊写法了。下文详细介绍。
  3. 运行问题:使用一条简单的分钟级别同时在线案例的 tumble window sql 帮大家从 transformation、runtime 帮大家理解 tumble window 的整体运行机制。
  4. 理解误区:既然是 sql 必然要遵循 sql 语义,sql tumble window 聚合是输入多条,产出一条数据。并不像 datastream 那样可以在窗口 udf 中做到多条输入,多条输出。

在正式开始聊 tumble window 之前,先看看上节 flink sql 适用场景的结论。让大家先有 flink sql 的一个整体印象以及结论。

2.1.回顾上节的 flink sql 适用场景的结论

不装了,我坦白了,flink sql 其实很适合干的活就是 dwd 清洗,dws 聚合。

此处主要针对实时数仓的场景来说。flink sql 能干 dwd 清洗,dws 聚合,基本上实时数仓的大多数场景都能给覆盖了。

flink sql 牛逼!!!

但是!!!

经过博主使用 flink sql 经验来看,并不是所有的 dwd,dws 聚合场景都适合 flink sql(截止发文阶段来说)!!!

其实这些目前不适合 flink sql 的场景总结下来就是在处理上比 datastream 还是会有一定的损失。

先总结下使用场景:

1. dwd:简单的清洗、复杂的清洗、维度的扩充、各种 udf 的使用

2. dws:各类聚合

然后分适合的场景和不适合的场景来说,因为只这一篇不能覆盖所有的内容,所以本文此处先大致给个结论,之后会结合具体的场景详细描述。

  • 适合的场景:
    1. 简单的 dwd 清洗场景
    2. 全场景的 dws 聚合场景
  • 目前不太适合的场景:
    1. 复杂的 dwd 清洗场景:举例比如使用了很多 udf 清洗,尤其是使用很多的 json 类解析清洗
    2. 关联维度场景:举例比如 datastream 中经常会有攒一批数据批量访问外部接口的场景,flink sql 目前对于这种场景虽然有 localcache、异步访问能力,但是依然还是一条一条访问外部缓存,这样相比批量访问还是会有性能差距。

3.概念篇-先聊聊常见的窗口聚合

窗口聚合大家都在 datastream api 中很熟悉了,目前在实时数据处理的过程中,窗口计算可以说是最重要、最常用的一种计算方式了。

但是在抛出窗口概念之前,博主有几个关于窗口的小想法说一下。

3.1.窗口竟然拖慢数据产出?

一个小想法。

先抛结论:窗口会拖慢实时数据的产出,是在目前下游分析引擎能力有限的情况下的一种妥协方案。

站在数据开发以及需求方的世界中,当然希望所有的数据都是实时来的,实时处理的,实时产出的,实时展现的

举个例子:如果你要满足一个一分钟窗口聚合的 pv,uv,或者其他聚合需求。

olap 数据服务引擎 就可以满足上述的实时来的,实时处理的,实时产出的,实时展现的的场景。flink 消费处理明细数据,产出到 kafka,然后直接导入到 olap 引擎中。查询时直接用 olap 做聚合。这其中是没有任何窗口的概念的。但是整个链路中,要保障端对端精确一次,要保障大数据量情况下 olap 引擎能够秒级查询返回,更何况有一些去重类指标的计算,等等场景。把这些压力都放在 olap 引擎的压力是很大的。

因此在 flink 数据计算引擎中就诞生了窗口的概念。我们可以直接在计算引擎中进行窗口聚合计算,然后等到窗口结束之后直接把结果数据产出。这就出现了博主所说的窗口拖慢了实时数据产出的情况。而且窗口在处理不好的情况下可能会导致数据丢失。

关于上述两种情况的具体优劣选择,都由大家自行选择。上述只是引出博主一些想法。

3.2.常用的窗口

目前已知的窗口分为以下四种。

1. Tumble Windows2. Hop Windows3. Cumulate Windows4. Session Windows

这些窗口的具体描述直接见官网,有详细的说明。此处不赘述。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/

此处介绍下 flink 中常常会涉及到的两个容易混淆的概念就是:窗口 key。这里来形象的说明下。

  • 窗口:时间周期上面的划分。将无限流进行纵向切分,将无限流切分为一个一个的窗口,窗口相当于是无限流中的一段时间内的数据。
  • key:数据类别上面的划分。将无限流进行横向划分,相同 key 的数据会被划分到一组中,这个 key 的数据也是一条无限流。

如下图所示。

1

4.实战篇-简单的 tumble window 案例和运行原理

源码公众号后台回复1.13.2 tumble window 的奇妙解析之路获取。

4.1.先看一个 datastream 窗口案例

在介绍 sql tumble window 窗口算子执行案例之前,先看一个 datastream 中的窗口算子案例。其逻辑都是相通的。会对我们了解 sql tumble window 算子有帮助。

我们先看看 datastream 处理逻辑。

以下面这个为例。

代码语言:javascript复制
public class _04_TumbleWindowTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.addSource(new UserDefinedSource())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, Integer, Long>>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Tuple4<String, String, Integer, Long> element) {
                        return element.f3;
                    }
                })
                .keyBy(new KeySelector<Tuple4<String, String, Integer, Long>, String>() {
                    @Override
                    public String getKey(Tuple4<String, String, Integer, Long> row) throws Exception {
                        return row.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(2)
                .print();

        env.execute("1.12.1 DataStream TUMBLE WINDOW 案例");
    }

    private static class UserDefinedSource implements SourceFunction<Tuple4<String, String, Integer, Long>> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Tuple4<String, String, Integer, Long>> sourceContext) throws Exception {

            while (!this.isCancel) {

                sourceContext.collect(Tuple4.of("a", "b", 1, System.currentTimeMillis()));

                Thread.sleep(10L);
            }

        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }
    }
}

datastream 生产的具体的 transformation 如下图:

24

其中我们只关注最重要的 WindowOperator 算子。

25

其中 WindowOperator 算子包含的重要属性如下图。

26

来看看 WindowOperator 的执行逻辑。窗口执行的整体详细流程可以参考:http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/

23

4.2.flink sql tumble window 的语义

介绍到 tumble window 的语义,总要有对比的去介绍。这里的参照物就是 datastream api。

在 datastream api 中。tumble window 一般用作以下两种场景。

  1. 业务场景:使用 tumble window 很轻松的计算出窗口内的聚合数据。一般是多条输入数据,窗口结束时一条输出数据。
  2. 优化场景:窗口聚合一批数据然后批量访问外部存储扩充维度、或者有一些自定义的处理逻辑。一般是多条输入数据,窗口结束时多条输出数据。

但是在 sql api 中。tumble window 是聚合(group by)语义,聚合在 sql 标准中的数据处理逻辑是多条输入,在窗口触发时就输出一条数据的语义。而上面的常常用在 datastream 中的优化场景是多对多的场景。因此和 sql 语义不符合。所以 flink sql tumble window 一般都是用于计算聚合运算值来使用。

4.3.tumble window 实际案例

滚动窗口的特性就是会将无限流进行纵向划分成一个一个的窗口,每个窗口都是相同的大小,并且不重叠。

22

来,在介绍原理之前,总要先用起来,我们就以下面这个例子展开。

1.(flink 1.13.2)场景:简单且常见的分维度分钟级别同时在线用户数、总销售额

数据源表:

代码语言:javascript复制
CREATE TABLE source_table (
    -- 维度数据
    dim STRING,
    -- 用户 id
    user_id BIGINT,
    -- 用户
    price BIGINT,
    -- 事件时间戳
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    -- watermark 设置
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
)

Notes - 关于 watermark 容易踩得坑:sql 的 watermark 类型必须要设置为 TIMESTAMP(3)。如果你的数据源时间戳类型是 13 位 bigint 类型时间戳,可以用 ts AS TO_TIMESTAMP_LTZ(row_time, 3) 将其转换为 TIMESTAMP(3) 类型。

数据汇表:

代码语言:javascript复制
CREATE TABLE sink_table (
    dim STRING,
    pv BIGINT,
    sum_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    uv BIGINT,
    window_start bigint
) WITH (
  'connector' = 'print'
)

数据处理逻辑:

可以看下下面语法,窗口聚合的写法有专门的 tumble(row_time, interval '1' minute) 写法,这就是与平常我们写的 hive sql,mysql 等不一样的地方。

代码语言:javascript复制
insert into sink_table
select dim,
    sum(bucket_pv) as pv,
    sum(bucket_sum_price) as sum_price,
    max(bucket_max_price) as max_price,
    min(bucket_min_price) as min_price,
    sum(bucket_uv) as uv,
    max(window_start) as window_start
from (
  SELECT dim,
       UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start, 
         window_end, 
         count(*) as bucket_pv,
         sum(price) as bucket_sum_price,
         max(price) as bucket_max_price,
         min(price) as bucket_min_price,
            -- 计算 uv 数
         count(distinct user_id) as bucket_uv
  FROM TABLE(TUMBLE(
     TABLE source_table
     , DESCRIPTOR(row_time)
     , INTERVAL '60' SECOND))
  GROUP BY window_start, 
       window_end,
     dim,
              -- 按照用户 id 进行分桶,防止数据倾斜
     mod(user_id, 1024)
)
group by dim,
   window_start

2.运行:可以看到,其实在 flink sql 任务中,其会把对应的处理逻辑给写到算子名称上面。

Notes - 观察 flink sql 技巧 1:这个其实就是我们观察 flink sql 任务的第一个技巧。如果你想知道你的 flink 任务在干啥,第一反应是去 flink webui 看看这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑

先看一下整个算子图,如下图。从左到右总共分为四个算子。

  1. 第一个算子就是数据源算子,分配 watermark
  2. 第二个算子就是在数据源算子的本地进行聚合,类似于 map-reduce map 阶段的 combiner 作用,先在本地进行聚合,然后将聚合结果发下去。
  3. 第三个算子就是第一层 group by 分桶聚合计算,将数据按照 user_id 分桶打散,然后聚合计算。
  4. 第四个算子就是第二层 group by 合桶计算。

3

整体描述一下:

29

来看看每一个算子具体做了什么事情。

第一个算子:

  1. table scan 读取数据源
  2. 从数据源中获取对应的字段(包括源表定义的 rowtime)
  3. 分配 watermark(按照源表定义的 watermark 分配对应的 watermark)
  4. 将一些必要的字段抽取。比如 group by 中的字段。在 hash 时需要使用。

4

第二个算子:

  1. 类似 map-reduce 的 combiner 本地聚合。这里的 combiner 的聚合粒度有两部组成,第一部分就是 group by 的 key,第二部分是 user_id。
  2. 将数据按照第一层 select 中的数据进行计算以及格式化

Notes:

  1. 首先 local agg 的目的是在不影响数据正确性的情况下,减少输出到下游的数据量,提升任务性能。
  2. 其中 max,min,count 都能很好地利用本地 combiner 输出量,比如 max 就取 group by key 粒度的最大值即可
  3. 但是一旦涉及到 count(distinct),只按照 group by key 粒度去处理数据,就会出现数据准确性问题,举例:比如两个 source 都来相同 id 的数据,在去重时,按照 group by key 去重就会导致这个 user_id 在两个算子上都计算一次,在下游算子聚合时就会将这两个结果都 1,最后结果就算重复了。

5

第三个算子:

  1. 窗口聚合分桶计算
  2. 将数据按照第一层 select 中的数据进行计算以及格式化

6

第四个算子:

  1. 窗口聚合合桶计算
  2. 将数据按照第二层 select 中的数据进行计算以及格式化
  3. 将结果 sink 到输出表

7

3.(flink 1.13.2)结果:

代码语言:javascript复制
1>  U[7, 36403, 1824202613, 99999, 2, 30498, 1632136920000]
2> -U[a, 37001, 1857079208, 99999, 3, 30857, 1632136920000]
2>  U[a, 37037, 1858977218, 99999, 3, 30886, 1632136920000]
1> -U[7, 36403, 1824202613, 99999, 2, 30498, 1632136920000]
1>  U[7, 36428, 1825407205, 99999, 2, 30523, 1632136920000]
1> -U[2, 36970, 1848722634, 99999, 6, 30876, 1632136920000]
2> -U[6, 36911, 1856162742, 99998, 2, 30801, 1632136920000]
...

4.(flink 1.13.2)原理:

关于 sql 开始运行的机制见上一节详述。

此处只介绍相比前一节新增内容。可以看到上述代码的具体 transformation 如下图。

8

4.4.LocalSlicingWindowAggOperator - flink 1.13.2

4.4.1.整体处理逻辑

9

整体处理逻辑如下图。

这里处理每一条数据时,主要是把数据放入到 local buffer 中。

1

涉及到 local combiner 处理计算时,就是第 3 点,跟进代码 windowBuffer.advanceProgress(currentWatermark)

12

13

14

这里看下具体 combine 流程。总共四步,如下图。

15

16

4.4.2.local agg udf 逻辑

其实 local agg 的处理逻辑很简单,基本和上节说的 1.12 实现一致。都是代码生成之后做 sum,count,count distinct 的计算。

27

4.5.SlicingWindowOperator - flink 1.12.1

4.5.1.整体算子处理逻辑

依然如下图:

30

先看看 transformation 中包含什么内容:

10

整体处理逻辑如下:

17

也是在处理 watermark 时,进行聚合计算。

18

19

20

21

这里有一个重点,就是 global agg udf 是执行 merge 操作进行聚合的。其逻辑就是将上游 combiner 的结果数据聚合。

22

23

在窗口触发时,将结果输出。

24

25

26

4.5.2.local agg、global agg udf 逻辑

28

其实 global agg 和 local agg 逻辑基本一致,这里不再赘述。

5.总结与展望篇

本文主要介绍了 window tvf 实现的 tumble window 聚合类指标的常见场景案例以及其底层运行原理。

而且也介绍了在查看 flink sql 任务时的一些技巧:

  1. 去 flink webui 就能看到这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑。
  2. sql 的 watermark 类型要设置为 TIMESTAMP(3)。如果你的数据源时间戳类型是 13 位 bigint 类型时间戳,可以用 ts AS TO_TIMESTAMP_LTZ(row_time, 3) 将其转换为 TIMESTAMP(3) 类型。
  3. 事件时间逻辑中,sql api 和 datastream api 对于数据记录时间戳存储逻辑是不一样的。datastream api:每条记录的 rowtime 是放在 StreamRecord 中的时间戳字段中的。sql api:时间戳是每次都从数据中进行获取的。算子中会维护一个下标。可以按照下标从数据中获取时间戳。

0 人点赞