1. CDC概述
CDC全称是Change Data Capture,我们通常将能够捕获数据变更的技术称为CDC。目前通常描述的CDC技术主要面向数据库的变更,是一种用于捕获数据库中数据的变更技术。CDC的技术应用场景有数据同步、数据分发、数据集成等。
2. Debezium介绍
3. Flink SQL CDC原理介绍
Flink SQL CDC内置了Debezium引擎驱动相关Debezium source connector,利用其抽取日志获取变更的能力,将Debezium引擎获取的对应的数据库变更数据(SourceRecord)转换为Flink SQL认识的RowData数据,发送给下游,于是Flink提供了一种Changelog Json format。
Flink提供的Changelog Json format我们可以简单的理解为Flink对进来的RowData数据进行了一层包装,然后增加了一个操作类型。例如,原始数据格式是这样的:
代码语言:javascript复制{
"id": 1004,
"name": "Anne"
}
经过Changlog格式的加工之后,就会变为下面的格式:
代码语言:javascript复制{
"data": {
"id": 1004,
"name": "Anne"
},
"op": " I"
}
4. Flink connector mongodb cdc原理
利用Debezium Embeded Engine驱动MongoDB Kafka Connector。MongoDB Kafka Connector是MongoDB官方提供的一个Kafka Connector实现,通过订阅ChangeStreamEvent来实现变更数据订阅。
4.1 Change Stream & Tailing oplog
MongoDB在3.6以前只能通过不断tailing oplog的方式来拉取增量的oplog获取CDC数据,手动设置过滤条件,自己管理断点续传等问题。MongoDB从3.6版本开始推出了Change Stream的功能,提供实时的增量数据流功能。在使用watch开始监听整个数据库/collection之后,一旦有符合条件的变更,Change Stream将会推送出一条event代表一次变更(插入/删除/修改)。每个Change Stream Event都包括一个ResumeToken用于断点续传。
对比项 | Change Stream | Tailing Oplog |
---|---|---|
易用性 | 简单易用, API友好 | 使用门槛高,需要知道oplog的各种格式变化 |
故障恢复 | 简单,内核进行统一的进度管理,通过resumeToken实现故障恢复 | 相对复杂,需要自行管理增量续传,故障时需要记录上次拉去的oplog的ts字段转换为下一次的查询过滤器 |
update事件 | 支持返回全文档,指定fullDocument即可 | 不支持返回全文档, 对于update操作需要根据oplog中的_id再次查询得到全文档 |
分片集群适配 | 直接发起change stream即可订阅整个集群,并且是全局有序的 | 需要针对每个分片单独建立拉取进程 |
持久化 | 返回的每个event都是已提交到大多数节点的,遇到主从切换的场景也可以保证数据的持久化 | 无法保证oplog已提交到大多数节点 |
安全性 | 用户只能在已授权访问的db上订阅变更 | 需要local库的读权限 |
4.2 MongoDB Kafka Connector
Debezium Connector for MongoDB就是基于oplog的方式实现的。MongoDB的oplog中UPDATE事件并没有保留变更之前的数据状态,仅保留了变更字段的信息,无法将MongoDB变更记录转换成Flink标准的变更流( I -U U -D)。只能将其转换为Upsert流( I U -D),经过一次ChangelogNormalize转换成标准的变更流。Update After的变更记录需要变更后完整的RowData,而Debezium原生Connector采用dump oplog的方式,并不能很好支持。
MongoDB官方提供的 Kafka Connector采用ChangeStreamEvent的订阅方式,可以开启FullDocument配置,采集该行记录的最新的完整信息。
如果配置MongoDB Kafka connector的copy-existing=true则会启动MongoSoureTask复制库中原有数据(在Debezium中称之为数据库SnapShot阶段):
代码语言:javascript复制 * <ol>
* <li>Get the latest resumeToken from MongoDB
* <li>Create insert events for all configured namespaces using multiple threads. This step is
* completed only after <em>all</em> collections are successfully copied.
* <li>Start a change stream cursor from the saved resumeToken
* </ol>
如果在复制期间对数据有更改,会在数据复制完成后应用更改。数据拷贝与虽有的数据可能有重复时间,因为在拷贝期间,客户端可能会对mongodb中的数据进行修改,但是因为数据更改时间流是幂等的,所以可以保证一致性。
如果没有配置copy-existing=true则只会watch到任务启动开始之后数据库的Change Event。
4.3 MongoSourceTask代理(MongoDBConnectorSourceTask)
由于DebeziumSourceFunction实现了CheckpointedFunction,因此每隔一段时间就会进行Checkpoint,从而保证Flink任务的Extractly Once语义,然而如果处于数据库Snapshot阶段,一般是没有offset可以用来checkpoint的,所以此阶段需要阻止Flink的检查点(Checkpoint)生成。
那么我们如何知道数据库Sanpshot阶段已经结束,因此用MongoDBConnectorSourceTask代理了之前的MongoSourceTask,在poll方法中会暂时保留最后一条SnapshotRecord,通过设置SnapshotRecord字段为Last来标记Snapshot阶段结束的。
在以下两种场景会退出Snapshot阶段:
- 收到Change event(non-snapshot record)
- 没收到Change event且被代理的MongoSourceTask.isCopying标志为false