Flink 实践教程:入门7-消费 Kafka 数据写入 PG

2021-12-08 16:00:37 浏览数 (1)

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将您详细介绍如何利用 Python 脚本发送模拟数据到 CKafka 中,之后取 CKakfa 的数据经过简单的算术函数转换存入到 PostgreSQL 中。

Flink 实践教程:入门7-消费 Kafka 数据写入 PG

前置准备

创建流计算 Oceanus 集群

活动购买链接 1 元购买 Oceanus 集群。

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。

创建 Topic:

进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。

数据准备:
  • Kafka 客户端: 进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。
  • 使用脚本发送:
    • Java:参考 使用 SDK 收发消息 [7]
    • Python:参考如下代码
代码语言:python代码运行次数:1复制
#!/usr/bin/python3
# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块
import json
import random
import time
from kafka import KafkaProducer
​
broker_lists = ['10.0.0.29:9092']
topic_oceanus_quickstart = 'oceanus7_test1'
​
producer = KafkaProducer(bootstrap_servers=broker_lists,
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))
​
def generate_oceanus_test_data():
    results = []
    for _ in range(0, 10):
        int_one = random.randint(1000,10000)
        int_two = random.randint(1,10)
        random_thr = random.random()
        msg_kv = {"int_one":int_one,"int_two":int_two,"random_thr":random_thr}
        results.append(msg_kv)
    return results
​
def send_data(topic, msgs):
    for msg in msgs:
        import time
        time.sleep(1)
        producer.send(topic, msg)
        print(msg)
    producer.flush()
​
if __name__ == '__main__':
    count = 1
    while True:
        msg_oceanus_test_data = generate_oceanus_test_data()
        send_data(topic_oceanus_quickstart, msg_oceanus_test_data)
        time.sleep(30)

创建 PostgreSQL 实例

进入 PostgreSQL 控制台 [8],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [9]。进入实例数据库,创建 oceanus7_test1 表。

代码语言:sql复制
-- 建表语句
create table public.oceanus7_test1 (
  id            INT,
  random_thr    DOUBLE PRECISION,
  PRIMARY KEY(id)
);

笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [10]

流计算 Oceanus 作业

1. 创建 Source
代码语言:sql复制
CREATE TABLE `kafka_json_source_table` (
    int_one      INT,
    int_two      INT,
    random_thr   DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'oceanus7_test1',                -- 替换为您要消费的 Topic
  'scan.startup.mode' = 'earliest-offset',   -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址
  'properties.group.id' = 'oceanus_group2',  -- 必选参数, 一定要指定 Group ID
  -- 定义数据格式 (JSON 格式)
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',    -- 如果设置为 false, 则遇到缺失字段不会报错。
  'json.ignore-parse-errors' = 'true'        -- 如果设置为 true,则忽略任何解析报错。
);
2. 创建 Sink
代码语言:sql复制
CREATE TABLE jdbc_sink (
  id            INT,
  random_thr    DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',             -- connector 类型为'jdbc'
  'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true',      -- 请替换为您的实际 PostgreSQL 连接参数
  'table-name' = 'oceanus7_test1',  -- 需要写入的数据表
  'username' = 'root',              -- 数据库用户名(需要提供 INSERT 权限)
  'password' = 'Tencent123$',       -- 数据库密码
  -- 数据目的 Sink 性能调优参数
  'sink.buffer-flush.max-rows' = '5000',  -- 可选参数, 表示每批数据的最大缓存条数, 默认值是 5000
  'sink.buffer-flush.interval' = '2s',    -- 可选参数, 表示每批数据的刷新周期, 默认值是 0s
  'sink.max-retries' = '3'                -- 可选参数, 表示数据库写入出错时, 最多重试的次数
);
3. 编写业务 SQL
代码语言:sql复制
INSERT INTO jdbc_sink
SELECT
  MOD(int_one,int_two)    AS id,
  TRUNCATE(random_thr,2)  AS random_thr
FROM kafka_json_source_table;

总结

本例使用 Python 自动化脚本模拟数据输入到 CKafka,经过简单的算术函数转换后存入 PostgreSQL 中。更多算术函数请参考 算数函数 [11]。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview [2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298 [3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1 [4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839 [5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854 [6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840 [7] 使用 SDK 收发消息:https://cloud.tencent.com/document/product/597/54834 [8] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index [9] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961 [10] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429 [11] 算术函数:https://cloud.tencent.com/document/product/849/18080

0 人点赞