Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句、ETL 作业或者上传运行自定义 JAR 包,支持作业运维管理。
本文将为您详细介绍如何使用 datagen 和 blackhole 连接器随机产生数据和存储数据,来实现一个最简单的 Flink 任务。
一、前置准备
创建 Oceanus 集群
活动购买链接 1 元购买 Oceanus 集群。
进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群。
二、创建 Oceanus 作业
1. 创建 Source
代码语言:txt复制- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/dev/table/connectors/datagen.html
CREATE TABLE random_source (
user_id INT,
item_id INT,
behavior VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1', -- 每秒产生的数据条数
'fields.user_id.kind' = 'sequence', -- 有界序列(结束后自动停止输出)
'fields.user_id.start' = '1', -- 序列的起始值
'fields.user_id.end' = '10000', -- 序列的终止值
'fields.item_id.kind' = 'random', -- 无界的随机数
'fields.item_id.min' = '1', -- 随机数的最小值
'fields.item_id.max' = '1000', -- 随机数的最大值
'fields.behavior.length' = '5' -- 随机字符串的长度
);
2. 创建 Sink
代码语言:txt复制-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/blackhole/
CREATE TABLE blackhole_sink (
user_id INT,
item_id INT,
behavior VARCHAR
) WITH ('connector' = 'blackhole');
3. 编写业务 SQL
代码语言:txt复制INSERT INTO blackhole_sink
(
SELECT user_id,
item_id,
behavior
FROM random_source
);
4. 发布运行
点击工具栏【语法检查】进行 SQL 语法检查,检查无误后点击【保存】>【发布草稿】运行作业。
三、总结
Datagen Connector
连接器是一款用于生成随机数据的 Connector,一般作为测试使用。- Sink 到
Blackhole
的数据会被丢弃,用户无法查询到其中的数据,此连接器一般用于于性能测试。
参考阅读
[1] data gen connector 参考链接:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/datagen/
[2] blackhole connecotr 参考链接:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/blackhole/