大家好,我是老羊,今天我们来学习 Flink SQL 中除了窗口操作之外最常用的数据聚合方式,Group 聚合。
Group 聚合
- ⭐ Group 聚合定义(支持 BatchStreaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中
按颜色分 key(横向)
就是 Group 聚合,按窗口划分(纵向)
就是窗口聚合。
tumble window key
- ⭐ 应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 count、sum 等聚合操作。
那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?
首先来举一个例子看看怎么将窗口聚合转换为 Group 聚合。假如一个窗口聚合是按照 1 分钟的粒度进行聚合,如下 SQL:
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp()),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * as window_start
from source_table
group by
dim,
-- 按照 Flink SQL tumble 窗口写法划分窗口
tumble(row_time, interval '1' minute)
转换为 Group 聚合的写法如下:
- ⭐ Group 聚合
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp()),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / as bigint) as window_start
from source_table
group by
dim,
-- 将秒级别时间戳 / 60 转化为 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / as bigint)
确实没错,上面这个转换是一点问题都没有的。
但是窗口聚合和 Group by 聚合的差异在于:
- ⭐ 本质区别:窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出
- ⭐ 运行层面:窗口聚合是和
时间
绑定的,窗口聚合其中窗口的计算结果触发都是由时间(Watermark)推动的。Group by 聚合完全由数据推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。
- ⭐
SQL 语义
也是拿离线和实时做对比,Orders 为 kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子:
- ⭐
数据源算子
(From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的Group 聚合算子
,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中 - ⭐
Group 聚合算子
(group by key sumcountmaxmin):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sumcountmaxmin 结果。如果有结果oldResult
,拿出来和当前的数据进行sumcountmaxmin
计算出这个 key 的新结果newResult
,并将新结果[key, newResult]
更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息-[key, oldResult]
,然后再将新结果发往下游[key, newResult]
;如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 summaxmin 结果newResult
,并将新结果[key, newResult]
更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送[key, newResult]
。 - ⭐
数据汇算子
(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中
这个实时任务也是 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。
特别注意:
- Group by 聚合涉及到了回撤流(也叫 retract 流),会产生回撤流是因为从整个 SQL 的语义来看,上游的 Kafka 数据是源源不断的,无穷无尽的,那么每次这个 SQL 任务产出的结果都是一个中间结果,所以每次结果发生更新时,都需要将上一次发出的中间结果给撤回,然后将最新的结果发下去。
- Group by 聚合涉及到了状态:状态大小也取决于不同 key 的数量。为了防止状态无限变大,我们可以设置状态的 TTL。以上面的 SQL 为例,上面 SQL 是按照分钟进行聚合的,理论上到了今天,通常我们就可以不用关心昨天的数据了,那么我们可以设置状态过期时间为一天。关于状态过期时间的设置参数可以参考下文
运行时参数
小节。
如果这个 SQL 放在 Hive 中执行时,其中 Orders 为 Hive,target_table 也为 Hive,其也会生成三个相同的算子,但是其和实时任务的执行方式完全不同:
- ⭐
数据源算子
(From Order):数据源算子从 Order Hive 中读取到所有的数据,然后所有数据发送给下游的Group 聚合算子
,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个算子中,然后这个算子就运行结束了,释放资源了 - ⭐
Group 聚合算子
(group by sumcountmaxmin):接收到上游算子发的所有数据,然后遍历计算 sumcountmaxmin 结果,批量发给下游数据汇算子
,这个算子也就运行结束了,释放资源了 - ⭐
数据汇算子
(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Hive 中,整个任务也就运行结束了,整个任务的资源也就都释放了
Group 聚合支持 Grouping sets、Rollup、Cube
Group 聚合也支持 Grouping sets
、Rollup
、Cube
举一个 Grouping sets
的案例:
SELECT
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
('supplier1', 'product1', ),
('supplier1', 'product2', ),
('supplier2', 'product3', ),
('supplier2', 'product4', ))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)
往期推荐
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
(下)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
flink sql 知其所以然(十九):Table 与 DataStream 的转转转(附源码)
flink sql 知其所以然(十八):在 flink 中还能使用 hive udf?附源码
flink sql 知其所以然(十七):flink sql 开发利器之 Zeppelin
flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink
flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join(附源码)
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
flink sql 知其所以然(十三):流 join 很难嘛???(下)
flink sql 知其所以然(十二):流 join 很难嘛???(上)
flink sql 知其所以然(十一):去重不仅仅有 count distinct 还有强大的 deduplication
flink sql 知其所以然(十):大家都用 cumulate window 计算累计指标啦
flink sql 知其所以然(九):window tvf tumble window 的奇思妙解
flink sql 知其所以然(八):flink sql tumble window 的奇妙解析之路
flink sql 知其所以然(七):不会连最适合 flink sql 的 ETL 和 group agg 场景都没见过吧?
flink sql 知其所以然(六)| flink sql 约会 calcite(看这篇就够了)
flink sql 知其所以然(五)| 自定义 protobuf format
flink sql 知其所以然(四)| sql api 类型系统
flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)
flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)
flink sql 知其所以然(一)| sourcesink 原理
揭秘字节跳动埋点数据实时动态处理引擎(附源码)