Oceanus 简介
流计算 Oceanus 是位于云端的流式数据汇聚、计算服务。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。
流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。
目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。
操作步骤
步骤1:获取Ckafka实例接入地址
Ckafka实例与Oceanus集群在同一子网时:
Ckafka接入地址为:
Ckafka实例与Oceanus集群不在同一子网时:
1、登陆 Ckafka 控制台
2、在左侧导航栏选择【实例列表】,单击实例的“ID”,进入实例基本信息页面
3、在基本信息页面的【接入方式】模块里面,点击【添加路由策略】
4、【路由类型】选择VPC网络,【网络】注意选择Oceanus对应集群的网络
步骤2: 创建topic
1、在实例基本信息页面,选择顶部【Topic管理】页签。
2、在 Topic 管理页面,单击【新建】,创建名为 oceanus_test1、oceanus_test2 的两个 Topic,接下来将讲解Oceanus如何接入Ckafka。
步骤3: 接入Ckafka
1、访问 流计算Oceanus产品,点击【立即使用】或购买产品。
2、在【作业管理】页面点击左上角【新建】,创建作业。(演示使用,这里选用SQL作业,客户可自行选择作业类型)
3、选择已经创建好的“运行集群”。
4、SQL作业开发调试。(这里实现Oceanus从Ckafka消费数据,并将数据写入Ckafka中)
(1) 创建source
代码语言:txt复制 CREATE TABLE `DataInput` (
`request_time` VARCHAR,
`client_ip` VARCHAR,
`request_method` VARCHAR
) WITH (
'connector' = 'kafka', -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector
'topic' = 'oceanus_test1', -- 替换为您要消费的 Topic
'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种
'properties.bootstrap.servers' = 'IP地址:端口', -- 替换为您的 Kafka 连接地址
'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID
-- 定义数据格式 (JSON 格式)
'format' = 'json',
'json.ignore-parse-errors' = 'true', -- 忽略 JSON 结构解析异常
'json.fail-on-missing-field' = 'false' -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
);
(2) 创建sink
代码语言:txt复制 CREATE TABLE `DataOutput` (
`request_time` VARCHAR,
`client_ip` VARCHAR,
`request_method` VARCHAR
) WITH (
'connector' = 'kafka', -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector
'topic' = 'oceanus_test2', -- 替换为您要消费的 Topic
'properties.bootstrap.servers' = 'IP地址:端口', -- 替换为您的 Kafka 连接地址
-- 定义数据格式 (JSON 格式)
'format' = 'json',
'json.ignore-parse-errors' = 'true', -- 忽略 JSON 结构解析异常
'json.fail-on-missing-field' = 'false' -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
);
(3) 业务逻辑
代码语言:txt复制 INSERT INTO DataOutput
SELECT * FROM DataInput;
(4) 点击【作业参数】,点击【内置Connector】,选择“flink-connector-kafka”,然后点击【确认】保存。
注:具体实现请参考流计算Oceanus帮助文档
(https://cloud.tencent.com/document/product/849/48310)