Flink 实践教程-进阶(4):TOP-N

2021-12-22 09:40:18 浏览数 (2)

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将会介绍如何使用 Flink 实现常见的 TopN 统计需求。首先使用 Python 脚本模拟生成商品购买数据(每秒钟发送一条)并发送到 CKafka,随后在 Oceanus 平台创建 Flink SQL 作业实时读取 CKafka 中的商品数据,经过滚动窗口(基于事件时间)统计每分钟内商品购买种类的前三名(Top3),最后将结果存储于 PostgreSQL。

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka

进入 CKafka 控制台 [3],点击左上角【新建】,创建 CKafka 实例,具体可参考 CKafka 创建实例 [4]。随后点击进入实例,单击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。

数据准备

本示例使用 Python 脚本向 Topic 发送模拟数据,前提条件需要网络互通。这里我们选择的是与 CKafka 同 VPC 的 CVM 进入,并且安装 Python 环境。如若网络不通,可在 CKafka 实例里面【基本信息】>【接入方式】>【添加路由策略】>【路由类型】里面选择 VPC 网络公网域名接入 的方式打通网络,具体可参考 CKafka 官网 入门流程指引 [6]。

代码语言:javascript复制
#!/usr/bin/python3# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块import jsonimport randomimport timefrom kafka import KafkaProducer
broker_lists = ['10.0.0.29:9092']kafka_topic_oceanus = 'oceanus_advanced4_input'
producer = KafkaProducer(bootstrap_servers=broker_lists,                         value_serializer=lambda m: json.dumps(m).encode('ascii'))
def send_data(topic):    user_id = random.randint(1,50)    item_id = random.randint(1,1000)    category_id = random.randint(1,20)    user_behaviors = ['pv','buy','cart','fav']    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())    msg = {        'user_id':user_id,        'item_id':item_id,        'category_id':category_id,        'user_behavior':user_behaviors[random.randint(0,len(user_behaviors)-1)],        'time_stamp':current_time    }    producer.send(topic, msg)    print(msg)    producer.flush()
if __name__ == '__main__':    count = 1    while True:        # 每秒发送一条数据        time.sleep(1)        send_data(kafka_topic_oceanus)
代码语言:javascript复制

更多接入方式请参考 CKafka 收发消息 [7]

创建 PostgreSQL 实例

进入 PostgreSQL 控制台 [8],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [9]。进入实例数据库,创建 oceanus_advanced4_output 表,用于接收数据。

代码语言:javascript复制
-- 建表语句create table public.oceanus_advanced4_output (win_start     TIMESTAMP,category_id   INT,buy_count     INT,PRIMARY KEY(win_start,category_id)  );
代码语言:javascript复制

笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [10]

流计算 Oceanus 作业

1. 创建 Source
代码语言:javascript复制
CREATE TABLE `kafka_json_source_table` (  user_id        INT,  item_id        INT,  category_id    INT,  user_behavior  VARCHAR,  time_stamp     TIMESTAMP(3),  WATERMARK FOR time_stamp AS time_stamp - INTERVAL '3' SECOND) WITH (  'connector' = 'kafka',  'topic' = 'oceanus_advanced4_input',    -- 替换为您要消费的 Topic  'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址  'properties.group.id' = 'testGroup',     -- 必选参数, 一定要指定 Group ID  'format' = 'json',  'json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。  'json.ignore-parse-errors' = 'true'      -- 如果设置为 true,则忽略任何解析报错。);
2. 创建 Sink
代码语言:javascript复制
CREATE TABLE `jdbc_upsert_sink_table` (    win_start     TIMESTAMP(3),    category_id   INT,    buy_count     INT,    PRIMARY KEY (win_start,category_id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true',              -- 请替换为您的实际 MySQL 连接参数    'table-name' = 'oceanus_advanced4_output', -- 需要写入的数据表    'username' = 'root',                      -- 数据库访问的用户名(需要提供 INSERT 权限)    'password' = 'Tencent123$',               -- 数据库访问的密码    'sink.buffer-flush.max-rows' = '200',     -- 批量输出的条数    'sink.buffer-flush.interval' = '2s'       -- 批量输出的间隔);
3. 编写业务 SQL
代码语言:javascript复制
-- 创建临时视图,用于将原始数据过滤、窗口聚合CREATE VIEW `kafka_json_source_view` ASSELECT  TUMBLE_START(time_stamp,INTERVAL '1' MINUTE) AS win_start,  category_id,  COUNT(1) AS buy_countFROM `kafka_json_source_table`WHERE user_behavior = 'buy'GROUP BY TUMBLE(time_stamp,INTERVAL '1' MINUTE),category_id;
代码语言:javascript复制
-- 统计每分钟 Top3 购买种类INSERT INTO `jdbc_upsert_sink_table`SELECTb.win_start,b.category_id,CAST(b.buy_count AS INT) AS buy_countFROM (SELECT *          ,ROW_NUMBER() OVER (PARTITION BY win_start ORDER BY buy_count DESC) AS rn      FROM `kafka_json_source_view`      ) bWHERE b.rn <= 3;

总结

本文使用 TUMBLE WINDOW 配合 ROW_NUMBER 函数,统计分析了每分钟内购买量前三的商品种类,用户可根据实际需求选择相应的窗口函数统计对应的 TopN。更多窗口函数的使用参考 时间窗口函数 [11]。

作者在落表时将 rn 字段和 win_end 字段裁剪后写入(即无排名优化写入),在使用无 rn 的场景下,需对结果表主键的特别小心,如果定义有误会直接导致 TopN 的结果不准确。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1

[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839

[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854

[6] CKafka 入门流程指引:https://cloud.tencent.com/document/product/597/54837

[7] CKafka 收发消息:https://cloud.tencent.com/document/product/597/54834

[8] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index

[9] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961

[10] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429

[11] 时间窗口函数:https://cloud.tencent.com/document/product/849/18077

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

0 人点赞