流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将您详细介绍如何利用 Python 脚本发送模拟数据到 CKafka 中,之后取 CKakfa 的数据经过简单的算术函数转换存入到 PostgreSQL 中。
前置准备
创建流计算 Oceanus 集群
活动购买链接 1 元购买 Oceanus 集群。
进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。
创建消息队列 CKafka
进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。
创建 Topic:
进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。
数据准备:
- Kafka 客户端: 进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。
- 使用脚本发送:
- Java:参考 使用 SDK 收发消息 [7]
- Python:参考如下代码
#!/usr/bin/python3
# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块
import json
import random
import time
from kafka import KafkaProducer
broker_lists = ['10.0.0.29:9092']
topic_oceanus_quickstart = 'oceanus7_test1'
producer = KafkaProducer(bootstrap_servers=broker_lists,
value_serializer=lambda m: json.dumps(m).encode('ascii'))
def generate_oceanus_test_data():
results = []
for _ in range(0, 10):
int_one = random.randint(1000,10000)
int_two = random.randint(1,10)
random_thr = random.random()
msg_kv = {"int_one":int_one,"int_two":int_two,"random_thr":random_thr}
results.append(msg_kv)
return results
def send_data(topic, msgs):
for msg in msgs:
import time
time.sleep(1)
producer.send(topic, msg)
print(msg)
producer.flush()
if __name__ == '__main__':
count = 1
while True:
msg_oceanus_test_data = generate_oceanus_test_data()
send_data(topic_oceanus_quickstart, msg_oceanus_test_data)
time.sleep(30)
创建 PostgreSQL 实例
进入 PostgreSQL 控制台 [8],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [9]。进入实例数据库,创建 oceanus7_test1
表。
-- 建表语句
create table public.oceanus7_test1 (
id INT,
random_thr DOUBLE PRECISION,
PRIMARY KEY(id)
);
笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [10]
流计算 Oceanus 作业
1. 创建 Source
代码语言:sql复制CREATE TABLE `kafka_json_source_table` (
int_one INT,
int_two INT,
random_thr DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = 'oceanus7_test1', -- 替换为您要消费的 Topic
'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
'properties.bootstrap.servers' = '10.0.0.29:9092', -- 替换为您的 Kafka 连接地址
'properties.group.id' = 'oceanus_group2', -- 必选参数, 一定要指定 Group ID
-- 定义数据格式 (JSON 格式)
'format' = 'json',
'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。
'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。
);
2. 创建 Sink
代码语言:sql复制CREATE TABLE jdbc_sink (
id INT,
random_thr DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc', -- connector 类型为'jdbc'
'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true', -- 请替换为您的实际 PostgreSQL 连接参数
'table-name' = 'oceanus7_test1', -- 需要写入的数据表
'username' = 'root', -- 数据库用户名(需要提供 INSERT 权限)
'password' = 'Tencent123$', -- 数据库密码
-- 数据目的 Sink 性能调优参数
'sink.buffer-flush.max-rows' = '5000', -- 可选参数, 表示每批数据的最大缓存条数, 默认值是 5000
'sink.buffer-flush.interval' = '2s', -- 可选参数, 表示每批数据的刷新周期, 默认值是 0s
'sink.max-retries' = '3' -- 可选参数, 表示数据库写入出错时, 最多重试的次数
);
3. 编写业务 SQL
代码语言:sql复制INSERT INTO jdbc_sink
SELECT
MOD(int_one,int_two) AS id,
TRUNCATE(random_thr,2) AS random_thr
FROM kafka_json_source_table;
总结
本例使用 Python 自动化脚本模拟数据输入到 CKafka,经过简单的算术函数转换后存入 PostgreSQL 中。更多算术函数请参考 算数函数 [11]。
参考链接
[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview [2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298 [3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1 [4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839 [5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854 [6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840 [7] 使用 SDK 收发消息:https://cloud.tencent.com/document/product/597/54834 [8] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index [9] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961 [10] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429 [11] 算术函数:https://cloud.tencent.com/document/product/849/18080