Oceanus实践-消费 CMQ 主题模型数据源

2021-08-06 20:49:17 浏览数 (1)

实时即未来,最近在腾讯云Oceanus进行Flink实时计算服务,以下为flink消费腾讯云CMQ数据实践。原文自Raigor,已获得授权,分享给大家~

Oceanus Flink CMQ connector 支持队列模型的数据源表和目的表,暂时不支持主题模型数据源表和目的表。CMQ 主题订阅可以实时同步主题模型数据到队列模型,借助这种机制,我们可以在 Oceanus 实现 CMQ 主题模型数据源表的读取。

1. 环境搭建

1.1 创建 Oceanus 集群

在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。

若之前未使用过VPC,日志,存储这些组件,需要先进行创建。

创建完后的集群如下:

Oceanus 集群Oceanus 集群

1.2 新建 CMQ 主题

在 CMQ 控制台的【主题订阅】-> 【新建】主题,输入主题名,其他保持默认值即可。新建的主题如下:

CMQ 主题

1.3 新建 CMQ 队列

在 CMQ 控制台的【队列】-> 【新建】主题,输入队列名称、消息生命周期、堆积消息数量上限,其他保持默认值即可。我们这里新建两个队列,其中一个用来订阅 CMQ 主题模型数据,另一个用作 Oceanus 作业的目的表。新建的主题如下:

CMQ 队列CMQ 队列

1.4 新建 CMQ 主题订阅

在 CMQ 主题列表页,点击主题操作列的【订阅】链接,进入【订阅者】列表,新建订阅,输入订阅名,终端类型选择 Queue 队列服务,订阅地址选择cs2的队列,其他保持默认值。新建的订阅者如下:

CMQ 主题订阅CMQ 主题订阅

2. 作业创建

2.1 创建 SQL 作业

在 Oceanus 控制台【作业管理】->【新建作业】-> SQL作业,选择刚刚新建的集群创建作业。然后在作业的【开发调试】->【作业参数】里面添加必要的connector cmq-1.1.1。

SQL 作业SQL 作业

2.2 创建数据源表和目的表

在作业的【开发调试】->【插入模板】选择 CMQ 读取 & 写入的模板,并添加。修改参数queue、secret-id、secret-key。

注意:强烈建议使用具有最小权限的secret-id和secret-key,并注意保密,防止泄漏带来的安全风险。

CMQ 读取 & 写入

代码语言:javascript复制
CREATE TABLE `CMQSourceTable` (
    `id` bigint,
    `request_method` varchar(80),
    `response` varchar(80),
    PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
	'connector' = 'cmq', 											--必须为 'cmq'
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',	--cmq所在地域的nameServer
    'queue' = 'cs2',											--cmq的队列名
    'secret-id' = 'Your SecretId',			                                --账号secretId
    'secret-key' = 'Your SecretKey',                                          --账号secretKey
    'sign-method' = 'HmacSHA1',                                     --签名的方式
    'format' = 'csv',                                               --定义数据格式(JSON 格式)
    'batch-size' = '16',                                            --批量消费消息的个数/批量发送消息的个数
    'request-timeout' = '5000ms',                                   --请求的超时时间
    'polling-wait-timeout'= '10s',                                  --source参数; 获取不到数据情况下的等待时间
    'key-alive-timeout'= '5min',                                    --source参数;含primary key的消息,CMQ去重的有效时间
    'retry-times' = '3',                                            --sink参数;发送消息的重试次数
    'max-block-timeout' = '0s'                                      --sink参数;批量发送数据的最大等待时间
);

CREATE TABLE `CMQSinkTable` (
    `id` bigint,
    `request_method` varchar(80),
    `response` varchar(80),
    PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
	'connector' = 'cmq', 											--必须为 'cmq'
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',	--cmq所在地域的nameServer
    'queue' = 'sink_queue',											--cmq的队列名
    'secret-id' = 'Your SecretId',			                                --账号secretId
    'secret-key' = 'Your SecretKey',                                          --账号secretKey
    'sign-method' = 'HmacSHA1',                                     --签名的方式
    'format' = 'csv',                                               --定义数据格式(JSON 格式)
    'batch-size' = '16',                                            --批量消费消息的个数/批量发送消息的个数
    'request-timeout' = '5000ms',                                   --请求的超时时间
    'polling-wait-timeout'= '10s',                                  --source参数; 获取不到数据情况下的等待时间
    'key-alive-timeout'= '5min',                                    --source参数;含primary key的消息,CMQ去重的有效时间
    'retry-times' = '3',                                            --sink参数;发送消息的重试次数
    'max-block-timeout' = '0s'                                      --sink参数;批量发送数据的最大等待时间
);

insert into CMQSinkTable select * from CMQSourceTable;

2.3 算子操作

这里只做最简单的数据插入。

代码语言:javascript复制
insert into CMQSinkTable select *from CMQSourceTable;

3. 验证总结

在 CMQ 控制台往名为test的主题中发送消息,可在sink_queue的队列中接收到消息。

发送主题消息发送主题消息
接收队列消息接收队列消息

原文链接:https://cloud.tencent.com/developer/article/1857665

0 人点赞