上一篇文章介绍了『change stream的上手及初体验』,相信大家都对之已经有了基本的了解。接下来,本篇文章尝试深入MongoDB的内核源码,来看看其内部原理以及实现细节。为了帮助大家更好地切入,采用了自顶向下(也就是从客户端-->驱动-->服务端)的方式来梳理整个流程。希望能对大家更深入了解MongoDB change stream功能有一定帮助。(未特殊说明,文中内容均基于MongoDB4.0版本代码)
一、原理&自顶向下流程
自顶向下流程的整体时序图如下:
事实上,所有的query基本也是这样一个流程,只是不同的命令会获得不同类型的cursor罢了。这里如果暂时不好理解的话,不妨把第一章内容浏览完再回过头来看看。
1.1 当我们使用change stream,得到了什么?
一个类似复制集协议中主从同步逻辑的,挂在节点**local.oplog.rs
**表上的tailable cursor。
Change Stream本质上是聚合命令中的一个特殊管道阶段(pipeline stage),由于它需要常驻在集群的节点上,因此会以tailable cursor
的形式出现。
什么是**tailable cursor
**?
按照官方的定义,它在概念上类似于Unix操作系统中提供的tail -f
命令,即当一个游标到达结果集的末尾之后,它也不会立即关闭,而是将继续等待新的数据产生,并在等到的时候将之返回。
注1:在change Stream功能出现以前,开发者想要实时感知MongoDB数据库的变化只能通过tailing oplog的方式,其实也是使用的
tailable cursor
。 注2:副本集内的主从同步也是用的这个,细节请参考之前的文章。
tailable cursor
不会使用到索引,因此建立tailable cursor
时的初始化扫描比较耗时。事实上,对于oplog这个capped collection
而言,如果某个时刻有较多的oplog产生,tailable cursor
也会因为需要逐条扫描而导致性能不佳。
要想确认change Stream的本质也很简单,在已经进行过变更流监听的节点上执行db.currentOp()
,然后就可以看到类似下面这样的一个command(仅截取部分字段,关键字段已注释):
{
"host" : "TENCENT64.site:7029",
...
"desc" : "conn14235866",
"connectionId" : 14235866, //该操作所在的链接id
"op" : "getmore", // 操作类型
"ns" : "admin.$cmd", // 指定的namespace,这里是集群维度的watch,因此为'admin.$cmd';如果是db维度的watch,则为'db.$cmd';collection维度的watch则为'db.collection'
"command" : { // 描述命令的基本信息
"getMore" : NumberLong("6717122891978962123"),
"collection" : "$cmd.aggregate", // 只有collection维度的watch才会显示表名
"$clusterTime" : { //逻辑时间戳
"clusterTime" : Timestamp(1605777729, 2),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"$db" : "admin", // 操作的db信息,这里是集群维度的watch,因此为'admin'
"$readPreference" : {"mode" : "primaryPreferred"}
},
"originatingCommand" : { //描述原始命令的详细信息
"aggregate" : 1, // 标识是一个聚合命令,collection维度的watch会展示所监听的表名
"pipeline" : [ // 该聚合命令的详细pipeline组成
{ //'$changeStream'一定在pipeline首位
"$changeStream" : {// change stream相关的参数
"allChangesForCluster" : true, // 标识是对整个集群的变更进行watch
"fullDocument" : "default" // 未指定update返回全文档,使用缺省值;需要的话可以指定为'updateLookup'
}
},
{ //其他自定义的aggregate操作符比如'$match'放在后面,需要注意的是change stream所支持的pipeline stages只是MongoDB所支持的子集,比如'$addFields/$replaceWith/$set'等
"$match" : {"$or" : [{"operationType" : "drop"}]}
}
],
"lsid" : {"id" : BinData(4,"5zBaeTrSR0qmVnKbEh3JOA==")},
"$db" : "admin", //同上
},
"planSummary" : "COLLSCAN", // 查询计划,由于cursor本质上是挂在oplog表的,没有索引
"numYields" : 125,
"waitingForLock" : false,
}
感兴趣的朋友还可以将这里的内容与附录中主从同步的tailable cursor做比较,会惊喜地发现以下异同:
- 都是
getMore
类型的op,因为都是cursor; - 都是全表扫描
COLLSCAN
,因为都是挂在oplog表上,而oplog表没有索引; - 主从同步的namespace固定为
local.oplog.rs
,Change Stream则是特定的namespace,在上面的示例中由于是对整个集群的watch,因此是admin.$cmd
; - 主从同步的原始命令是一个基本的query操作且指定了包括
tailable/oplogReplay/awaitData/maxTimeMS
等在内的一系列cursor参数,Change Stream则是一个aggregate操作,并且其pipeline中的第一个stage一定是$changeStream
,其他stage还包含用户自定义的条件;
1.2 当我们执行db.watch()
时,发生了什么?
当我们在某个mongos/mongod(以mongo shell或者driver的形式)上执行db.watch()
时,首先是驱动层会发送一个aggregate命令给相应的服务端(mongos/mongod)。这里为了让change Stream功能更加简单易用以及具备更好的灵活性,所有的驱动都要求做了一层封装并对外提供3个辅助方法(分别对应监听的三个维度)。在mongoDB官方的specifications我们可以看到其中的细节。以mongo-driver
的go语言版本为例:
然后接下来就会真正执行这个命令(调用RoundTrip()
),并将从服务端得到的batch cursor
封装一层返回。一切顺利的话,就能在mongos/mongod上看到1.1中提到的tailable cursor
了。
1.3 当我们执行cursor.next()
时,又发生了什么?
在前面的步骤生成了相应的cursor之后,接下来的操作理所应当就是对cursor的不断迭代了。官方文档里也是这样示例的:
代码语言:txt复制watchCursor = db.getSiblingDB("hr").watch()
while (!watchCursor.isExhausted()){ //isExhausted
if (watchCursor.hasNext()){
printjson(watchCursor.next());
}
}
驱动层的封装做了什么呢?我们还是以mongo-driver
的go语言版本为例:
cursor.Next()
会首先尝试查看本地缓存的队列里有没有,如果有的话直接取一个文档返回,没有的话则需要通过BatchCusor.Next()
去server端拉一批(大小由cursor的batchSize
指定)再放到缓存队列里。BatchCursor.Next()
最终会调用BatchCusor.getMore()
,本质上是对getMore
命令的简单封装。看到熟悉的RoundTrip()
我们就不用往下看了,因为接下来的事情已经不是驱动的处理范围了。
func (bc *BatchCursor) getMore(ctx context.Context) {
bc.clearBatch()
conn, err := bc.server.Connection(ctx)
response, err := (&command.GetMore{
Clock: bc.clock,
ID: bc.id,
NS: bc.namespace,
Opts: bc.opts,
Session: bc.clientSession,
}).RoundTrip(ctx, bc.server.SelectedDescription(), conn)
...
}
注意到,驱动里会尝试去恢复有问题的change stream cursor,这也是specifications中所要求的。实现上就是通过第7步里的runCommand
方法,通过指定{replaceOption:true}
,将本地缓存的resumeToken
放在ResumeAfter
中进行options的替换,从而指定change stream的恢复而不是新建。
二、内核源码解读
既然是聚合命令,到达服务端之后,对于mongos,由scommandscluster_aggregate.cpp#runAggregate()
作为入口进行处理;对于mongod则是由dbcommandsrun_aggregate.cpp#runAggregate()
作为入口进行处理。类似的,后面的getMore
命令,到达服务端之后,对于mongos,由scommandscluster_getmore_cmd.cpp#run()
作为入口进行处理;对于mongod则是由dbcommandsgetmore_cmd.cpp#run()
作为入口进行处理。
2.1 整体流程(副本集)
2.1.1 聚合命令处理入口——runAggregate()
大致流程:
- 预解析聚合命令的入参request(这里只是简化版解析,比如发现有
$changeStream
stage之后并不会深入其内部,更详细解析在下一步骤); - 将request里的pipeline进行按stage的深度解析;
- 如果原始namespace为视图(view)的话,需要将建立view时指定的pipeline与本次聚合命令里的pipeline做合并;
optimizePipeline()
会对pipeline做优化,可能会对其中的阶段做一些顺序调整和整合,比如多个$skip
合并等,具体规则请参考Aggregation Pipeline Optimization;- 如果原始namespace为view的话,还需要检查其collation(可译为字节序,用于字符串的比较)配置是否与pipeline中所涉及的view的配置一致,不一致会报错;
- 准备一个查询执行器
planExecutor
插入到pipeline中;会对pipeline首位为$sample
、以及最早的$match/$sort
进行优化。比如一个最早的$match
可以被从pipeline中剔除并替换成索引扫描的查询执行器; - 调用
optimizaPipeline()
再次对pipeline优化,这是因为在上一步中添加了cursor stage之后还有其他优化空间; - 创建可用于返回给客户端的cursor,然后注册到全局的cursor管理器中。如果之前指定了
Tailable
和AwaitData
参数,则设置相应的cursor参数; - 如果聚合命令指定了explain参数,则返回整体的查询计划,否则将客户端cursor作为result的一部分返回,同时还会进行currentOp的相关修改;
鉴于本文主要讨论change stream相关流程,我们需要关注的是第二步中的pipeline解析。调用链:
runAggregate()-->Pipeline::parse()-->Pipeline::parseTopLevelOrFacetPipeline()-->DocumentSource::parse()-->DocumentSourceChangeStream::createFromBson()-->buildPipeline()
- 在
parse()
之前,如果预解析结果里pipeline有$changeStream
的话,会首先将ns改为oplog.rs
,代表其监听的集合就是oplog表;然后将readConcern
升级为majority
来确保返回的change event已经提交到副本集中大多数节点中。同时如果原始namespace为view的话会报错,因为change stream并不支持在view上创建; - 从
parse()
到createFromBson()
的调用是通过全局的parserMap
来实现的,所有支持的聚合管道stage都会将自己的解析函数存在这个全局map里。解析完$changeStream
stage后还会解析pipeline中的其他stage。
首先通过宏将预解析和解析(
createFromBson
)两个函数注册到全局的parserMap中去 REGISTER_MULTI_STAGE_ALIAS(changeStream, DocumentSourceChangeStream::LiteParsed::parse, DocumentSourceChangeStream::createFromBson);parseTopLevelOrFacetPipeline
中会对pipeline中的所有stage遍历调用各自的parse方法: SourceContainer stages;for (auto&& stageObj : rawPipeline) { auto parsedSources = DocumentSource::parse(expCtx, stageObj); stages.insert(stages.end(), parsedSources.begin(), parsedSources.end()); }然后在parse()
中会进行map的查找并调用它,完成后转到待解析stage列表中的下一个,继续其他stage的解析过程: map中查找举个例子,对于1.1章节中展示的cursor,就是先解析
$changeStream
再解析$match
,分别调用DocumentSourceChangeStream::createFromBson()
和DocumentSourceMatch::createFromBson()
【关于代码结构的备注】:看到这里可能你已经发现了,MongoDB中聚合命令所有支持的stage,都是继承自DocumentSource
抽象类,并且都实现了相应的createFromBson
方法用于解析stage内的操作符(如$and/$or
)。它们的类都定义在db/pipeline/
路径下。有对其他stage感兴趣的朋友可以从这里入手。
2.1.2 构建$changeStream
的pipeline
按照之前的描述来看下createFromBson
函数:
构建出来的pipeline包含了多个阶段,从前往后分别为oplogMatch
、transformation
、checkInvalidate
、resume
(可选)、closeCursor
、lookupChangePostImage
(可选):
oplogMatch阶段会复用已有的Match阶段(也就是做基本查询find({$match:...})
时会使用到的阶段)。对oplog.rs
表的查询匹配有以下规则:
1. 首先关注相关的DDL操作,如果是表维度的监听,那么对于该表的`drop/renameCollection/to`的操作都需要匹配到,因为这些操作会产生非法的change event。同理,如果是db维度的监听,则`dropDatabse`也需要被匹配到;
2. 按`namespace`匹配。如果是cluster维度的监听,则需要匹配所有非`admin/config/local`库内表的操作,否则匹配指定`namespace`的操作;
3. rename操作的`to`是目标`namespace`时;
4. 对于非DDL操作,匹配一般的CURD操作,通过`{"op":{$ne: "n"} }`来实现(因为DDL在前面的规则中已处理);
5. chunk 迁移到一个新的shard的操作`migrateChunkToNewShard`,它的`op`是`n`;
6. 匹配所有的namesapce相关的事务操作,以`applyOps`的形式呈现;
7. 过滤所有balancer产生的操作,通过`{"fromMigrate":{$ne:true}}`来实现,因为balancer产生的操作如`move chunk`只涉及数据的位置变化,数据本身并没有发生变化;如清理孤儿文档的操作虽然删除了数据,但是对整体数据的完整性并没有影响,因此也可以完全过滤掉。
以表维度的监听为例,其生成的pipeline放在了附录中,感兴趣的可以去看一下,会发现就是上述规则的并集。
- Transform阶段在pipeline构建时除了新建对象(
DocumentSourceTransform
)之外并没有做什么事情,对于指定了pipeline中包含resumeAfter
的情况,如果resumeToken
(结构体见2.2节)中的documentKey
字段包含了shard key,会将它缓存起来。 - CheckInvalidate阶段用于判断对于给定监听类型而言,是否产生了非法事件。在pipeline构建时除了新建对象(
DocumentSourceCheckInvalidate
)也不会做什么事情。 - resume阶段只在指定了
resumeAfter
时存在,根据是否需要对cursor进行合并处理(needsMerge
)会走两个不同的内部stage,如果需要,则会将resumeToken
的clusterTime
字段存下来用作后续的检查;否则会将整个resumeToken
存下来用于后续检查其存在性。 - CloseCursor阶段用于非法事件产生时关闭相关的cursor和销毁资源。在pipeline构建时除了新建对象(
DocumentSourceCloseCursor
)之外不会做什么事情。 - LookupChangePostImage阶段只在指定了
{fullDocument:updateLookup}
时存在,用于监听更新操作时返回更新的源文档。同样的,在pipeline构建时也只是新建了对象(DoucmentSourceLookupChangePostImage
)。
值得注意的是,上面描述的stage都是内部的概念,并不会对外暴露。用户完全不感知这些阶段,只是为了便于理解change stream工作的流程,不同的阶段相互解耦,完成各自相对独立的任务。
2.1.3 getMore命令处理入口——run()
runParsed()
大致流程如下:
- 根据curosr类型不同,获取不同的锁。然后根据请求里的cursorID从管理器中取出相应的cursor;
在这里,cursor会区分两种类型: 被特定的
表cursor管理器
所管理,比如通过find()
生成的,这种只需要加表级别的读锁即可; 被全局的cursor管理器
所管理,比如通过聚合命令生成的(如$changeStream
)
- 进行一些基本的检查,比如cursor的权限、namespace的权限、cursor的相关参数等(比如如果cursor指定了
awiaitData
选项则必须也指定tailable
选项等); - 如果cursor指定了
readConcern:majority
,则获取相应的已提交大多数的快照(snapshot); - 构建一个执行器
planExecutor
并执行,通过generateBatch()
得到一批符合条件的返回结果并放在nextBatch
中; - 为了生成该命令执行过程中的一些细节,比如检查了多少个文档,检查了多少个key等,需要做一些少量的计算以及指标更新,便于后续输出到日志中。也会有关于currentOp状态的更新;
- 最后通过
nextBatch.done()
将这一批结果放到命令的返回结果中;
对于我们关注的change stream流程而言,主要是第4步中的用于获取一批返回结果的generateBatch()
。调用链:
generateBatch()-->PlanExecutor::getNext()-->planExecutor::getNextImpl()-->planStage::work()-->PipelineProxyStage::doWork()-->PipelineProxyStage::getNextBson()-->Pipeline::getNext()
不妨来看一下这个getNext()
的实现,它会不断的去调用_sources
里的getNext()
,直到最后返回一个文档或者cursor迭代完成返回空:
而这里的_source
正是之前在buildPipeline()
时生成的若干个stage组成的列表。
2.1.4 pipeline内每个stage的处理
再来看一下跟change stream相关的细节。前面提到了$changeStream
内部分了若干个stage,因此这里我们可以很容易地将调用链补全:
那么在**getNext()
**被调用中,这些stage都各自发挥了什么作用呢?
- Transform阶段实施从
oplog条目
到change event
的转换。会提取出oplog中需要的字段(比如代表操作类型的op
,代表时间戳的ts
,代表namespace的ns
,uuid
),也会新增一些字段比如operationType/fullDocument/documentKey
等。对于不同的操作类型进行不同的处理,update会多一些操作。
Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) {
...
switch (opType) {
// "op"为i,即insert操作
case repl::OpTypeEnum::kInsert: {...}
// "op"为d,即delete操作
case repl::OpTypeEnum::kDelete: {...}
// "op"为u,即update操作
case repl::OpTypeEnum::kUpdate: {
// 会区分update和replace两种操作类型,对应替换和更新
// !注意到,这里就会对fullDocument赋值,但是只会赋值为oplog条目中'o'字段的内容,并没有去查询更新的源文档,这个操作在最后的lookupChangePostImage阶段才会完成!
}
// "op"为c,即其他DDL操作
case repl::OpTypeEnum::kCommand: {
//根据'o'中的子操作类型
// 1) o.applyops: 需要通过extractNextApplyOpsEntry()提取内部的文档,再applyTransformation()
// 2) o.renameCollection: 相应的rename事件
// 3) o.dropDatabase: 相应的dropDatabase事件
// 4) 其他的子操作类型都会导致非法事件
}
// "op"为n,即空操作
case repl::OpTypeEnum::kNoop: {
// 这里只有可能是NewShard类型,需要抛给上层处理并在新shard上新开cursor
// 同时构造一个虚假的documentKey,为了能恢复到此操作之后
}
default:{ MONGO_UNREACHABLE;}
}
// 生成resumeToken
ResumeTokenData resumeTokenData = getResumeToken(ts, uuid, documentKey);
// 如果是分片集群,意味着结果需要merge,这里的排序规则为:ts uuid documentKey
if (pExpCtx->needsMerge) {
doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
}
- CheckInvalidate阶段,会判断对于给定监听类型而言,是否产生了非法事件。比如对于特定表的监听,那么删除表/重命名表/删除库都是非法的。会产生非法事件并交由后续流程进行错误的返回以及cursor的关闭等。
bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
StringData operationType) {
if (pExpCtx->isSingleNamespaceAggregation()) {
return operationType == DSCS::kDropCollectionOpType ||
operationType == DSCS::kRenameCollectionOpType ||
operationType == DSCS::kDropDatabaseOpType;
} else if (!pExpCtx->isClusterAggregation()) {
return operationType == DSCS::kDropDatabaseOpType;
} else {
return false;
}
};
- CloseCursor阶段如果发现当前文档是非法事件,则会标记应该关闭cursor,在下一次
getNext()
时进行cursor的关闭。 - resume阶段分为两种情况,对于需要对cursor进行合并处理的情况(分片集群)而言,需要检查给定的
resumeToken
是否可以用来恢复。会首先查看resumeToken
的时间戳是否匹配,然后从oplog表中取出最早的一条记录对比时间戳,如果resumeToken
更小的话,说明期望恢复的时间点已经不在oplog中,即无法恢复了。对于非分片情况,只需要检查给定的resumeToken
是否存在即可。
函数会返回3个状态: 为什么需要检查而不能直接定位到?主要有以下两个原因: 1)用户指定的**
resumeToken
**不一定合法,一方面可能resumeToken
所对应的操作已经不在oplog的范围中。另一方面由于是入参,如果不是从驱动或者change event中提取出来的,其可能是任何值; 2)用户指定的**resumeToken
**是合成的,是change event中独有的,在oplog中并不存在相应的字段。因此我们并不能在buildPipeline
时直接对ResumeToken
做$match
匹配来过滤,而是退而求其次提取出ResumeToken
中的clusterTime
,以{"ts":{$gt: clusterTime}}
做过滤,要先在transform阶段利用oplog里的信息生成resumeToken
后再去匹配。 检查函数的返回值可能为以下几种: kFoundToken——找到了指定的resumeToken
对应的oplog; kCheckNextDoc ——当前的比指定的resumeToken
更老,因此需要继续找; kCannotResume——当前的已经比指定的resumeToken
更新,意味着不可能找到resumeToken了,直接返回不可恢复的错误。
- oplogMatch阶段会根据之前建立好的规则来匹配每一条oplog。
- LookupChangePostImage阶段只会对
operationType
为update
的情况进行处理,获取到需要进行查找的documentKey
并再次建立一个只有$match
阶段的pipeline并执行,得到源文档并填充在"fullDocument"
字段中。
所有的getNext()
方法除了返回自己阶段处理完毕后的文档之外还会返回一个状态,用来告知接下来的阶段。状态的取值可能为以下几种:
- kAdvanced——表示结果需要被处理,一切正常;
- kEOF——没有更多结果了;
- kPauseExecution——有问题,需要停止;
经过pipline里这一系列阶段的处理,最终就能得到我们需要的change event并返回给客户端了。至此,整个change stream的工作流程(自顶向下,从客户端-->驱动-->server端)就梳理完毕了。
2.2 resumeToken
resumeToken
用于唯一描述一个变更事件(change event),可用于change stream的故障恢复。
2.2.1 resumeToken
示例及结构
可以从任意一个change event中提取出与该文档操作对应的resumeToken`:
代码语言:txt复制{ "_id" : { "_data" : "825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" }, "operationType" : "insert", "clusterTime" : Timestamp(1595239231, 2), "fullDocument" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83"), "a" : 2 }, "ns" : { "db" : "phoenix", "coll" : "test" }, "documentKey" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83") } }
提取出得到下面的K-V对。_data
标识的那一长串十六进制就是了。
"_id" : { "_data" : "825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" },
在内核源码中,它以ResumeToken
存储,以ResumeTokenData
对外呈现。
struct ResumeTokenData {
Timestamp clusterTime; //逻辑时间戳,int64,由unix时间戳 计数器组成
int version = 0; //版本号,指的是resumeToken自身的版本号,因为resumeToken经历了从3.6的BinData-->4.0.7前的十六进制编码字符串v0-->4.0.7以后的十六进制编码字符串v1 的演变过程
size_t applyOpsIndex = 0; //applyOps内部的index,仅用于副本集事务
Value documentKey; //文档key,由_id和shardKey(if have)组成
boost::optional<UUID> uuid; //生成的namespace的uuid
};
class ResumeToken {
private:
Value _keyStringData; //BinData 或者十六进制编码字符串
Value _typeBits; // 保存type
}
在呈现上,无论是以binData
还是十六进制编码字符串
的形式呈现,在v0版本是按照 clusterTime、documentKey、uuid
的顺序(v1则是将uuid
放在了documentKey
的前面),resumeToken
的呈现形式并不会影响resumeToken
的可排序性,因为排序是分字段排序的。正常情况下同一个文档的操作(CURD)会有相同的documentKey
,但是由于一定会在同一个shard上执行,由逻辑时间戳保证了其clusterTime
不一样,uuid
也会不一样,因此相应的resumeToken
肯定是不一样的。并且在排序时也是严格遵守“因果关系”。
applyOpsIndex
的存在主要是为了描述事务中的操作,为了能在恢复时准确定位到事务中的某个操作。当resumeToken
描述的是事务中的操作时,clusterTime
字段存储的是整个事务的提交时间,事务内的所有操作需要这个index来建立时间顺序(事实上,新版本中会将此字段更名为txnOpIndex
,更好理解一些)。当然,如果是resumeToken
描述的是非事务操作,这个字段则一直为0。
2.2.2 resumeToken
的可比性
由于resumeToken
与文档是一一对应的,而且其组成的字段中包含了逻辑时间戳clusterTime
,因此本身就是具有可比性的。对于事务而言,resumeToken
由于包含了文档的applyOpsIndex
(事务中单个原子操作的索引),对于相同clusterTime
的情况也是具有可比性的。mongos也正是利用这一点才能在收到多个分片的返回结果时可以直接利用resumeToken
来完成事件发生先后顺序的决定,不会出现change event乱序的问题。
从上面的截图里能看出其组成方式为:clusterTime|version|applyOpsIndex|uuid|documentKey
其中clusterTime
是必须的,其他字段可选。不同的change event中的resumeToken
长度并不是完全一致的,比如一个非法事件只就有clusterTime
,而dropDatabase事件则没有uuid
和documentKey
。resumeToken
的比较就是直接比较_keyStringData
字符串即可。resumeTokenData
的比较则是会分别比较clusterTime
、documentKey
和uuid
。
2.3 分片集群
分片集群相较于副本集多了mongos的分发以及最后结果的排序与聚合,Change Stream源码解读这篇文章里已经介绍地比较详细了,鉴于文章篇幅这里不再赘述,感兴趣的可以去看看。
结构与上面描述的副本集中pipeline多个stage类似,mongos特有以下两个stage:
- MergeCursors阶段:合并各个分片返回的结果并排序,排序的规则就是按照
resumeToken
的大小来的 - UpdateOnAddShard阶段:用于处理新增一个shard的情况。这里需要在新shard上也建立相应的change stream cursor,以确保change event的完整性。
值得提一下的是,如果是分片集群的话,change stream必须通过mongos来建立,那么前面描述中的CloseCursor
以及LookupChangePostImage
这两个阶段都会被放到mongos上来做,因为这两个阶段都只需要做一次就够了,没必要在每个分片上都做,算是一个小优化。
三、Q&A
- 为什么changeStream要做成可恢复的?
为了更好的用户体验,毕竟是常驻的cusor。而且在处理主从同步oplog拉取cursor的故障恢复问题时已有一定的经验,直接复制过来就好。 另外mongoDB还要求所有语言版本的驱动都加上对网络问题的自行恢复尝试。
- 为什么
$changeStream
要在聚合管道的第一位?
为了在恢复时可以添加或替换resumeToken。
$changeStream
内部的几个stage顺序有严格要求么?
有的。比如
resume stage
就只能放在checkInvalidate stage
的后面。因为如果用户希望从一个invalidate event
的resumeToken进行恢复的话,先检查是否可以恢复并返回报错才是正确的行为。
- 为什么
getNext()
的stage顺序和buildPipeline()
的顺序不一致?
被
optimizePipeline()
优化调整过。transformation
和closeCursor
阶段被提前。前者主要是因为其他后续阶段都是以change event作为入参的;后者会使得新产生的非法事件变成change event返回,在下一轮next()
才会真的销毁资源。(非法事件是需要返回的)
- 如果我
db.watch()
的时候不指定任何参数,changeStream的默认行为是?
maxAwaitTimeMS
缺省1000ms,表示服务端最多等待1s就返回给客户端(哪怕是一个空批次)。不指定起始时间时,会使用myLastAppliedOpTime
作为起始时间。
- 为什么指定
resumeAfter
时要使用{"ts":{$gte:xxx}
,而其他情况只需要{"ts":{$gt:xxx}}
?
主要是因为事务操作的关系。
$gt
和$gte
的唯一区别就是边界的包含问题。前面提到过,当指定resumeAfter
时传入的是resumeToken
,会被转换为对clusterTime
的比较。当resumeToken
对应的是事务中的某个操作时,由于事务中所有操作都具有相同的clusterTime
,如果使用$gt
的话可能会漏掉部分操作导致无法恢复的结果。对于其他情况,指定startAtOperationTime
就是从某个时间点后,符合参数语义没有问题;什么也不指定,使用myLastAppliedOpTime
作为起始时间也没有问题。
- change Stream是否支持调整Concern?
不支持,返回的change event一定是在大多数节点上已提交的文档。
- 为什么mongos上建立的监听流要将用户自定义的管道操作符放在mongos上执行,不能下放到mongod上以获得一定的优化吗?
很遗憾,在当前的架构下是只放在mongos上执行的。确实存在一定的优化空间,但需仔细考虑可以下放到mongod的操作符类型以及具体内容。 不妨举个例子来说明。假设用户自定义的操作符为:
{$match:{operationType:"insert"}}
,如果我们将这个阶段下放到mongod,那么所有分片上产生的invalidate事件都会被过滤掉,导致即使发生了非法事件,变更流也将永远不会失效,这是无法接受的。 事实上,如果用户自定义操作符为:{$project:"updateDescription"}
(表示用户只关注更新操作到底更新了什么),那么我们将$project
下放到mongod可以减少mongos和mongod之间的网络流量。当然了resumeToken
也需要被传递来确保所有事件的可排序性。
- 什么情况下使用
resumeToken
也无法恢复change stream?
确实存在无法恢复的情况,主要为以下几种: 1)期望恢复的
resumeToken
所对应的oplog条目已经不在oplog.rs
表中。当中断的时间比较长时会出现这种情况。 2)使用的是非法事件(invalidate event
)中的resumeToken
。不过官方在4.2版本里对这里做了优化,提供了新的startAfter
选项,直接传入非法事件的resumeToken
,可以恢复到非法事件产生后的时间点。 3)resumeToken
格式不合法(只要使用的是驱动或者change event中的resumeToken一般不会遇到此问题)
- 拉取oplog阶段是否会拉取全量的oplog?
并不是。通过对比主从同步与change stream的cursor可以发现:主从同步只设置了时间戳过滤条件,可以认为是全量拉取,而change stream的cursor的过滤条件更为丰富(参考附录2中matchFilter示例)。 对于库表维度的监听,只会拉取部分跟指定namespace相关的操作。而如果是整集群维度的监听,则会退化为拉取除了少量未处理DDL操作外的大多数oplog。
四、总结
- Change Stream本质上是聚合命令中的特殊阶段:
$changestream
,它由一系列内部子阶段组成。 - Change Stream的总体流程为:拉取oplog-->转换-->检查-->匹配-->返回事件,而且是完全串行的。从相关的JIRA来看,官方并不打算通过在stage内部并行化的方式来优化这里的性能,而是会考虑复用stage。
- 整体上,change Stream的实现较为完整,尤其是可恢复性方面,并且官方也在性能方面做了一些优化。
- 故障恢复对于遭遇了非法事件的情况不是特别友好,因为没办法通过
resumeAfter
来进行恢复。遇到非法事件导致的cursor挂掉的情况只能手动查询挂掉的时间戳后再以startAtOpeartionTime
重新启动change stream。不过官方在4.2版本里对这里做了优化,提供了新的startAfter
选项,直接传入非法事件的resumeToken
,可以恢复到非法事件产生后的时间点。 - 对于分片集群的情况,mongos上建立的监听流会将所有用户自定义的管道操作符(如
$match/$project
等)放在mongos上而不是mongod上执行,可能会导致mongos成为change stream的性能瓶颈。这里官方表示未来会优化。 - 应尽量避免在同一个集群上建立很多个整集群纬度的监听。
- 不支持一般的explain查询分析,需要使用聚合命令的查询分析方式,比如
db.xxx.explain().aggregate([{$changeStream: {}}, {$match: {operationType: "insert"}}])
参考
- mongodb source code
- mongodb doc
- mongodb specifications
- Push down user-defined stages in a change stream pipeline where possible
- MongoDB Change Stream之一——上手及初体验
附录
1.主从同步的tailable cursor示例
代码语言:txt复制因长度原因,同样只截取部分字段。
{
"host": "TENCENT64.site:7006",
"desc": "conn47362",
"connectionId": 47362,
..."op": "getmore",
"ns": "local.oplog.rs",
"command": {
"getMore": NumberLong("37596103597"),
"collection": "oplog.rs",
"batchSize": 13981010,
"maxTimeMS": NumberLong(5000),
"term": NumberLong(2),
"lastKnownCommittedOpTime": {
"ts": Timestamp(1606124080,1),
"t": NumberLong(2)
},
"$replData": 1,
"$oplogQueryData": 1,
"$readPreference": {
"mode": "secondaryPreferred"
},
"$clusterTime": {
"clusterTime": Timestamp(1606124083,1),
"signature": {
"hash": BinData(0,"JYtEpjcKP1mUp16iAR5Ti8/7t4M="),
"keyId": NumberLong("6846941188591714334")
}
},
"$db": "local"
},
"originatingCommand": {
"find": "oplog.rs",
"filter": {
"ts": {"$gte": Timestamp(1594362143,1)}
},
"tailable": true,
"oplogReplay": true,
"awaitData": true,
"maxTimeMS": NumberLong(60000),
"batchSize": 13981010,
"term": NumberLong(2),
"readConcern": {
"afterClusterTime": Timestamp(1594362143,1)
},
"$replData": 1,
"$oplogQueryData": 1,
"$readPreference": {
"mode": "secondaryPreferred"
},
"$db": "local"
},
"planSummary": "COLLSCAN",
"numYields": 2,
"waitingForLock": false,
}
2. oplogMatch阶段的matchFilter示例
一个表维度的监听生成的matchFilter
示例:
{
$and: [
//resumeAfter和startAtOperationTime中的时间戳会在这里体现,由于我没有指定resumeAfter因此是$gt而非$gte
{ts: {$gt: Timestamp(1608194033,1)}},
{
$or: [
{
ns: /^db1.t$/, //库表的匹配是通过正则的方式
$or: [
{op: {$ne: "n"}}, //非空操作
{op: "n",o2.type: "migrateChunkToNewShard"} //Addshard的特殊空操作
]
},
{
op: "c", //关注所在db内的一些可能会导致非法事件的DDL,不同纬度的watch略有区别
$or: [
{
$and: [
{ns: "db1.$cmd"},
{
$or: [
{o.drop: "t"},
{o.renameCollection: "db1.t"},
{o.to: "db1.t"},
// 检查基于该表的create操作是否指定了非预期的collation
{o.create: "t",o.collation: {$exists: true}
}
]
}
]
},
{o.to: /^db1.t$/} // rename到这个表的操作,同样是正则匹配
]
},
{
op: "c", //关注事务相关的DDL操作,以applyOps的形式呈现
lsid: {$exists: true},
txnNumber: {$exists: true},
o.applyOps.ns: /^db1.t$/
}
]
},
{fromMigrate: {$ne: true}} // 非balancer产生的操作,比如moveChunk,cleanupOrphan等等
]
}