实时即未来,最近在腾讯云流计算 Oceanus 进行Flink实时计算服务,以下为 Flink 消费腾讯云 CMQ 的数据实践。原文自Raigor,已获得授权,分享给大家~
Oceanus Flink CMQ connector 支持队列模型的数据源表和目的表,暂时不支持主题模型数据源表和目的表。CMQ 主题订阅可以实时同步主题模型数据到队列模型,借助这种机制,我们可以在 Oceanus 实现 CMQ 主题模型数据源表的读取。
1. 环境搭建
1.1 创建 Oceanus 集群
在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。
若之前未使用过VPC,日志,存储这些组件,需要先进行创建。
创建完后的集群如下:
1.2 新建 CMQ 主题
在 CMQ 控制台的【主题订阅】-> 【新建】主题,输入主题名,其他保持默认值即可。新建的主题如下:
1.3 新建 CMQ 队列
在 CMQ 控制台的【队列】-> 【新建】主题,输入队列名称、消息生命周期、堆积消息数量上限,其他保持默认值即可。我们这里新建两个队列,其中一个用来订阅 CMQ 主题模型数据,另一个用作 Oceanus 作业的目的表。新建的主题如下:
1.4 新建 CMQ 主题订阅
在 CMQ 主题列表页,点击主题操作列的【订阅】链接,进入【订阅者】列表,新建订阅,输入订阅名,终端类型选择 Queue 队列服务,订阅地址选择cs2的队列,其他保持默认值。新建的订阅者如下:
2. 作业创建
2.1 创建 SQL 作业
在 Oceanus 控制台【作业管理】->【新建作业】-> SQL作业,选择刚刚新建的集群创建作业。然后在作业的【开发调试】->【作业参数】里面添加必要的connector cmq-1.1.1。
2.2 创建数据源表和目的表
在作业的【开发调试】->【插入模板】选择 CMQ 读取 & 写入的模板,并添加。修改参数queue、secret-id、secret-key。
注意:强烈建议使用具有最小权限的secret-id和secret-key,并注意保密,防止泄漏带来的安全风险。
代码语言:txt复制CREATE TABLE `CMQSourceTable` (
`id` bigint,
`request_method` varchar(80),
`response` varchar(80),
PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
'connector' = 'cmq', --必须为 'cmq'
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', --cmq所在地域的nameServer
'queue' = 'cs2', --cmq的队列名
'secret-id' = 'Your SecretId', --账号secretId
'secret-key' = 'Your SecretKey', --账号secretKey
'sign-method' = 'HmacSHA1', --签名的方式
'format' = 'csv', --定义数据格式(JSON 格式)
'batch-size' = '16', --批量消费消息的个数/批量发送消息的个数
'request-timeout' = '5000ms', --请求的超时时间
'polling-wait-timeout'= '10s', --source参数; 获取不到数据情况下的等待时间
'key-alive-timeout'= '5min', --source参数;含primary key的消息,CMQ去重的有效时间
'retry-times' = '3', --sink参数;发送消息的重试次数
'max-block-timeout' = '0s' --sink参数;批量发送数据的最大等待时间
);
CREATE TABLE `CMQSinkTable` (
`id` bigint,
`request_method` varchar(80),
`response` varchar(80),
PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
'connector' = 'cmq', --必须为 'cmq'
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', --cmq所在地域的nameServer
'queue' = 'sink_queue', --cmq的队列名
'secret-id' = 'Your SecretId', --账号secretId
'secret-key' = 'Your SecretKey', --账号secretKey
'sign-method' = 'HmacSHA1', --签名的方式
'format' = 'csv', --定义数据格式(JSON 格式)
'batch-size' = '16', --批量消费消息的个数/批量发送消息的个数
'request-timeout' = '5000ms', --请求的超时时间
'polling-wait-timeout'= '10s', --source参数; 获取不到数据情况下的等待时间
'key-alive-timeout'= '5min', --source参数;含primary key的消息,CMQ去重的有效时间
'retry-times' = '3', --sink参数;发送消息的重试次数
'max-block-timeout' = '0s' --sink参数;批量发送数据的最大等待时间
);
insert into CMQSinkTable select * from CMQSourceTable;
2.3 算子操作
这里只做最简单的数据插入。
代码语言:txt复制insert into CMQSinkTable select *from CMQSourceTable;
3. 验证总结
在 CMQ 控制台往名为test的主题中发送消息,可在sink_queue的队列中接收到消息。