Oceanus简介
流计算 Oceanus 是位于云端的流式数据汇聚、计算服务。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。
流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。
目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。本文将为您详细介绍如何使用Oceanus对接COS。
前置准备
创建Oceanus集群
进入 Oceanus控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考Oceanus官方文档 创建独享集群。
创建COS存储桶
进入 COS控制台,点击左侧【存储桶列表】,点击【创建存储桶】,具体可参考COS官方文档 创建存储桶。
?当写入COS时,Oceanus作业所运行的地域必须和COS在同一个地域
Oceanus作业
进入 Oceanus控制台,点击左侧【作业管理】,创建SQL作业,集群选择与COS在相同地域的集群。
1. 创建Source
代码语言:txt复制CREATE TABLE `random_source` (
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second'='10', -- 每秒产生的数据条数
'fields.f_sequence.kind'='random', -- 无界的随机数
'fields.f_sequence.min'='1', -- 随机数的最小值
'fields.f_sequence.max'='10', -- 随机数的最大值
'fields.f_random.kind'='random', -- 无界的随机数
'fields.f_random.min'='1', -- 随机数的最小值
'fields.f_random.max'='100', -- 随机数的最大值
'fields.f_random_str.length'='10' -- 随机字符串的长度
);
?此处选用内置connector
datagen
,请根据实际业务需求选择相应数据源,详情参考 Oceanus上下游开发指南。
2. 创建Sink
代码语言:txt复制# 请将<存储桶名称>和<文件夹名称>替换成您实际的存储桶名和文件夹名
CREATE TABLE `cos_sink`(
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) PARTITIONED BY (f_sequence) WITH (
'connector' = 'filesystem',
'path'='cosn://<存储桶名称>/<文件夹名称>/', --- 数据写入的目录路径
'format' = 'json', --- 数据写入的格式
'sink.rolling-policy.file-size' = '128MB', --- 文件最大大小
'sink.rolling-policy.rollover-interval' = '30 min', --- 文件最大写入时间
'sink.partition-commit.delay' = '1 s', --- 分区提交延迟
'sink.partition-commit.policy.kind' = 'success-file' --- 分区提交方式
);
?更多Sink的WITH参数请参考Oceanus官方文档 Filesystem (HDFS/COS)。
3. 业务逻辑
代码语言:txt复制INSERT INTO `cos_sink`
SELECT * FROM `random_source`
!此处只做展示,无实际业务目的
4. 作业参数设置
【内置Connector】选择flink-connector-cos
;
【高级参数】中对COS的地址进行如下配置:
代码语言:txt复制fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
fs.cosn.bucket.region: <COS所在地域>
fs.cosn.userinfo.appid: <COS所属用户的appid>
作业配置说明如下:
- 请将<COS所在地域>替换为您实际的COS地域,例如:ap-guangzhou。
- 请将<COS所属用户的appid>替换为您实际的appid,具体请进入 账号中心 查看。
- 具体的作业参数设置请参考Oceanus官方文档 Filesystem (HDFS/COS)。
5. 启动作业
依次点击【保存】>【语法检查】>【发布草稿】启动SQL作业。点击右上角【Flink UI】可查看作业运行日志。
6. 数据验证
进入相应的COS目录,点击右侧【预览】或【下载】即可查看写入的数据。