作者:腾讯云流计算 Oceanus 团队
流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将为您详细介绍如何取 MySQL 数据,经过流计算 Oceanus 实时计算引擎分析,输出数据到日志(Logger Sink)当中。
前置准备
创建 流计算 Oceanus 集群
进入流计算 Oceanus 控制台(https://console.cloud.tencent.com/oceanus/overview),点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档创建独享集群(https://cloud.tencent.com/document/product/849/48298)。
创建 Mysql 实例
进入MySQL 控制台(https://console.cloud.tencent.com/cdb),点击【新建】。具体可参考官方文档创建 MySQL 实例(https://cloud.tencent.com/document/product/236/46433)。然后在【数据库管理】> 【参数设置】中设置参数 binlog_row_image=FULL,便于使用 CDC(Capture Data Change)特性,实现数据的变更实时捕获。
!创建流计算 Oceanus 集群和 MySQL 实例时所选 VPC 必须是同一 VPC。
流计算 Oceanus 作业
1. 创建 Source
代码语言:javascript复制CREATE TABLE `MySQLSourceTable` (
`id` INT,
`name` VARCHAR,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'mysql-cdc', -- 必须为 'mysql-cdc'
'hostname' = '10.0.0.158', -- 数据库的 IP
'port' = '3306', -- 数据库的访问端口
'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)
'password' = 'xxxxxxxxxx', -- 数据库访问的密码
'database-name' = 'testdb', -- 需要同步的数据库
'table-name' = 'student' -- 需要同步的数据表名
);
2. 创建 Sink
代码语言:javascript复制CREATE TABLE CustomSink (
id INT,
name VARCHAR
) WITH (
'connector' = 'logger',
'print-identifier' = 'DebugData'
);
3. 编写业务 SQL
代码语言:javascript复制INSERT INTO CustomSink
SELECT * FROM MySQLSourceTable;
4. 运行作业
点击【保存】>【发布草稿】运行作业。查看 Flink UI Taskmanger 日志,观察全量数据是否正常打印到日志。
5. 验证 MySQL-CDC 特性
在 MySQL 中新增一条数据,然后在 Flink UI Taskmanger 日志中观察结果,观察新增的数据是否正常打印到日志。
在 MySQL 中修改和删除记录同样会更新到 Logger Sink中,并打印输出。
总结
1、Mysql CDC 支持对 MySQL 数据库的全量和增量读取,并保证 Exactly Once 语义。MySQL CDC 底层使用了 Debezium 来做 CDC(Change Data Capture),其工作特性可参考数据库 MySQL CDC(https://cloud.tencent.com/document/product/849/52698)。
2、输入到 Logger Sink 的数据, 会通过日志打印出来,便于调试。Logger Jar 包下载地址:https://cloud.tencent.com/document/product/849/58713
点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~
腾讯云大数据
长按二维码 关注我们