作者:腾讯云流计算 Oceanus 团队
流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将您详细介绍如何利用 Python 脚本发送模拟数据到 CKafka 中,之后取 CKakfa 的数据经过简单的算术函数转换存入到 PostgreSQL 中。
前置准备
创建流计算 Oceanus 集群
进入流计算 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档 创建独享集群 [2]。
创建消息队列 CKafka
进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。
创建 Topic:
进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。
数据准备:
- Kafka 客户端: 进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。
- 使用脚本发送:
- Java:参考 使用 SDK 收发消息 [7]
- Python:参考如下代码
#!/usr/bin/python3# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块import jsonimport randomimport timefrom kafka import KafkaProducer
broker_lists = ['10.0.0.29:9092']topic_oceanus_quickstart = 'oceanus7_test1'
producer = KafkaProducer(bootstrap_servers=broker_lists, value_serializer=lambda m: json.dumps(m).encode('ascii'))
def generate_oceanus_test_data(): results = [] for _ in range(0, 10): int_one = random.randint(1000,10000) int_two = random.randint(1,10) random_thr = random.random() msg_kv = {"int_one":int_one,"int_two":int_two,"random_thr":random_thr} results.append(msg_kv) return results
def send_data(topic, msgs): for msg in msgs: import time time.sleep(1) producer.send(topic, msg) print(msg) producer.flush()
if __name__ == '__main__': count = 1 while True: msg_oceanus_test_data = generate_oceanus_test_data() send_data(topic_oceanus_quickstart, msg_oceanus_test_data) time.sleep(30)
创建 PostgreSQL 实例
进入 PostgreSQL 控制台 [8],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [9]。进入实例数据库,创建 oceanus7_test1
表。
-- 建表语句create table public.oceanus7_test1 ( id INT, random_thr DOUBLE PRECISION, PRIMARY KEY(id));
笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [10]
流计算 Oceanus 作业
1. 创建 Source
CREATE TABLE `kafka_json_source_table` ( int_one INT, int_two INT, random_thr DOUBLE) WITH ( 'connector' = 'kafka', 'topic' = 'oceanus7_test1', -- 替换为您要消费的 Topic 'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种 'properties.bootstrap.servers' = '10.0.0.29:9092', -- 替换为您的 Kafka 连接地址 'properties.group.id' = 'oceanus_group2', -- 必选参数, 一定要指定 Group ID -- 定义数据格式 (JSON 格式) 'format' = 'json', 'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。 'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。);
2. 创建 Sink
CREATE TABLE jdbc_sink ( id INT, random_thr DOUBLE, PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'jdbc', -- connector 类型为'jdbc' 'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true', -- 请替换为您的实际 PostgreSQL 连接参数 'table-name' = 'oceanus7_test1', -- 需要写入的数据表 'username' = 'root', -- 数据库用户名(需要提供 INSERT 权限) 'password' = 'Tencent123$', -- 数据库密码 -- 数据目的 Sink 性能调优参数 'sink.buffer-flush.max-rows' = '5000', -- 可选参数, 表示每批数据的最大缓存条数, 默认值是 5000 'sink.buffer-flush.interval' = '2s', -- 可选参数, 表示每批数据的刷新周期, 默认值是 0s 'sink.max-retries' = '3' -- 可选参数, 表示数据库写入出错时, 最多重试的次数);
3. 编写业务 SQL
INSERT INTO jdbc_sinkSELECT MOD(int_one,int_two) AS id, TRUNCATE(random_thr,2) AS random_thrFROM kafka_json_source_table;
总结
本例使用 Python 自动化脚本模拟数据输入到 CKafka,经过简单的算术函数转换后存入 PostgreSQL 中。更多算术函数请参考 算数函数 [11]。
参考链接
[1] 流计算 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1
[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839
[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854
[6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840
[7] 使用 SDK 收发消息:https://cloud.tencent.com/document/product/597/54834
[8] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index
[9] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961
[10] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429
[11] 算术函数:https://cloud.tencent.com/document/product/849/18080
流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓
点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~
腾讯云大数据
长按二维码 关注我们