实时即未来,最近在腾讯云流计算 Oceanus 进行实时计算服务,分享给大家~
1. 环境搭建
1.1. 创建 Oceanus 集群
在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。
若之前未使用过VPC,日志(CLS),存储(COS)这些组件,需要先进行创建。
VPC及子网需要和下面的ClickHouse集群使用同一个,否则需要手动打通(如对等连接)。
创建完后的集群如下:
1.2 创建CDW ClickHouse集群
在云数据仓库控制台创建ClickHouse集群,这里为了简单,选择了与Oceanus同一个地域,同可用区。网络选择也选择与上面同样的VPC。
创建完后的集群如下:
登录集群,创建clickhouse表:通过同网段(同VPC,无需同子网)的云服务器(CVM)安装clickhouse-client进行登录。
clickhouse-client -m -h 172.28.28.85 --port 9000
创建表语句如下:
代码语言:txt复制 create database testdb;
use testdb;
CREATE TABLE IF NOT EXISTS testdb.clickhouse_sink ON CLUSTER default_cluster
(
id UInt64,
name String,
sign Int8
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY id;
至此,环境准备完毕。
2. 作业创建
2.1 创建SQL作业
在Oceanus控制台【作业管理】->【新建作业】-> SQL作业,选择刚刚新建的集群创建作业。然后在作业的【开发调试】->【作业参数】里面添加必要的connector,如clickhouse connector。
2.2 创建Source端
此处选择Datagen来随机生成一些数据。
代码语言:txt复制-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 [https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html](https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html)
CREATE TABLE random_source (
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second'='1', -- 每秒产生的数据条数
'fields.f_sequence.kind'='sequence', -- 有界序列(结束后自动停止输出)
'fields.f_sequence.start'='1', -- 序列的起始值
'fields.f_sequence.end'='10000', -- 序列的终止值
'fields.f_random.kind'='random', -- 无界的随机数
'fields.f_random.min'='1', -- 随机数的最小值
'fields.f_random.max'='1000', -- 随机数的最大值
'fields.f_random_str.length'='10' -- 随机字符串的长度
);
2.3 创建Sink端
此处sink的表信息需要和CDW ClickHouse中表信息完全对应。
代码语言:txt复制 -- ClickHouse Sink (不完全支持upsert,详见说明文档)
CREATE TABLE clickhouse_sink (
`id` INTEGER,
`name` STRING,
`sign` INTEGER--,
--PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
-- 指定数据库连接参数
'connector' = 'clickhouse',
'url' = 'clickhouse://172.28.28.165:8123',
-- 如果ClickHouse集群未配置账号密码可以不指定
--'username' = 'root',
--'password' = 'root',
'database-name' = 'testdb',
'table-name' = 'clickhouse_sink',
'table.collapsing.field' = 'sign' -- CollapsingMergeTree 类型列字段的名称
);
2.4 算子操作
此处只做了简单的数据插入,没有进行复杂计算。
代码语言:txt复制 insert into clickhouse_sink select f_sequence as id, f_random_str as name, 1 as sign from random_source;
3. 验证总结
查询clickhouse数据,数据是否插入成功。
代码语言:txt复制select * from testddb.clickhouse_sink;