Oceanus 实践-从0到1接入 CKafka SQL 作业

2021-09-30 16:38:53 浏览数 (1)

Oceanus 简介

流计算 Oceanus 是位于云端的流式数据汇聚、计算服务。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。

流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。

目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。

操作步骤

步骤1:获取Ckafka实例接入地址

Ckafka实例与Oceanus集群在同一子网时:

Ckafka接入地址为:

Ckafka内网IP与端口.pngCkafka内网IP与端口.png

Ckafka实例与Oceanus集群不在同一子网时:

1、登陆 Ckafka 控制台

2、在左侧导航栏选择【实例列表】,单击实例的“ID”,进入实例基本信息页面

3、在基本信息页面的【接入方式】模块里面,点击【添加路由策略】

Ckafka接入方式.pngCkafka接入方式.png

4、【路由类型】选择VPC网络,【网络】注意选择Oceanus对应集群的网络

添加路由策略.png添加路由策略.png

步骤2: 创建topic

1、在实例基本信息页面,选择顶部【Topic管理】页签。

2、在 Topic 管理页面,单击【新建】,创建名为 oceanus_test1、oceanus_test2 的两个 Topic,接下来将讲解Oceanus如何接入Ckafka。

创建topic.png创建topic.png

步骤3: 接入Ckafka

1、访问 流计算Oceanus产品,点击【立即使用】或购买产品。

2、在【作业管理】页面点击左上角【新建】,创建作业。(演示使用,这里选用SQL作业,客户可自行选择作业类型)

3、选择已经创建好的“运行集群”。

新建作业.png新建作业.png

4、SQL作业开发调试。(这里实现Oceanus从Ckafka消费数据,并将数据写入Ckafka中)

(1) 创建source

代码语言:txt复制
 CREATE TABLE `DataInput` (
       `request_time` VARCHAR,
       `client_ip` VARCHAR,
       `request_method` VARCHAR
 ) WITH (
     'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置  Connector
     'topic' = 'oceanus_test1',  -- 替换为您要消费的 Topic
     'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种
     'properties.bootstrap.servers' = 'IP地址:端口',  -- 替换为您的 Kafka 连接地址
     'properties.group.id' = 'testGroup',  -- 必选参数, 一定要指定 Group ID
     -- 定义数据格式 (JSON 格式)
     'format' = 'json',
     'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
     'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
 );

(2) 创建sink

代码语言:txt复制
 CREATE TABLE `DataOutput` (
       `request_time` VARCHAR,
       `client_ip` VARCHAR,
       `request_method` VARCHAR
 ) WITH (
     'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置  Connector
     'topic' = 'oceanus_test2',  -- 替换为您要消费的 Topic
     'properties.bootstrap.servers' = 'IP地址:端口',  -- 替换为您的 Kafka 连接地址
     -- 定义数据格式 (JSON 格式)
     'format' = 'json',
     'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
     'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
 );

(3) 业务逻辑

代码语言:txt复制
 INSERT INTO DataOutput
 SELECT * FROM DataInput;

(4) 点击【作业参数】,点击【内置Connector】,选择“flink-connector-kafka”,然后点击【确认】保存。

内置Connector.png内置Connector.png

注:具体实现请参考流计算Oceanus帮助文档

(https://cloud.tencent.com/document/product/849/48310)

0 人点赞