Flink 实践教程-入门(7):消费 Kafka 数据写入 PG

2021-11-15 10:10:26 浏览数 (1)

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将您详细介绍如何利用 Python 脚本发送模拟数据到 CKafka 中,之后取 CKakfa 的数据经过简单的算术函数转换存入到 PostgreSQL 中。

前置准备

创建流计算 Oceanus 集群

进入流计算 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。

创建 Topic:

进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。

数据准备:
  • Kafka 客户端: 进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。
  • 使用脚本发送:  
    • Java:参考 使用 SDK 收发消息 [7]
    • Python:参考如下代码
代码语言:javascript复制
#!/usr/bin/python3# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块import jsonimport randomimport timefrom kafka import KafkaProducer
broker_lists = ['10.0.0.29:9092']topic_oceanus_quickstart = 'oceanus7_test1'
producer = KafkaProducer(bootstrap_servers=broker_lists,                         value_serializer=lambda m: json.dumps(m).encode('ascii'))
def generate_oceanus_test_data():    results = []    for _ in range(0, 10):        int_one = random.randint(1000,10000)        int_two = random.randint(1,10)        random_thr = random.random()        msg_kv = {"int_one":int_one,"int_two":int_two,"random_thr":random_thr}        results.append(msg_kv)    return results
def send_data(topic, msgs):    for msg in msgs:        import time        time.sleep(1)        producer.send(topic, msg)        print(msg)    producer.flush()
if __name__ == '__main__':    count = 1    while True:        msg_oceanus_test_data = generate_oceanus_test_data()        send_data(topic_oceanus_quickstart, msg_oceanus_test_data)        time.sleep(30)

创建 PostgreSQL 实例

进入 PostgreSQL 控制台 [8],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [9]。进入实例数据库,创建 oceanus7_test1 表。

代码语言:javascript复制
-- 建表语句create table public.oceanus7_test1 (  id            INT,  random_thr    DOUBLE PRECISION,  PRIMARY KEY(id));

笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [10]

流计算 Oceanus 作业

1. 创建 Source
代码语言:javascript复制
CREATE TABLE `kafka_json_source_table` (    int_one      INT,    int_two      INT,    random_thr   DOUBLE) WITH (  'connector' = 'kafka',  'topic' = 'oceanus7_test1',                -- 替换为您要消费的 Topic  'scan.startup.mode' = 'earliest-offset',   -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址  'properties.group.id' = 'oceanus_group2',  -- 必选参数, 一定要指定 Group ID  -- 定义数据格式 (JSON 格式)  'format' = 'json',  'json.fail-on-missing-field' = 'false',    -- 如果设置为 false, 则遇到缺失字段不会报错。  'json.ignore-parse-errors' = 'true'        -- 如果设置为 true,则忽略任何解析报错。);
2. 创建 Sink
代码语言:javascript复制
CREATE TABLE jdbc_sink (  id            INT,  random_thr    DOUBLE,  PRIMARY KEY (id) NOT ENFORCED) WITH (  'connector' = 'jdbc',             -- connector 类型为'jdbc'  'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true',      -- 请替换为您的实际 PostgreSQL 连接参数  'table-name' = 'oceanus7_test1',  -- 需要写入的数据表  'username' = 'root',              -- 数据库用户名(需要提供 INSERT 权限)  'password' = 'Tencent123$',       -- 数据库密码  -- 数据目的 Sink 性能调优参数  'sink.buffer-flush.max-rows' = '5000',  -- 可选参数, 表示每批数据的最大缓存条数, 默认值是 5000  'sink.buffer-flush.interval' = '2s',    -- 可选参数, 表示每批数据的刷新周期, 默认值是 0s  'sink.max-retries' = '3'                -- 可选参数, 表示数据库写入出错时, 最多重试的次数);
3. 编写业务 SQL
代码语言:javascript复制
INSERT INTO jdbc_sinkSELECT  MOD(int_one,int_two)    AS id,  TRUNCATE(random_thr,2)  AS random_thrFROM kafka_json_source_table;

总结

本例使用 Python 自动化脚本模拟数据输入到 CKafka,经过简单的算术函数转换后存入 PostgreSQL 中。更多算术函数请参考 算数函数 [11]。

参考链接

[1] 流计算 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1

[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839

[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854

[6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840

[7] 使用 SDK 收发消息:https://cloud.tencent.com/document/product/597/54834

[8] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index

[9] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961

[10] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429

[11] 算术函数:https://cloud.tencent.com/document/product/849/18080

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

0 人点赞