导语:Change Stream是MongoDB自3.6版本就推出的功能,顾名思义,“变更流”可以对数据库建立一个监听(订阅)进程,一旦数据库发生变更,使用change stream的客户端都可以收到相应的通知。使用场景包括多个MongoDB集群之间的增量数据同步、高风险操作审计(删库删表)、将MongoDB的变更订阅到其他关联系统实现离线分析/计算等等。本文作为系列文章的第一篇,尝试简要介绍一下change stream以及实践。(未特殊说明,文中内容均基于MongoDB4.0.3版本)
一、什么是Change Stream?
Change Stream可以直译为"变更流",也就是说会将数据库中的所有变更以流式的方式呈现出来。用户可以很方便地对数据库建立一个监听(订阅)进程,一旦数据库发生变更,使用change stream的客户端都可以收到相应的通知。使用场景可以包括但不限于以下几种:
1)多个MongoDB集群之间的增量数据同步;
2)高风险操作的审计(删库删表);
3)将MongoDB的变更订阅到其他关联系统实现离线分析/计算等等;
以下是一些change stream的基本特征:
- change stream对于副本集和分片集群都可用。副本集时,可以在副本集中任意一个成员上建立监听流;分片集群时则只能在mongos上建立监听流。
- 使用条件:1)WT引擎;2)副本集协议为
pv1
;3)4.0及以前的版本,要求支持readConcern为“majority”
。 - 粒度可调整,可选择配置在单个表、单个库或者整个集群上。但是无法配置为
admin/local/config
库或者system.xxx
表。 - 4.0以后的版本可以指定
startAtOperationTime
来表示在某个特定的时间开始监听change Stream。但是要求给定的时间点必须在所选择节点的有效oplog时间范围中。
二、MongoDB Change Stream演进过程
v3.6版本:
- 初期版本,仅支持collection维度的订阅,仅支持insert/update/replace/delete4种事件;
- 支持故障恢复;支持update操作的全文档查看;
v4.0版本:
- 支持更粗粒度的change stream订阅,比如database/集群粒度;
- resumeToken的类型从
BinData
变为十六进制编码的字符串; - 新增了对drop/rename/dropDatabase事件的支持;
v4.2版本:
- 加入了更多pipeline管道操作符的支持,比如
$replaceWith
、$set
、$unset
; - 如果pipeline修改了一个change events的
_id
,将会报错; - 新增了
startAfter
选项,可以开始一个新的change stream监听,与之前的resumeAfter
互斥; - 使用change stream不再需要指定
read concern
为majority
;
v4.4版本(最新):暂时未见优化
三、Change Stream初体验
3.1 mongo shell
操作顺序:
1.第一个会话建立change stream
代码语言:txt复制// 指定为监听某个db的修改,最多阻塞等待1分钟;
// 如果是WT4.0版本,可选择监听整个集群或者coll维度,分别为db.getMongo().watch()和db.coll.watch()
>db.watch([],{maxAwaitTimeMS:60000})
注意:上述命令会阻塞整个会话,直到1分钟或者有相应的change event产生。如果不希望阻塞shell的话可以采用显示生成游标的方式: cursor = db.changestream.watch([],{maxAwaitTimeMS:60000})cursor.next()....
2.第二个会话做CURD操作
代码语言:txt复制>db.test.insert({a:2})
>db.test.update({a:2}, {$set:{"msg":"hello world"}})
>db.test.insert({a:1})
>db.test.remove({a:1})
// DDL:rename/drop/dropDatabase
>db.test.renameCollection("test1")
>db.test1.drop()
>db.dropDatabase()
3.回到第一个会话观察监听结果
代码语言:txt复制{ "_id" : { "_data" : "825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" }, "operationType" : "insert", "clusterTime" : Timestamp(1595239231, 2), "fullDocument" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83"), "a" : 2 }, "ns" : { "db" : "phoenix", "coll" : "test" }, "documentKey" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83") } }
{ "_id" : { "_data" : "825F156B5F0000000129295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" }, "operationType" : "update", "clusterTime" : Timestamp(1595239263, 1), "ns" : { "db" : "phoenix", "coll" : "test" }, "documentKey" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83") }, "updateDescription" : { "updatedFields" : { "msg" : "hello world" }, "removedFields" : [ ] } }
{ "_id" : { "_data" : "825F156B640000000129295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B640DE1FAAEF1B3DF840004" }, "operationType" : "insert", "clusterTime" : Timestamp(1595239268, 1), "fullDocument" : { "_id" : ObjectId("5f156b640de1faaef1b3df84"), "a" : 1 }, "ns" : { "db" : "phoenix", "coll" : "test" }, "documentKey" : { "_id" : ObjectId("5f156b640de1faaef1b3df84") } }
{ "_id" : { "_data" : "825F156B6B0000000129295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B640DE1FAAEF1B3DF840004" }, "operationType" : "delete", "clusterTime" : Timestamp(1595239275, 1), "ns" : { "db" : "phoenix", "coll" : "test" }, "documentKey" : { "_id" : ObjectId("5f156b640de1faaef1b3df84") } }
{ "_id" : { "_data" : "825F156B960000000129295A1004C982483732384D28AE57C6500C6018BF04" }, "operationType" : "rename", "clusterTime" : Timestamp(1595239318, 1), "ns" : { "db" : "phoenix", "coll" : "test" }, "to" : { "db" : "phoenix", "coll" : "test1" }, }
{ "_id" : { "_data" : "825F156BC60000000129295A1004C982483732384D28AE57C6500C6018BF04" }, "operationType" : "drop", "clusterTime" : Timestamp(1595239366, 1), "ns" : { "db" : "phoenix", "coll" : "test1" } }{ "_id" : { "_data" : "825F156BD200000001292904" }, "operationType" : "dropDatabase", "clusterTime" : Timestamp(1595239378, 1), "ns" : { "db" : "phoenix" } }
//由于使用的是db维度的watch,没法watch整个db的dropDatabase操作,因此被视为invalidate事件。
{ "_id" : { "_data" : "825F156BD200000001292904" }, "operationType" : "invalidate", "clusterTime" : Timestamp(1595239378, 1) }// 如果是WT3.6版本的话,rename执行完之后游标就中止了。这是因为在WT3.6的版本中还不支持rename事件。{ "_id" : { "_data" : BinData(0,"gl8NTwAAAAABFFoQBId ETlGUEjUgfPVNXJ4WSsE") }, "operationType" : "invalidate" }
// 如果继续执行"it"会提示如下信息,表示changeStream生成的cursor确实已经中止了。
no cursor
4.意外中止时的恢复
由于某些原因,我们还没消费已获取的change event,那么可以通过指定resumeAfter
来恢复对该changeStream的订阅。其中resumeAfter
就是change stream结果里的_id
字段(即resume token,唯一标志一个change stream流中的位置)
//我们指定第一条,就可以获得后面的两条
>db.changestream.watch([],{maxAwaitTimeMS:60000, resumeAfter:{"_data":"825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004"}})
结果:
3.2 mongo-driver
只有官方驱动才支持change stream,使用诸如mgo等旧的第三方驱动是无法使用的。详情可以查看自己所使用的驱动版本及README
文件。
这里以mongo-driver go版本为例,API使用非常简单:
代码语言:txt复制// Start Changestream Example 4
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
require.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current
// End Changestream Example 4
其他语言的版本可以参考open a change stream
四、Change Stream VS. Tailing Oplog
在Change Stream
功能出现以前,我们只能通过不断tailing oplog
的方式来拉取增量的oplog。两种方式的对比如下:
对比项 | Change Stream | Tailing Oplog |
---|---|---|
易用性 | 简单易用,API友好 | 使用门槛高,需要知道oplog的各种格式变化,而且在对oplog不断拉取过程中需要使用 |
故障恢复 | 简单,内核进行统一的进度管理,通过resumeToken API实现故障恢复 | 相对复杂,需要自行管理增量续传,故障时需要记录上一次拉取的oplog的ts字段并转换为下一次查询的过滤器 |
结果过滤 | 支持多个维度(集群/库/集合)以及在server端的pipeline过滤,减少网络传输 | 只能在拉取的client端过滤,而且过滤必须进行反序列化操作,带来一定的CPU和网络传输消耗 |
update返回全文档 | 支持,指定 | 不支持,对于 |
分片集群适配 | 直接在mongos发起change stream即可订阅整个集群维度的变更,并且是全局有序的 | 需要针对每个分片单独建立拉取进程,而且可能乱序 |
持久化 | 返回的每一个event都是已提交到大多数的,遇到主节点切换的场景也可以保证数据的持久化 | 无法保证oplog已提交到大多数节点 |
安全性 | 用户只能在已授权访问的db上订阅变更 | 需要有local库的读权限 |
性能 | 类似 | 类似 |
可以看到change stream
在各个方面都要优于tailing oplog
。但是注意到,副本集内节点间主从同步使用的依然是tailing oplog
的方式。为什么呢?各位可以思考一下,欢迎评论区留言。
五、待优化
遗憾的是,当前版本(以及官方4.4版本)的change stream功能依然存在一些待优化的地方,比如:
1.部分DDL操作不支持,比如create/createIndex/dropIndex/convertToCapped/collMod/emptycapped/
,无法覆盖到所有变更;(这也是最主要的问题)
其中
convertToCapped
会变成一个非预期的rename event,并且size字段会丢失。
2.如果将fullDocument
设置为"updateLookup"
时,会获取到已提交到大多数节点的已更新全文档版本,change stream中是通过update操作中的_id
来查找到文档当前内容。但是对同一文档短时间内频繁更新时,change stream收到的fullDocument
内容可能已经被后续的修改覆盖。换句话说,这里的fullDocument
中内容并不是point-in-time
的。
3.对于分片集群的change stream需要将订阅建立在mongos上,为了保证全局有序的变更流结果,从各个分片返回的结果需要在mongos侧按时间戳进行排序和聚合处理。一方面在写入量比较大的场景下,change stream的性能存在瓶颈;另一方面如果分片间写入不均(比如不合理的范围分区分片键导致写入都落到单个分片),会导致change stream的返回延迟大幅增加。
4.所有change stream的返回文档也受到 16MB的文档大小限制,考虑到指定了fullDocument
选项会将全文档内容包含在返回文档内,可能会导致变更流返回失败。当然,如果文档本身的大小已经接近16MB。对其的insert/update操作同样也会导致变更流返回失败。
5.分片集群的变更流会将用户自定义的pipeline阶段(比如$match
,$project
等)放在mongos上执行,导致mongos到mongod之间的网络流量较大。其实这里可以考虑将一部分pipeline操作符下放到mongod上去执行,减少mongos侧合并排序的压力,提升change stream的性能。
6.对事务的支持能力尚有欠缺,尽管change event里面有lsid
字段来标明所在的transaction,但并不知道某个事件是否为事务中的最后一个操作,也不知道该事务的提交状态。
7.不支持对config库的订阅。但是有部分开发者认为订阅分片集群的元数据状态也是一种合理的变更流使用场景,比如希望知道集群中何时触发了chunk split,何时触发了balancer等等。
至于为何官方没有实现对create/createIndex/dropIndex
等DDL的支持,猜测可能是最初设计时考虑欠缺(比如drop/rename/dropDatabase
等是4.0以版本后才支持的)
在JIRA上可以看到相关的提问:
mongodb create/delete index should trigger change stream
镜像stackoverflow提问:
mongodb create/delete index won't trigger change stream
官方在2月末回应将转给相关的开发团队,但截至笔者撰写此文时,仍然未见对这些DDL操作的支持。
六、总结
- Change Stream提供了简单而强大的订阅集群中修改的能力。
- 对部分DDL操作仍然不支持。
- 对分片集群的订阅在写入量很高的情况下,由于为了保证全局有序的排序聚合阶段的存在,性能可能存在瓶颈。
- Change Stream的性能相较于Tailing oplogs差不多。
- 同一个集群上开启不同维度的多个change stream,将不可避免地对源集群产生性能影响。
备注
MongoDB中的DDL
DDL的概念来自关系型数据库的核心语言——SQL(Structure Query Language)
。按照定义,它分为四类:
- 数据定义语言DDL
- 数据操纵语言DML
- 数据查询语言DQL
- 数据控制语言DCL
由于习惯的原因,在NoSQL数据库中也沿用了上面的说法,以DML和DDL为主,主要为了区分一般的insert/update/delete
和其他操作。
在MongoDB中,DDL包括以下几种(在oplog中,其"op"
字段为"c"
):
collMod
: 向集合添加选项或者修改视图定义,比如修改TTL、指定验证规则等create
: 创建集合createIndexes
: 创建索引convertToCapped
: 变为capped collectiondeleteIndex/deleteIndexes
: 删除索引dropIndex/dropIndexes
:删除索引drop
: 删除集合dropDatabase
: 删除数据库emptycapped
: 清空一个capped collectionrenameCollection
: 集合重命名applyOps
: 用于回放oplog
Change Events解析
从Change Streams中能监听到的变更事件,具体字段信息和含义请参考change events。
代码语言:txt复制{
_id : { // 存储元信息
"_data" : <BinData|hex string> // resumeToken,用于断点恢复
},
"operationType" : "<operation>", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate,部分仅支持4.0后的版本,详情见下
"fullDocument" : { <document> }, // 修改后的数据,出现在insert, replace, delete, update的事件中
"ns" : { // namespace
"db" : "<database>",
"coll" : "<collection"
},
"to" : { // 只在operationType为rename的时候有效,表示改名以后的namespace
"db" : "<database>",
"coll" : "<collection"
},
"documentKey" : { "_id" : <value> }, // 相当于o2字段。出现在insert, replace, delete, update事件中。正常只包含_id,对于sharded collection,还包括shard key
"updateDescription" : { // 只在operationType为update的时候出现,相当于是增量的修改,而replace是替换
"updatedFields" : { <document> }, // 更新的field的值
"removedFields" : [ "<field>", ... ] // 删除的field列表
},
"clusterTime" : <Timestamp>, // 相当于ts字段
"txnNumber" : <NumberLong>, // 相当于oplog里面的txnNumber,只在事务里面出现。事务号在一个事务里面单调递增
"lsid" : { // 相当于lsid字段,只在事务里面出现。logic session id,请求所在的session的id
"id" : <UUID>,
"uid" : <BinData>
}
}
截止4.4版本,change events主要分为以下几种类型(也就是上面operationType
的取值范围):
- insert 事件
- update 事件
- replace 事件
- delete 事件
- drop 事件
- rename 事件
- dropDatabase 事件
- invalidate 事件
invalidate事件发生时将会关闭change stream的游标。需要使用resumeAfter
选项从invalidate事件之前的change events来恢复变更流。
Change Stream性能
根据下面这个jira SERVER-46979中官方的回复:
$changeStream
的原始读取速率(不可避免地)比对oplog的简单查询要慢。其中oplog的$match
和转换阶段是主要瓶颈,相较于基本查询而言,CPU消耗更高。但是在典型操作条件和写入工作负载下对$change stream
和oplog-Tailing
进行的测试中,两者的性能相似。
Change Stream
目前是串行执行的,即对每一个变更流只有一个线程来执行oplog的获取、过滤和转换工作。未来也不会考虑对每个单独的变更流进行多线程处理,而是考虑将多个不同的变更流工作分解并复用。