作者:腾讯云流计算 Oceanus 团队
流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将会介绍如何使用 Flink 实现常见的 TopN 统计需求。首先使用 Python 脚本模拟生成商品购买数据(每秒钟发送一条)并发送到 CKafka,随后在 Oceanus 平台创建 Flink SQL 作业实时读取 CKafka 中的商品数据,经过滚动窗口(基于事件时间)统计每分钟内商品购买种类的前三名(Top3),最后将结果存储于 PostgreSQL。
前置准备
创建流计算 Oceanus 集群
进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。
创建消息队列 CKafka
进入 CKafka 控制台 [3],点击左上角【新建】,创建 CKafka 实例,具体可参考 CKafka 创建实例 [4]。随后点击进入实例,单击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。
数据准备
本示例使用 Python 脚本向 Topic 发送模拟数据,前提条件需要网络互通。这里我们选择的是与 CKafka 同 VPC 的 CVM 进入,并且安装 Python 环境。如若网络不通,可在 CKafka 实例里面【基本信息】>【接入方式】>【添加路由策略】>【路由类型】里面选择 VPC 网络 或 公网域名接入 的方式打通网络,具体可参考 CKafka 官网 入门流程指引 [6]。
#!/usr/bin/python3# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块import jsonimport randomimport timefrom kafka import KafkaProducer
broker_lists = ['10.0.0.29:9092']kafka_topic_oceanus = 'oceanus_advanced4_input'
producer = KafkaProducer(bootstrap_servers=broker_lists, value_serializer=lambda m: json.dumps(m).encode('ascii'))
def send_data(topic): user_id = random.randint(1,50) item_id = random.randint(1,1000) category_id = random.randint(1,20) user_behaviors = ['pv','buy','cart','fav'] current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) msg = { 'user_id':user_id, 'item_id':item_id, 'category_id':category_id, 'user_behavior':user_behaviors[random.randint(0,len(user_behaviors)-1)], 'time_stamp':current_time } producer.send(topic, msg) print(msg) producer.flush()
if __name__ == '__main__': count = 1 while True: # 每秒发送一条数据 time.sleep(1) send_data(kafka_topic_oceanus)
代码语言:javascript复制
更多接入方式请参考 CKafka 收发消息 [7]
创建 PostgreSQL 实例
进入 PostgreSQL 控制台 [8],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [9]。进入实例数据库,创建 oceanus_advanced4_output
表,用于接收数据。
-- 建表语句create table public.oceanus_advanced4_output (win_start TIMESTAMP,category_id INT,buy_count INT,PRIMARY KEY(win_start,category_id) );
代码语言:javascript复制
笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [10]
流计算 Oceanus 作业
1. 创建 Source
CREATE TABLE `kafka_json_source_table` ( user_id INT, item_id INT, category_id INT, user_behavior VARCHAR, time_stamp TIMESTAMP(3), WATERMARK FOR time_stamp AS time_stamp - INTERVAL '3' SECOND) WITH ( 'connector' = 'kafka', 'topic' = 'oceanus_advanced4_input', -- 替换为您要消费的 Topic 'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种 'properties.bootstrap.servers' = '10.0.0.29:9092', -- 替换为您的 Kafka 连接地址 'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID 'format' = 'json', 'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。 'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。);
2. 创建 Sink
CREATE TABLE `jdbc_upsert_sink_table` ( win_start TIMESTAMP(3), category_id INT, buy_count INT, PRIMARY KEY (win_start,category_id) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true', -- 请替换为您的实际 MySQL 连接参数 'table-name' = 'oceanus_advanced4_output', -- 需要写入的数据表 'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限) 'password' = 'Tencent123$', -- 数据库访问的密码 'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数 'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔);
3. 编写业务 SQL
-- 创建临时视图,用于将原始数据过滤、窗口聚合CREATE VIEW `kafka_json_source_view` ASSELECT TUMBLE_START(time_stamp,INTERVAL '1' MINUTE) AS win_start, category_id, COUNT(1) AS buy_countFROM `kafka_json_source_table`WHERE user_behavior = 'buy'GROUP BY TUMBLE(time_stamp,INTERVAL '1' MINUTE),category_id;
-- 统计每分钟 Top3 购买种类INSERT INTO `jdbc_upsert_sink_table`SELECTb.win_start,b.category_id,CAST(b.buy_count AS INT) AS buy_countFROM (SELECT * ,ROW_NUMBER() OVER (PARTITION BY win_start ORDER BY buy_count DESC) AS rn FROM `kafka_json_source_view` ) bWHERE b.rn <= 3;
总结
本文使用 TUMBLE WINDOW 配合 ROW_NUMBER 函数,统计分析了每分钟内购买量前三的商品种类,用户可根据实际需求选择相应的窗口函数统计对应的 TopN。更多窗口函数的使用参考 时间窗口函数 [11]。
作者在落表时将
rn
字段和win_end
字段裁剪后写入(即无排名优化写入),在使用无rn
的场景下,需对结果表主键的特别小心,如果定义有误会直接导致 TopN 的结果不准确。
参考链接
[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1
[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839
[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854
[6] CKafka 入门流程指引:https://cloud.tencent.com/document/product/597/54837
[7] CKafka 收发消息:https://cloud.tencent.com/document/product/597/54834
[8] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index
[9] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961
[10] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429
[11] 时间窗口函数:https://cloud.tencent.com/document/product/849/18077
流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓
点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~
腾讯云大数据
长按二维码 关注我们