Flink Connector MongoDB CDC实现原理

2021-08-27 09:18:50 浏览数 (1)

1. CDC概述

CDC全称是Change Data Capture,我们通常将能够捕获数据变更的技术称为CDC。目前通常描述的CDC技术主要面向数据库的变更,是一种用于捕获数据库中数据的变更技术。CDC的技术应用场景有数据同步、数据分发、数据集成等。

2. Debezium介绍

Debezium架构Debezium架构

3. Flink SQL CDC原理介绍

Flink SQL CDC内置了Debezium引擎驱动相关Debezium source connector,利用其抽取日志获取变更的能力,将Debezium引擎获取的对应的数据库变更数据(SourceRecord)转换为Flink SQL认识的RowData数据,发送给下游,于是Flink提供了一种Changelog Json format。

Changelog StreamChangelog Stream

Flink提供的Changelog Json format我们可以简单的理解为Flink对进来的RowData数据进行了一层包装,然后增加了一个操作类型。例如,原始数据格式是这样的:

代码语言:javascript复制
{
     "id": 1004,
     "name": "Anne"
}

经过Changlog格式的加工之后,就会变为下面的格式:

代码语言:javascript复制
{
     "data": {
          "id": 1004,
          "name": "Anne"
     },
     "op": " I"
}

4. Flink connector mongodb cdc原理

利用Debezium Embeded Engine驱动MongoDB Kafka Connector。MongoDB Kafka Connector是MongoDB官方提供的一个Kafka Connector实现,通过订阅ChangeStreamEvent来实现变更数据订阅。

4.1 Change Stream & Tailing oplog

MongoDB在3.6以前只能通过不断tailing oplog的方式来拉取增量的oplog获取CDC数据,手动设置过滤条件,自己管理断点续传等问题。MongoDB从3.6版本开始推出了Change Stream的功能,提供实时的增量数据流功能。在使用watch开始监听整个数据库/collection之后,一旦有符合条件的变更,Change Stream将会推送出一条event代表一次变更(插入/删除/修改)。每个Change Stream Event都包括一个ResumeToken用于断点续传。

对比项

Change Stream

Tailing Oplog

易用性

简单易用, API友好

使用门槛高,需要知道oplog的各种格式变化

故障恢复

简单,内核进行统一的进度管理,通过resumeToken实现故障恢复

相对复杂,需要自行管理增量续传,故障时需要记录上次拉去的oplog的ts字段转换为下一次的查询过滤器

update事件

支持返回全文档,指定fullDocument即可

不支持返回全文档, 对于update操作需要根据oplog中的_id再次查询得到全文档

分片集群适配

直接发起change stream即可订阅整个集群,并且是全局有序的

需要针对每个分片单独建立拉取进程

持久化

返回的每个event都是已提交到大多数节点的,遇到主从切换的场景也可以保证数据的持久化

无法保证oplog已提交到大多数节点

安全性

用户只能在已授权访问的db上订阅变更

需要local库的读权限

4.2 MongoDB Kafka Connector

Debezium Connector for MongoDB就是基于oplog的方式实现的。MongoDB的oplog中UPDATE事件并没有保留变更之前的数据状态,仅保留了变更字段的信息,无法将MongoDB变更记录转换成Flink标准的变更流( I -U U -D)。只能将其转换为Upsert流( I U -D),经过一次ChangelogNormalize转换成标准的变更流。Update After的变更记录需要变更后完整的RowData,而Debezium原生Connector采用dump oplog的方式,并不能很好支持。

MongoDB官方提供的 Kafka Connector采用ChangeStreamEvent的订阅方式,可以开启FullDocument配置,采集该行记录的最新的完整信息。

如果配置MongoDB Kafka connector的copy-existing=true则会启动MongoSoureTask复制库中原有数据(在Debezium中称之为数据库SnapShot阶段):

代码语言:javascript复制
 * <ol>
 *   <li>Get the latest resumeToken from MongoDB
 *   <li>Create insert events for all configured namespaces using multiple threads. This step is
 *       completed only after <em>all</em> collections are successfully copied.
 *   <li>Start a change stream cursor from the saved resumeToken
 * </ol>

如果在复制期间对数据有更改,会在数据复制完成后应用更改。数据拷贝与虽有的数据可能有重复时间,因为在拷贝期间,客户端可能会对mongodb中的数据进行修改,但是因为数据更改时间流是幂等的,所以可以保证一致性。

如果没有配置copy-existing=true则只会watch到任务启动开始之后数据库的Change Event。

4.3 MongoSourceTask代理(MongoDBConnectorSourceTask)

由于DebeziumSourceFunction实现了CheckpointedFunction,因此每隔一段时间就会进行Checkpoint,从而保证Flink任务的Extractly Once语义,然而如果处于数据库Snapshot阶段,一般是没有offset可以用来checkpoint的,所以此阶段需要阻止Flink的检查点(Checkpoint)生成。

那么我们如何知道数据库Sanpshot阶段已经结束,因此用MongoDBConnectorSourceTask代理了之前的MongoSourceTask,在poll方法中会暂时保留最后一条SnapshotRecord,通过设置SnapshotRecord字段为Last来标记Snapshot阶段结束的。

在以下两种场景会退出Snapshot阶段:

  1. 收到Change event(non-snapshot record)
  2. 没收到Change event且被代理的MongoSourceTask.isCopying标志为false

0 人点赞