Oceanus 实践-从0到1接入 COS SQL 作业

2021-09-30 16:37:40 浏览数 (1)

Oceanus简介

流计算 Oceanus 是位于云端的流式数据汇聚、计算服务。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。

流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。

目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。本文将为您详细介绍如何使用Oceanus对接COS。

前置准备

创建Oceanus集群

进入 Oceanus控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考Oceanus官方文档 创建独享集群。

创建COS存储桶

进入 COS控制台,点击左侧【存储桶列表】,点击【创建存储桶】,具体可参考COS官方文档 创建存储桶。

?当写入COS时,Oceanus作业所运行的地域必须和COS在同一个地域

Oceanus作业

进入 Oceanus控制台,点击左侧【作业管理】,创建SQL作业,集群选择与COS在相同地域的集群。

1. 创建Source
代码语言:txt复制
CREATE TABLE `random_source` ( 
  f_sequence INT, 
  f_random INT, 
  f_random_str VARCHAR 
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second'='10',                  -- 每秒产生的数据条数
  'fields.f_sequence.kind'='random',       -- 无界的随机数
  'fields.f_sequence.min'='1',             -- 随机数的最小值
  'fields.f_sequence.max'='10',            -- 随机数的最大值
  'fields.f_random.kind'='random',         -- 无界的随机数
  'fields.f_random.min'='1',               -- 随机数的最小值
  'fields.f_random.max'='100',             -- 随机数的最大值
  'fields.f_random_str.length'='10'        -- 随机字符串的长度
);

?此处选用内置connector datagen,请根据实际业务需求选择相应数据源,详情参考 Oceanus上下游开发指南。

2. 创建Sink
代码语言:txt复制
# 请将<存储桶名称>和<文件夹名称>替换成您实际的存储桶名和文件夹名
CREATE TABLE `cos_sink`(
  f_sequence INT, 
  f_random INT, 
  f_random_str VARCHAR
) PARTITIONED BY (f_sequence) WITH (
    'connector' = 'filesystem',
    'path'='cosn://<存储桶名称>/<文件夹名称>/',                 --- 数据写入的目录路径
    'format' = 'json',                                       --- 数据写入的格式
    'sink.rolling-policy.file-size' = '128MB',               --- 文件最大大小
    'sink.rolling-policy.rollover-interval' = '30 min',      --- 文件最大写入时间
    'sink.partition-commit.delay' = '1 s',                   --- 分区提交延迟
    'sink.partition-commit.policy.kind' = 'success-file'     --- 分区提交方式
);

?更多Sink的WITH参数请参考Oceanus官方文档 Filesystem (HDFS/COS)。

3. 业务逻辑
代码语言:txt复制
INSERT INTO `cos_sink`
SELECT * FROM `random_source`

!此处只做展示,无实际业务目的

4. 作业参数设置

【内置Connector】选择flink-connector-cos;

【高级参数】中对COS的地址进行如下配置:

代码语言:txt复制
fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
fs.cosn.bucket.region: <COS所在地域>
fs.cosn.userinfo.appid: <COS所属用户的appid>

作业配置说明如下:

  • 请将<COS所在地域>替换为您实际的COS地域,例如:ap-guangzhou。
  • 请将<COS所属用户的appid>替换为您实际的appid,具体请进入 账号中心 查看。
  • 具体的作业参数设置请参考Oceanus官方文档 Filesystem (HDFS/COS)。
5. 启动作业

依次点击【保存】>【语法检查】>【发布草稿】启动SQL作业。点击右上角【Flink UI】可查看作业运行日志。

6. 数据验证

进入相应的COS目录,点击右侧【预览】或【下载】即可查看写入的数据。

文件预览.png文件预览.png

0 人点赞