Over 聚合
大家好,我是老羊,今天我们来学习 Flink SQL 中的· Over 聚合操作。
- ⭐ Over 聚合定义(支持 BatchStreaming):可以理解为是一种特殊的滑动窗口聚合函数。
那这里我们拿 Over 聚合
与 窗口聚合
做一个对比,其之间的最大不同之处在于:
- ⭐ 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到
- ⭐ Over 聚合:能够保留原始字段
注意: 其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?
- ⭐ 应用场景:计算最近一段滑动窗口的聚合结果数据。
- ⭐ 实际案例:查询每个产品最近一小时订单的金额总和:
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders
Over 聚合的语法总结如下:
代码语言:javascript复制SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
其中:
- ⭐ ORDER BY:必须是时间戳列(事件时间、处理时间)
- ⭐ PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
- ⭐ range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为
按照行数聚合
,第二种为按照时间区间聚合
。如下案例所示:
a. ⭐ 时间区间聚合:
按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。
代码语言:javascript复制CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP()),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '10',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近 1 小时的数据
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table
结果如下:
代码语言:javascript复制 I[2, 2021-12-24T22:08:26.583, 7, 73]
I[2, 2021-12-24T22:08:27.583, 7, 80]
I[2, 2021-12-24T22:08:28.583, 4, 84]
I[2, 2021-12-24T22:08:29.584, 7, 91]
I[2, 2021-12-24T22:08:30.583, 8, 99]
I[1, 2021-12-24T22:08:31.583, 9, 138]
I[2, 2021-12-24T22:08:32.584, 6, 105]
I[1, 2021-12-24T22:08:33.584, 7, 145]
b. ⭐ 行数聚合:
按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。
代码语言:javascript复制CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP()),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '2',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近 5 行数据
ROWS BETWEEN PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table
预跑结果如下:
代码语言:javascript复制 I[2, 2021-12-24T22:18:19.147, 1, 9]
I[1, 2021-12-24T22:18:20.147, 2, 11]
I[1, 2021-12-24T22:18:21.147, 2, 12]
I[1, 2021-12-24T22:18:22.147, 2, 12]
I[1, 2021-12-24T22:18:23.148, 2, 12]
I[1, 2021-12-24T22:18:24.147, 1, 11]
I[1, 2021-12-24T22:18:25.146, 1, 10]
I[1, 2021-12-24T22:18:26.147, 1, 9]
I[2, 2021-12-24T22:18:27.145, 2, 11]
I[2, 2021-12-24T22:18:28.148, 1, 10]
I[2, 2021-12-24T22:18:29.145, 2, 10]
当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:
代码语言:javascript复制SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
往期推荐
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
(下)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
flink sql 知其所以然(十九):Table 与 DataStream 的转转转(附源码)
flink sql 知其所以然(十八):在 flink 中还能使用 hive udf?附源码
flink sql 知其所以然(十七):flink sql 开发利器之 Zeppelin
flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink
flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join(附源码)
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
flink sql 知其所以然(十三):流 join 很难嘛???(下)
flink sql 知其所以然(十二):流 join 很难嘛???(上)
flink sql 知其所以然(十一):去重不仅仅有 count distinct 还有强大的 deduplication
flink sql 知其所以然(十):大家都用 cumulate window 计算累计指标啦
flink sql 知其所以然(九):window tvf tumble window 的奇思妙解
flink sql 知其所以然(八):flink sql tumble window 的奇妙解析之路
flink sql 知其所以然(七):不会连最适合 flink sql 的 ETL 和 group agg 场景都没见过吧?
flink sql 知其所以然(六)| flink sql 约会 calcite(看这篇就够了)
flink sql 知其所以然(五)| 自定义 protobuf format
flink sql 知其所以然(四)| sql api 类型系统
flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)
flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)
flink sql 知其所以然(一)| sourcesink 原理
揭秘字节跳动埋点数据实时动态处理引擎(附源码)