实时即未来,最近在腾讯云Oceanus进行实时计算服务,以下为mysql到flink到ES实践。分享给大家~
1. 环境搭建
1.1 创建Oceanus集群
在Oceanus控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。
若之前未使用过VPC,日志,存储这些组件,需要先进行创建。
VPC及子网需要和下面的Mysql、ES集群使用同一个,否则需要手动打通(如对等连接)。
创建完后的集群如下:
1.2 创建Mysql集群
在腾讯云主页【产品】->【数据库】->【云数据库 MySQL】页面购买Mysql集群。
在MySQL控制台找到创建的MySQL集群,在【数据库管理】->【参数设置】页面修改如下参数:
代码语言:txt复制 binlog_row_image=FULL
1.3 在mysql数据库中创建表:
执行如下sql,或通过可视化页面创建表。
代码语言:txt复制 -- 以学生成绩表为例
CREATE TABLE `cdc_source4es` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '学号',
`score` int(11) NOT NULL COMMENT '分数',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8 COMMENT='create for student score'
1.4 创建Elastic Search集群
在腾讯云主页【产品】->【大数据】->【ElasticSearch】页面购买ES集群,这里为了简单,选择了与Oceanus同一个地域,同可用区。网络选择也选择与上面同样的VPC。
本次创建了1个ES6版本的集群,通过ES控制台查看,创建完后的集群如下:
创建之后可通过Kibana查看ES集群信息。如在Dev Tools面板上执行如下命令:
代码语言:txt复制# 查看集群节点
GET _cat/nodes
代码语言:txt复制# 返回节点信息则为正常
172.28.1.1 43 99 1 0.06 0.06 0.12 dilm - 1627027760001130832
172.28.1.2 65 99 3 0.03 0.12 0.13 dilm - 1627027760001130732
172.28.1.3 29 99 3 0.08 0.08 0.12 dilm * 1627027760001130632
注:ES中无需提前创建类似表的实体。
至此,环境准备完毕。
2. 作业创建
2.1 创建SQL作业
在Oceanus控制台【作业管理】->【新建作业】-> SQL作业,选择刚刚新建的集群创建作业。然后在作业的【开发调试】->【作业参数】里面添加必要的connector,如mysql-cdc connector、elasticsearch6/7 connector。
注:es connector版本要与购买的ES组件版本一致。
2.2 创建Source端
此处选择mysql作为数据源,并将后续的数据持续更新到ES中。
代码语言:txt复制-- mysql-cdc connector
CREATE TABLE `mysql_source` (
`id` int,
`score` int,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'mysql-cdc', -- 必须为 'mysql-cdc'
'hostname' = '172.28.28.213', -- 数据库的 IP
'port' = '3306', -- 数据库的访问端口
'username' = 'youruser', -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)
'password' = 'yourpassword', -- 数据库访问的密码
'database-name' = 'test', -- 需要同步的数据库
'table-name' = 'cdc_source4es' -- 需要同步的数据表名
);
2.3 创建Sink端
此处sink无需在ES集群中提前做初始化,可直接写入数据。
代码语言:txt复制-- 注意! 如果您启用了 Elasticsearch 的用户名密码鉴权功能, 目前只能使用 Flink 1.10 的旧语法。若无需鉴权, 则可以使用 Flink 1.11 的新语法。
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
CREATE TABLE es_old_sink (
`id` INT,
`score` INT
) WITH (
'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch
'connector.version' = '6', -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 注意务必要和所选的内置 Connector 版本一致
'connector.hosts' = 'http://172.28.1.175:9200', -- Elasticsearch 的连接地址
'connector.index' = 'connector-test-index', -- Elasticsearch 的 Index 名
'connector.document-type' = '_doc', -- Elasticsearch 的 Document 类型
'connector.username' = 'elastic', -- 可选参数: Elasticsearch 用户名
'connector.password' = 'yourpassword', -- 可选参数: Elasticsearch 密码
'update-mode' = 'upsert', -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式
'connector.key-delimiter' = '$', -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
'connector.key-null-literal' = 'n/a', -- 主键为 null 时的替代字符串,默认是 'null'
'connector.failure-handler' = 'retry-rejected', -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)
'connector.flush-on-checkpoint' = 'true', -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
'connector.bulk-flush.max-actions' = '42', -- 可选参数, 每批次最多的条数
'connector.bulk-flush.max-size' = '42 mb', -- 可选参数, 每批次的累计最大大小 (只支持 mb)
'connector.bulk-flush.interval' = '60000', -- 可选参数, 批量写入的间隔 (ms)
'connector.connection-max-retry-timeout' = '1000', -- 每次请求的最大超时时间 (ms)
--'connector.connection-path-prefix' = '/v1' -- 可选字段, 每次请求时附加的路径前缀
'format.type' = 'json' -- 输出数据格式, 目前只支持 'json'
);
2.4 算子操作
此处只做了简单的数据插入,没有进行复杂计算。
代码语言:txt复制INSERT INTO es_old_sink select id, score from mysql_source;
3. 验证总结
在Kibana的Dev Tools中查询ES中的数据,数据是否插入成功。
代码语言:txt复制# 查询该索引下所有的数据
GET connector-test-index/_search