MongoDB Change Stream之三——应用场景及实践

2021-02-24 16:14:50 浏览数 (1)

前两篇文章分别介绍了change streams的『入门知识』以及『内核源码分析』,不过作为业务方可能更加关心的是应该如何去利用change streams的能力。本文作为change streams系列文章的最后一篇,尝试分享一些实际的应用场景以及实践。

一、Change Streams的使用场景

1.1 CDC(Change Data Capture)

change streams从本质上来说是提供了一种基于mongoDB的CDC(Change Data Capture)的解决方案。所谓的CDC就是变化数据捕获,简单理解为监听数据库系统的变更就好。下面的图中描述了CDC的典型场景,左边的是主数据库,不同的客户端可以向其中插入数据(有前后关系);中间是一个队列,这些数据变化都会被放到里面;右边是派生数据系统,消费队列里的变化,然后用作搜索和数据仓库等应用。市场上也不乏这种专门做CDC的产品,比如:HEVO,其宣称的优势包括:1)简单易上手,无需代码;2)良好的交互式用户界面;3)支持多种数据源;4)可容错的安全架构等。

CDC arch.pngCDC arch.png
hevo.pnghevo.png

业界也有很多基于不同产品的CDC数据同步方案,比如IBM的InfoSphere、Flink SQL等,但大都是基于关系型数据库(SQL)生态的。也难怪mongoDB要针对自己家的生态做出这一套CDC解决方案。

ibm infoSphere.jpgibm infoSphere.jpg
Flink SQL.pngFlink SQL.png

那么change streams的使用场景自然也能覆盖CDC的场景,简单整理一下主要包括:

  • 数据迁移/数据同步
  • (微服务)变化监听
  • 实时分析/实时通知
  • 事件驱动架构组件
  • 交互系统
user cases.pnguser cases.png

接下来仅举几个比较常见的应用场景例子。

1.2 数据迁移/数据同步

利用change streams,我们可以以更优雅的方式实现之前通过tailing oplog实现的事情。数据迁移/数据同步可以分为两个部分,全量 增量。change streams主要是在增量同步阶段发挥作用。

change stream之数据迁移&同步.pngchange stream之数据迁移&同步.png

在change streams功能推出以前,我们也是可以实现数据迁移/数据同步的,主要是利用了mongoDB的操作日志——oplog以及其幂等性保证。我在之前的文章中将这两种方式做过对比,change streams在易用性、故障恢复、安全性、功能上都要优于tailing oplog,是一种更为优雅的数据迁移/同步解决方案。

有一个地方值得大家注意,之前通过tailing oplog的方式处理的是一条一条的oplog,因此在写入目标集群时需要自行实现过滤,而change streams可以直接在源集群以pipeline的方式过滤了,减少了中间的网络传输消耗;另外oplog在写入的时候是可以直接调用applyOps命令的,而change event是没办法直接应用到目标集群的,需要自行转换成对应的写入操作。考虑到change streams在其他方面的优势,这里的转换开销应该可以忽略不计。

有了change streams之后,原本需要依赖第三方工具实现的复杂多地多中心实时同步架构也就变得更加简单了,有效避免了引入第三方工具带来的额外维护/监控成本。当然业务方还是需要进行必要的转换以及故障恢复逻辑开发。

mongo shake多地多中心.pngmongo shake多地多中心.png

Q:在跨机房同步场景中如何避免环形复制?

A:主要有两个思路: 1)保证单向同步,如果change stream的数据流是单向的,也就不会存在环形复制; 2)如果双向建立同步,则需要额外的机制来判断拿到的change event到底是哪个集群产生的。这里可操作的细节比较多,比如可以让两个集群操作不同的db,集群A操作db1,集群B操作db2,而集群A操作db1经过B集群再回到A集群的db1操作,直接跳过即可。也可以通过在更新doc时带上能唯一标识集群的id,A集群产生的变更只会被B集群处理,B集群产生的变更只会被A集群处理,而从A集群经过B集群再回到A集群的变更,由于id也是A,直接跳过即可。以上方案同样可推广到超过3个集群互相同步的场景。

1.3 变化监听/事件驱动

利用change stream,我们可以监听我们感兴趣的变化,并以该变化事件作为驱动去触发其他下游的事件。听起来很像传统关系型数据库的触发器对不对?没错,在这个版块里change stream就是实现类似触发器的功能。官方在change stream功能上线时就给出过一个还不错的例子,这里带大家一起简单回顾下。

假设我们经营一家小型杂货铺,卖一些水果、零食之类的。我们想构建一个智能库存系统,在每次缺货的时候主动通知我们。换句话说,我们想监听库存变化的信息,并且在物品数量过少时候通知我们或者向采购商下订单。库存系统里的每种商品存储在数据库中的结构可以简化成下面这样:

代码语言:txt复制
{	_id: 123UAWERXHZK4GYH
	product: pineapple
	quantity: 3
}

不同的product在库存系统中都有相同的quantity字段用于展示商品的剩余量。然后我们要创建一个change stream监听。因为这里只有一个库存表,我们可以将change stream建立在表维度上来避免对其他库表操作的关注。然后由于我们并不是希望商品每次数量更新的时候都通知我们,而是库存中某个商品的数量不足时再通知,因此我们要创建针对性的change stream过滤条件。在下面的例子中,我们只关心那些更新后数量小于10的商品:

代码语言:txt复制
const changeStream = collection.watch(
  [{
    $match: {
      $and: [
        { "updateDescription.updatedFields.quantity": { $lte: 10 } },
        { operationType: "update" }
      ]
    }
  }],
  {
    fullDocument: "updateLookup"
  }
);

在接收到这个缺货的变更通知后,需要做什么就是业务方自定义的了,比如我们可以发送一个缺货的告警邮件、给采购人员发送短信、在下游的订单分析系统中为该商品增加热度、临时关闭该商品的售卖窗口等等。总而言之,一切在之前需要循环查询确认的事件都可以通过改成由change stream监听来触发,进而实现事件驱动模型。

你可以在github找到与本案例相关的示例代码。

1.4 实时通知

利用change streams,我们可以比较方便地实现一个小型系统中的的信息实时通知,而不用自己去定义复杂的通信协议。这里要举的例子是TCB(Tencent CloudBase),在TCB的云数据库中推出了实时推送的功能,其本质上就是对change streams的二次封装。

image-20201229172513731.pngimage-20201229172513731.png

基于实时推送功能,我们可以在小游戏和小程序中实现即时通信、状态同步等很酷的功能,比如聊天室、视频弹幕、信息流提示、棋牌类对局更新等等。

chess demochess demo

事实上,非小程序云开发场景下,也可以利用change streams的能力去完成类似的事情。

二、 在库表回档中的实践

2.1 库表回档简介

回档是数据库中非常常见的需求,一般回档是按集群维度回档,即将一个集群的数据回档到指定的时间点,为了避免对源集群的影响,回档数据将写入临时实例。但是在集群数据量比较大(数TB)的情况下,回档自身的性能瓶颈逐渐凸现出来,因此大部分云服务商也提供了库表回档的能力。所谓的库表回档就是只回档部分库表,通过减少回档需要处理的数据量来提升回档的性能,它的现实依据就是:业务的数据本身就是分库表存储的,一般发生数据错乱或者误操作的时候都只会影响部分库表/文档而并非整个集群(PS:删库跑路除外)。

与备份类似,回档也是分为两部分的:基于全量数据的回档 基于增量数据的回档(在mongoDB中指备份的oplog)。另外库表回档是回档到原集群的,为了避免对原表数据的覆盖,会将数据写入临时的库/表中,如下图所示。大致流程为:

  1. 创建临时表collection1_bak
  2. collection1的全量备份数据写入到临时表;
  3. 在增量备份的oplog中过滤对collection1的操作,并回放临时表中,这里回放时需要考虑namespace替换的问题;
  4. 回放到指定的回档时间时,整个库表回档结束;

另外,由于需要对原集群进行写入,为了降低对业务自身访问数据库进行读/写操作的影响,库表回档应该有相关的限流操作。

restore vs.pngrestore vs.png

2.2 遇到了什么问题?

当表存在着诸如create/rename等DDL操作时,在基于增量备份的回档中会导致回档出现问题。主要是两个:

  1. 在特定的回档时间点,用户库表的不可见问题
  2. 增量的oplog中存在复杂的DDL组合

不妨思考下面一种多次库表回档的场景:业务发现A表数据有问题,发起一次库表回档A-->A_bak,由于不想做业务侧变更(访问namespace改变),确认数据没问题后就删除了表A,然后将A_bak重命名为A继续使用,过了一段时间后又发现该表数据有问题,尝试发起第二次库表回档。其时间点和对应事件如下:

多次库表回档场景.png多次库表回档场景.png
  • T0时刻:最近的一次全量备份结束时间(图中以Snapshot表示);
  • T1时刻:发起一次库表回档A-->A_bak,此时会产生A_bak表的创建以及对A_bak的插入操作;
  • T2时刻:删除表A
  • T3时刻:将表A_bak重命名为A
  • T4时刻:第二次库表回档A->A_bak;
  • 另外,从T0到T4时间段都被增量备份完全覆盖。(图中以Incremental1Incremental2表示)

第一次回档时没有DDL操作因此没有问题,第二次回档由于需要回放从T0T4时间段的包含DDL的oplog,就不太好处理了。如果在回放时按照简单的替换策略db.A --> db.A_bak(原来对db.A的操作改写成对db.A_bak的操作),那么当T2时刻遇到drop A的操作被替换为drop A_bak后,最后回档出来的A_bak就完全丢失了T3之前所有对表A的操作,这是不可接受的。问题在于,在oplog回放的过程中我们遇到了drop原表的操作应该如何处理?

2.3 change streams实践

解决上述问题的方案之一就是利用change streams特性实现对集群所有DDL的监听并持久化到本地。

我们通过额外的方式对集群建立change streams监听流,并保证其自我恢复能力,将监听得到的DDL change events持久化。我们使用的聚合pipeline为:

代码语言:txt复制
pipeline := mongo.Pipeline{bson.D{
	{"$match", bson.D{
		{"$or",bson.A{
			bson.D{{"operationType", "drop"}},
			bson.D{{"operationType", "rename"}},
			bson.D{{"operationType", "dropDatabase"}},
		    bson.D{{"operationType", "create"}}, //官方版本暂不支持create事件
		}}},
}}}

然后在库表回档时首先通过change events列表寻找回档库表在指定的回档时间点的视图(表名的历史版本),并以它作为开始oplog回放的namesapce。然后每次遇到不同的DDL都要考虑是否需要更新我们的替换策略,如果需要的话则更新替换策略后再进行namesapce的替换。

不妨还是按上一章节举的例子来阐述:

change stream 库表回档.pngchange stream 库表回档.png

当用户发起到T4时刻的回档,我们首先需要知道在T4时刻该表的视图(是A),然后通过本地持久化的change events列表逆序查找该表的历史版本(是否由其他表rename而来),找到了在T3时刻的一个rename A_bak-->A的DDL事件,于是在从T0开始回放oplog时需要处理的处理的源namespace为db.A_bak,这正好与二次回档的目标表名A_bak相等,于是在替换规则中相当于没有发生替换。每次遇到DDL oplog的时候都需要判断其namespace是否处理,替换规则是否需要发生变更。在T1,T2时刻不需要处理,替换规则不变,在T3时刻遇到rename操作时,替换规则需要变更为db.A --> db.A_bak,这样在T3时刻后所有对A表的操作也都能被正常回放到新的临时表A_bak中了。

本方案还解决了前面提到了的库表不可见问题。在下面这样一种场景中,当选择的回档时间点不同时,其所能看到的库表视图也不一样,同一个表在T1时刻前是A,在(T1,T2)范围是B,在(T2,T3)范围是C,在T3时刻后是D。为了给用户提供良好的库表回档交互,当他选择任意的时间点都能看到当时的库表视图。

库表回档 多次rename.png库表回档 多次rename.png

实现方式为:1)首先在最近的全量备份中通过备份文件确认初始库表视图;2)然后以全量备份的时间T0在持久化的change events中进行遍历;3)如果发现了create/rename/drop等影响库表视图以及库表可见度的事件,就对视图进行相应的增/删/变更操作;4)直到遍历到用户指定的时间为止。如此一来就实现了『查看任意时间点数据库内库表视图』的能力,使得回档的交互更加友好和人性化。

遗憾的是,在3.6之前的版本由于不支持change streams,并不能享受到该方案带来的收益。

三、change streams优化

3.1 支持create/createIndex等更多DDL

腾讯云MongoDB目前已经率先在change streams中支持了包括create/createIndexes/dropIndexes等DDL操作。使得change streams的功能更加完善而全面。

支持的change event类型:

  • insert Event
  • update Event
  • replace Event
  • delete Event
  • drop Event
  • rename Event
  • dropDatabase Event
  • invalidate Event
  • create Event //新增,对应create collection操作
  • createIndexes Event //新增,对应create index操作
  • dropIndexes Event //新增,对应drop index操作
  • collMod Event //新增,对应collMod操作,比如修改TTL index和validator等
  • convertToCapped Event //新增,对应convertToCapped操作,将一个普通表转成capped collection

新增的change event类型一方面是为了满足我们实践中的需求,另一方面是为了完善change streams对DDL的支持。最后的几种DDL可能出现的频次比较低,但是覆盖到之后也能让我们第一时间感知到用户的不合理操作,便于排查数据库问题(比如由于创建索引未指定{background:true}而导致的数据库访问失败等等)。

未来我们也会考虑支持对config库的监听,毕竟应该有不少运维同事(JIRA)希望能看到集群什么时候发起了迁移,因为大量数据迁移可能会导致集群性能上不可避免的毛刺。

3.2 change streams性能优化

当前版本(包括官方推出的4.4)下,change streams的性能瓶颈主要有两个:

  1. oplog到change event的转换过程,涉及到BSON对象的序列化/反序列化,且不可避免。
  2. oplog的拉取过程,当源集群处于高并发写入的情况下,除了主从同步会受影响之外,change stream监听也会受影响,导致变更的实时性受到影响。

鉴于在我们的场景中,只需要关心所有的DDL操作,因此在oplogMatch阶段时完全不需要考虑那些一般的CURD操作。我们为change streams新增了一个{DDLOnly: true}选项,由于DDL本来就不属于高频操作,因此监听时源端只需要关注少量的DDL oplog操作,有效减少了后续阶段(如转换transform、检查合法性checkInvalidate等)需要处理的数据量,进而大幅提升change streams在关注DDL等类似场景中的性能。

3.3 官方优化

官方在JIRA上也提出了一些对change streams功能的优化思路:

  • 支持获取到update/replace/delete之前的文档视图
  • 多change streams共享stage以优化性能
  • 将部分用户自定义的stage下放到mongod去做
  • 支持对oplog被冲了但是事件很少的恢复

希望能在之后的新版本中看到这些优化,以及change streams被更广泛的使用在各种场景中。

四、总结

  • change streams提供了一种基于MongoDB的CDC解决方案。
  • change streams可以使用在诸多场景,包括数据迁移/同步、变化监听、实时通知、事件驱动等等。
  • 库表回档中可以配合change streams实现『查看任意时间点数据库内库表视图』的能力,优化回档的用户体验。

最后,欢迎体验和使用腾讯云MongoDB!

参考链接

  1. MongoDB Change Stream之一——上手及初体验
  2. MongoDB Change Stream之二——自顶向下流程剖析
  3. an-introduction-to-change-streams
  4. HEVO
  5. SERVER-36941、SERVER-46979、SERVER-48694、SERVER-35740

0 人点赞