MongoDB Change Stream之二——自顶向下流程剖析

2021-07-26 12:23:03 浏览数 (1)

上一篇文章介绍了『change stream的上手及初体验』,相信大家都对之已经有了基本的了解。接下来,本篇文章尝试深入MongoDB的内核源码,来看看其内部原理以及实现细节。为了帮助大家更好地切入,采用了自顶向下(也就是从客户端-->驱动-->服务端)的方式来梳理整个流程。希望能对大家更深入了解MongoDB change stream功能有一定帮助。(未特殊说明,文中内容均基于MongoDB4.0版本代码)

一、原理&自顶向下流程

自顶向下流程的整体时序图如下:

change stream时序图.pngchange stream时序图.png

事实上,所有的query基本也是这样一个流程,只是不同的命令会获得不同类型的cursor罢了。这里如果暂时不好理解的话,不妨把第一章内容浏览完再回过头来看看。

1.1 当我们使用change stream,得到了什么?

一个类似复制集协议中主从同步逻辑的,挂在节点**local.oplog.rs**表上的tailable cursor。

Change Stream本质上是聚合命令中的一个特殊管道阶段(pipeline stage),由于它需要常驻在集群的节点上,因此会以tailable cursor的形式出现。

什么是**tailable cursor**?

按照官方的定义,它在概念上类似于Unix操作系统中提供的tail -f命令,即当一个游标到达结果集的末尾之后,它也不会立即关闭,而是将继续等待新的数据产生,并在等到的时候将之返回。

注1:在change Stream功能出现以前,开发者想要实时感知MongoDB数据库的变化只能通过tailing oplog的方式,其实也是使用的tailable cursor注2:副本集内的主从同步也是用的这个,细节请参考之前的文章。

tailable cursor不会使用到索引,因此建立tailable cursor时的初始化扫描比较耗时。事实上,对于oplog这个capped collection而言,如果某个时刻有较多的oplog产生,tailable cursor也会因为需要逐条扫描而导致性能不佳。

要想确认change Stream的本质也很简单,在已经进行过变更流监听的节点上执行db.currentOp(),然后就可以看到类似下面这样的一个command(仅截取部分字段,关键字段已注释):

代码语言:txt复制
{
    "host" : "TENCENT64.site:7029",
    ...
    "desc" : "conn14235866",
    "connectionId" : 14235866,	//该操作所在的链接id
    "op" : "getmore",	// 操作类型
    "ns" : "admin.$cmd",	// 指定的namespace,这里是集群维度的watch,因此为'admin.$cmd';如果是db维度的watch,则为'db.$cmd';collection维度的watch则为'db.collection'
    "command" : {	// 描述命令的基本信息
	"getMore" : NumberLong("6717122891978962123"),
	"collection" : "$cmd.aggregate", // 只有collection维度的watch才会显示表名
	"$clusterTime" : {	//逻辑时间戳
	    "clusterTime" : Timestamp(1605777729, 2),
	    "signature" : {
		"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
		"keyId" : NumberLong(0)
	    }
	},
	"$db" : "admin",	// 操作的db信息,这里是集群维度的watch,因此为'admin'
	"$readPreference" : {"mode" : "primaryPreferred"}
    },
    "originatingCommand" : {	//描述原始命令的详细信息
	"aggregate" : 1, // 标识是一个聚合命令,collection维度的watch会展示所监听的表名
	"pipeline" : [	// 该聚合命令的详细pipeline组成
	    { //'$changeStream'一定在pipeline首位
		"$changeStream" : {// change stream相关的参数
		    "allChangesForCluster" : true, // 标识是对整个集群的变更进行watch
		    "fullDocument" : "default" // 未指定update返回全文档,使用缺省值;需要的话可以指定为'updateLookup'
		}
	    },
	    {	//其他自定义的aggregate操作符比如'$match'放在后面,需要注意的是change stream所支持的pipeline stages只是MongoDB所支持的子集,比如'$addFields/$replaceWith/$set'等
		"$match" : {"$or" : [{"operationType" : "drop"}]}
	    }
	],
	"lsid" : {"id" : BinData(4,"5zBaeTrSR0qmVnKbEh3JOA==")},
	"$db" : "admin", //同上
    },
    "planSummary" : "COLLSCAN",	// 查询计划,由于cursor本质上是挂在oplog表的,没有索引
    "numYields" : 125,
    "waitingForLock" : false,
}

感兴趣的朋友还可以将这里的内容与附录中主从同步的tailable cursor做比较,会惊喜地发现以下异同

  • 都是getMore类型的op,因为都是cursor;
  • 都是全表扫描COLLSCAN,因为都是挂在oplog表上,而oplog表没有索引;
  • 主从同步的namespace固定为local.oplog.rs,Change Stream则是特定的namespace,在上面的示例中由于是对整个集群的watch,因此是admin.$cmd
  • 主从同步的原始命令是一个基本的query操作且指定了包括tailable/oplogReplay/awaitData/maxTimeMS等在内的一系列cursor参数,Change Stream则是一个aggregate操作,并且其pipeline中的第一个stage一定是$changeStream,其他stage还包含用户自定义的条件;

1.2 当我们执行db.watch()时,发生了什么?

当我们在某个mongos/mongod(以mongo shell或者driver的形式)上执行db.watch()时,首先是驱动层会发送一个aggregate命令给相应的服务端(mongos/mongod)。这里为了让change Stream功能更加简单易用以及具备更好的灵活性,所有的驱动都要求做了一层封装并对外提供3个辅助方法(分别对应监听的三个维度)。在mongoDB官方的specifications我们可以看到其中的细节。以mongo-driver的go语言版本为例:

go driver.pnggo driver.png

然后接下来就会真正执行这个命令(调用RoundTrip()),并将从服务端得到的batch cursor封装一层返回。一切顺利的话,就能在mongos/mongod上看到1.1中提到的tailable cursor了。

1.3 当我们执行cursor.next()时,又发生了什么?

在前面的步骤生成了相应的cursor之后,接下来的操作理所应当就是对cursor的不断迭代了。官方文档里也是这样示例的:

代码语言:txt复制
watchCursor = db.getSiblingDB("hr").watch()

while (!watchCursor.isExhausted()){ //isExhausted
   if (watchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

驱动层的封装做了什么呢?我们还是以mongo-driver的go语言版本为例:

go driver 1.pnggo driver 1.png

cursor.Next()会首先尝试查看本地缓存的队列里有没有,如果有的话直接取一个文档返回,没有的话则需要通过BatchCusor.Next()去server端拉一批(大小由cursor的batchSize指定)再放到缓存队列里。BatchCursor.Next()最终会调用BatchCusor.getMore(),本质上是对getMore命令的简单封装。看到熟悉的RoundTrip()我们就不用往下看了,因为接下来的事情已经不是驱动的处理范围了。

代码语言:txt复制
func (bc *BatchCursor) getMore(ctx context.Context) {
	bc.clearBatch()

	conn, err := bc.server.Connection(ctx)

	response, err := (&command.GetMore{
		Clock:   bc.clock,
		ID:      bc.id,
		NS:      bc.namespace,
		Opts:    bc.opts,
		Session: bc.clientSession,
	}).RoundTrip(ctx, bc.server.SelectedDescription(), conn)
	...
}

注意到,驱动里会尝试去恢复有问题的change stream cursor,这也是specifications中所要求的。实现上就是通过第7步里的runCommand方法,通过指定{replaceOption:true},将本地缓存的resumeToken放在ResumeAfter中进行options的替换,从而指定change stream的恢复而不是新建。

二、内核源码解读

既然是聚合命令,到达服务端之后,对于mongos,由scommandscluster_aggregate.cpp#runAggregate()作为入口进行处理;对于mongod则是由dbcommandsrun_aggregate.cpp#runAggregate()作为入口进行处理。类似的,后面的getMore命令,到达服务端之后,对于mongos,由scommandscluster_getmore_cmd.cpp#run()作为入口进行处理;对于mongod则是由dbcommandsgetmore_cmd.cpp#run()作为入口进行处理。

2.1 整体流程(副本集)

2.1.1 聚合命令处理入口——runAggregate()

大致流程:

runAggregate.pngrunAggregate.png
  1. 预解析聚合命令的入参request(这里只是简化版解析,比如发现有$changeStream stage之后并不会深入其内部,更详细解析在下一步骤);
  2. 将request里的pipeline进行按stage的深度解析;
  3. 如果原始namespace为视图(view)的话,需要将建立view时指定的pipeline与本次聚合命令里的pipeline做合并;
  4. optimizePipeline()会对pipeline做优化,可能会对其中的阶段做一些顺序调整和整合,比如多个$skip合并等,具体规则请参考Aggregation Pipeline Optimization;
  5. 如果原始namespace为view的话,还需要检查其collation(可译为字节序,用于字符串的比较)配置是否与pipeline中所涉及的view的配置一致,不一致会报错;
  6. 准备一个查询执行器planExecutor插入到pipeline中;会对pipeline首位为$sample、以及最早的$match/$sort进行优化。比如一个最早的$match可以被从pipeline中剔除并替换成索引扫描的查询执行器;
  7. 调用optimizaPipeline()再次对pipeline优化,这是因为在上一步中添加了cursor stage之后还有其他优化空间;
  8. 创建可用于返回给客户端的cursor,然后注册到全局的cursor管理器中。如果之前指定了TailableAwaitData参数,则设置相应的cursor参数;
  9. 如果聚合命令指定了explain参数,则返回整体的查询计划,否则将客户端cursor作为result的一部分返回,同时还会进行currentOp的相关修改;

鉴于本文主要讨论change stream相关流程,我们需要关注的是第二步中的pipeline解析。调用链:

runAggregate()-->Pipeline::parse()-->Pipeline::parseTopLevelOrFacetPipeline()-->DocumentSource::parse()-->DocumentSourceChangeStream::createFromBson()-->buildPipeline()

  • parse()之前,如果预解析结果里pipeline有$changeStream的话,会首先将ns改为oplog.rs代表其监听的集合就是oplog表;然后将readConcern升级为majority来确保返回的change event已经提交到副本集中大多数节点中。同时如果原始namespace为view的话会报错,因为change stream并不支持在view上创建;
  • parse()createFromBson()的调用是通过全局的parserMap来实现的,所有支持的聚合管道stage都会将自己的解析函数存在这个全局map里。解析完$changeStreamstage后还会解析pipeline中的其他stage。

首先通过宏将预解析和解析(createFromBson)两个函数注册到全局的parserMap中去 REGISTER_MULTI_STAGE_ALIAS(changeStream, DocumentSourceChangeStream::LiteParsed::parse, DocumentSourceChangeStream::createFromBson); parseTopLevelOrFacetPipeline中会对pipeline中的所有stage遍历调用各自的parse方法: SourceContainer stages;for (auto&& stageObj : rawPipeline) { auto parsedSources = DocumentSource::parse(expCtx, stageObj); stages.insert(stages.end(), parsedSources.begin(), parsedSources.end()); }然后在parse()中会进行map的查找并调用它,完成后转到待解析stage列表中的下一个,继续其他stage的解析过程: map中查找map中查找

举个例子,对于1.1章节中展示的cursor,就是先解析$changeStream再解析$match,分别调用DocumentSourceChangeStream::createFromBson()DocumentSourceMatch::createFromBson() 【关于代码结构的备注】:看到这里可能你已经发现了,MongoDB中聚合命令所有支持的stage,都是继承自DocumentSource抽象类,并且都实现了相应的createFromBson方法用于解析stage内的操作符(如$and/$or)。它们的类都定义在db/pipeline/路径下。有对其他stage感兴趣的朋友可以从这里入手。

2.1.2 构建$changeStream的pipeline

按照之前的描述来看下createFromBson函数:

createFromBson函数createFromBson函数

构建出来的pipeline包含了多个阶段,从前往后分别为oplogMatchtransformationcheckInvalidateresume(可选)、closeCursorlookupChangePostImage(可选):

changeStream源码-buildPipeline.pngchangeStream源码-buildPipeline.png

oplogMatch阶段会复用已有的Match阶段(也就是做基本查询find({$match:...})时会使用到的阶段)。对oplog.rs表的查询匹配有以下规则:

代码语言:txt复制
1. 首先关注相关的DDL操作,如果是表维度的监听,那么对于该表的`drop/renameCollection/to`的操作都需要匹配到,因为这些操作会产生非法的change event。同理,如果是db维度的监听,则`dropDatabse`也需要被匹配到;
2. 按`namespace`匹配。如果是cluster维度的监听,则需要匹配所有非`admin/config/local`库内表的操作,否则匹配指定`namespace`的操作;
3. rename操作的`to`是目标`namespace`时;
4. 对于非DDL操作,匹配一般的CURD操作,通过`{"op":{$ne: "n"} }`来实现(因为DDL在前面的规则中已处理);
5. chunk 迁移到一个新的shard的操作`migrateChunkToNewShard`,它的`op`是`n`;
6. 匹配所有的namesapce相关的事务操作,以`applyOps`的形式呈现;
7. 过滤所有balancer产生的操作,通过`{"fromMigrate":{$ne:true}}`来实现,因为balancer产生的操作如`move chunk`只涉及数据的位置变化,数据本身并没有发生变化;如清理孤儿文档的操作虽然删除了数据,但是对整体数据的完整性并没有影响,因此也可以完全过滤掉。

以表维度的监听为例,其生成的pipeline放在了附录中,感兴趣的可以去看一下,会发现就是上述规则的并集。

  • Transform阶段在pipeline构建时除了新建对象(DocumentSourceTransform)之外并没有做什么事情,对于指定了pipeline中包含resumeAfter的情况,如果resumeToken(结构体见2.2节)中的documentKey字段包含了shard key,会将它缓存起来。
  • CheckInvalidate阶段用于判断对于给定监听类型而言,是否产生了非法事件。在pipeline构建时除了新建对象(DocumentSourceCheckInvalidate)也不会做什么事情。
  • resume阶段只在指定了resumeAfter时存在,根据是否需要对cursor进行合并处理(needsMerge)会走两个不同的内部stage,如果需要,则会将resumeTokenclusterTime字段存下来用作后续的检查;否则会将整个resumeToken存下来用于后续检查其存在性。
  • CloseCursor阶段用于非法事件产生时关闭相关的cursor和销毁资源。在pipeline构建时除了新建对象(DocumentSourceCloseCursor)之外不会做什么事情。
  • LookupChangePostImage阶段只在指定了{fullDocument:updateLookup}时存在,用于监听更新操作时返回更新的源文档。同样的,在pipeline构建时也只是新建了对象(DoucmentSourceLookupChangePostImage)。

值得注意的是,上面描述的stage都是内部的概念,并不会对外暴露。用户完全不感知这些阶段,只是为了便于理解change stream工作的流程,不同的阶段相互解耦,完成各自相对独立的任务。

2.1.3 getMore命令处理入口——run()

runParsed()大致流程如下:

runParsed.pngrunParsed.png
  1. 根据curosr类型不同,获取不同的锁。然后根据请求里的cursorID从管理器中取出相应的cursor;

在这里,cursor会区分两种类型: 被特定的表cursor管理器所管理,比如通过find()生成的,这种只需要加表级别的读锁即可; 被全局的cursor管理器所管理,比如通过聚合命令生成的(如$changeStream)

  1. 进行一些基本的检查,比如cursor的权限、namespace的权限、cursor的相关参数等(比如如果cursor指定了awiaitData选项则必须也指定tailable选项等);
  2. 如果cursor指定了readConcern:majority,则获取相应的已提交大多数的快照(snapshot);
  3. 构建一个执行器planExecutor并执行,通过generateBatch()得到一批符合条件的返回结果并放在nextBatch中;
  4. 为了生成该命令执行过程中的一些细节,比如检查了多少个文档,检查了多少个key等,需要做一些少量的计算以及指标更新,便于后续输出到日志中。也会有关于currentOp状态的更新;
  5. 最后通过nextBatch.done()将这一批结果放到命令的返回结果中;

对于我们关注的change stream流程而言,主要是第4步中的用于获取一批返回结果的generateBatch()。调用链:

generateBatch()-->PlanExecutor::getNext()-->planExecutor::getNextImpl()-->planStage::work()-->PipelineProxyStage::doWork()-->PipelineProxyStage::getNextBson()-->Pipeline::getNext()

不妨来看一下这个getNext()的实现,它会不断的去调用_sources里的getNext(),直到最后返回一个文档或者cursor迭代完成返回空:

getNext实现getNext实现

而这里的_source正是之前在buildPipeline()时生成的若干个stage组成的列表。

2.1.4 pipeline内每个stage的处理

再来看一下跟change stream相关的细节。前面提到了$changeStream内部分了若干个stage,因此这里我们可以很容易地将调用链补全:

changeStream源码-getNext.pngchangeStream源码-getNext.png

那么在**getNext()**被调用中,这些stage都各自发挥了什么作用呢?

  • Transform阶段实施从oplog条目change event的转换。会提取出oplog中需要的字段(比如代表操作类型的op,代表时间戳的ts,代表namespace的nsuuid),也会新增一些字段比如operationType/fullDocument/documentKey等。对于不同的操作类型进行不同的处理,update会多一些操作。
代码语言:txt复制
Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) {
	...
	switch (opType) {
        // "op"为i,即insert操作
        case repl::OpTypeEnum::kInsert: {...}
        // "op"为d,即delete操作
        case repl::OpTypeEnum::kDelete: {...}
        // "op"为u,即update操作
        case repl::OpTypeEnum::kUpdate: {
            // 会区分update和replace两种操作类型,对应替换和更新
            // !注意到,这里就会对fullDocument赋值,但是只会赋值为oplog条目中'o'字段的内容,并没有去查询更新的源文档,这个操作在最后的lookupChangePostImage阶段才会完成!
        }
        // "op"为c,即其他DDL操作
        case repl::OpTypeEnum::kCommand: {
            //根据'o'中的子操作类型
            // 1) o.applyops: 需要通过extractNextApplyOpsEntry()提取内部的文档,再applyTransformation()
            // 2) o.renameCollection: 相应的rename事件
            // 3) o.dropDatabase: 相应的dropDatabase事件
            // 4) 其他的子操作类型都会导致非法事件
        }
            // "op"为n,即空操作
        case repl::OpTypeEnum::kNoop: {
            // 这里只有可能是NewShard类型,需要抛给上层处理并在新shard上新开cursor
            // 同时构造一个虚假的documentKey,为了能恢复到此操作之后
        }
        default:{ MONGO_UNREACHABLE;}
   }
   
   // 生成resumeToken
   ResumeTokenData resumeTokenData = getResumeToken(ts, uuid, documentKey);
   // 如果是分片集群,意味着结果需要merge,这里的排序规则为:ts uuid documentKey
   if (pExpCtx->needsMerge) {
        doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
}
  • CheckInvalidate阶段,会判断对于给定监听类型而言,是否产生了非法事件。比如对于特定表的监听,那么删除表/重命名表/删除库都是非法的。会产生非法事件并交由后续流程进行错误的返回以及cursor的关闭等。
代码语言:txt复制
bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
                           StringData operationType) {
    if (pExpCtx->isSingleNamespaceAggregation()) {
        return operationType == DSCS::kDropCollectionOpType ||
            operationType == DSCS::kRenameCollectionOpType ||
            operationType == DSCS::kDropDatabaseOpType;
    } else if (!pExpCtx->isClusterAggregation()) {
        return operationType == DSCS::kDropDatabaseOpType;
    } else {
        return false;
    }
};
  • CloseCursor阶段如果发现当前文档是非法事件,则会标记应该关闭cursor,在下一次getNext()时进行cursor的关闭。
  • resume阶段分为两种情况,对于需要对cursor进行合并处理的情况(分片集群)而言,需要检查给定的resumeToken是否可以用来恢复。会首先查看resumeToken的时间戳是否匹配,然后从oplog表中取出最早的一条记录对比时间戳,如果resumeToken更小的话,说明期望恢复的时间点已经不在oplog中,即无法恢复了。对于非分片情况,只需要检查给定的resumeToken是否存在即可。

函数会返回3个状态: 为什么需要检查而不能直接定位到?主要有以下两个原因: 1)用户指定的**resumeToken**不一定合法,一方面可能resumeToken所对应的操作已经不在oplog的范围中。另一方面由于是入参,如果不是从驱动或者change event中提取出来的,其可能是任何值; 2)用户指定的**resumeToken**是合成的,是change event中独有的,在oplog中并不存在相应的字段。因此我们并不能在buildPipeline时直接对ResumeToken$match匹配来过滤,而是退而求其次提取出ResumeToken中的clusterTime,以{"ts":{$gt: clusterTime}}做过滤,要先在transform阶段利用oplog里的信息生成resumeToken后再去匹配。 检查函数的返回值可能为以下几种: kFoundToken——找到了指定的resumeToken对应的oplog; kCheckNextDoc ——当前的比指定的resumeToken更老,因此需要继续找; kCannotResume——当前的已经比指定的resumeToken更新,意味着不可能找到resumeToken了,直接返回不可恢复的错误。

  • oplogMatch阶段会根据之前建立好的规则来匹配每一条oplog。
  • LookupChangePostImage阶段只会对operationTypeupdate的情况进行处理,获取到需要进行查找的documentKey并再次建立一个只有$match阶段的pipeline并执行,得到源文档并填充在"fullDocument"字段中。

所有的getNext()方法除了返回自己阶段处理完毕后的文档之外还会返回一个状态,用来告知接下来的阶段。状态的取值可能为以下几种:

  • kAdvanced——表示结果需要被处理,一切正常;
  • kEOF——没有更多结果了;
  • kPauseExecution——有问题,需要停止;

经过pipline里这一系列阶段的处理,最终就能得到我们需要的change event并返回给客户端了。至此,整个change stream的工作流程(自顶向下,从客户端-->驱动-->server端)就梳理完毕了。

2.2 resumeToken

resumeToken用于唯一描述一个变更事件(change event),可用于change stream的故障恢复。

2.2.1 resumeToken示例及结构

可以从任意一个change event中提取出与该文档操作对应的resumeToken`:

代码语言:txt复制
{ "_id" : { "_data" : "825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" }, "operationType" : "insert", "clusterTime" : Timestamp(1595239231, 2), "fullDocument" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83"), "a" : 2 }, "ns" : { "db" : "phoenix", "coll" : "test" }, "documentKey" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83") } }

提取出得到下面的K-V对。_data标识的那一长串十六进制就是了。

"_id" : { "_data" : "825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" },

在内核源码中,它以ResumeToken存储,以ResumeTokenData对外呈现。

代码语言:txt复制
struct ResumeTokenData {
    Timestamp clusterTime;	//逻辑时间戳,int64,由unix时间戳 计数器组成
    int version = 0;	//版本号,指的是resumeToken自身的版本号,因为resumeToken经历了从3.6的BinData-->4.0.7前的十六进制编码字符串v0-->4.0.7以后的十六进制编码字符串v1 的演变过程
    size_t applyOpsIndex = 0;	//applyOps内部的index,仅用于副本集事务
    Value documentKey;	//文档key,由_id和shardKey(if have)组成
    boost::optional<UUID> uuid;	//生成的namespace的uuid
};

class ResumeToken {
private:
    Value _keyStringData; //BinData 或者十六进制编码字符串
    Value _typeBits; // 保存type
}

在呈现上,无论是以binData还是十六进制编码字符串的形式呈现,在v0版本是按照 clusterTime、documentKey、uuid的顺序(v1则是将uuid放在了documentKey的前面),resumeToken的呈现形式并不会影响resumeToken的可排序性,因为排序是分字段排序的。正常情况下同一个文档的操作(CURD)会有相同的documentKey,但是由于一定会在同一个shard上执行,由逻辑时间戳保证了其clusterTime不一样,uuid也会不一样,因此相应的resumeToken肯定是不一样的。并且在排序时也是严格遵守“因果关系”。

applyOpsIndex的存在主要是为了描述事务中的操作,为了能在恢复时准确定位到事务中的某个操作。当resumeToken描述的是事务中的操作时,clusterTime字段存储的是整个事务的提交时间,事务内的所有操作需要这个index来建立时间顺序(事实上,新版本中会将此字段更名为txnOpIndex,更好理解一些)。当然,如果是resumeToken描述的是非事务操作,这个字段则一直为0。

2.2.2 resumeToken的可比性

由于resumeToken与文档是一一对应的,而且其组成的字段中包含了逻辑时间戳clusterTime,因此本身就是具有可比性的。对于事务而言,resumeToken由于包含了文档的applyOpsIndex(事务中单个原子操作的索引),对于相同clusterTime的情况也是具有可比性的。mongos也正是利用这一点才能在收到多个分片的返回结果时可以直接利用resumeToken来完成事件发生先后顺序的决定,不会出现change event乱序的问题。

image-20201217120416328.pngimage-20201217120416328.png

从上面的截图里能看出其组成方式为:clusterTime|version|applyOpsIndex|uuid|documentKey其中clusterTime是必须的,其他字段可选。不同的change event中的resumeToken长度并不是完全一致的,比如一个非法事件只就有clusterTime,而dropDatabase事件则没有uuiddocumentKeyresumeToken的比较就是直接比较_keyStringData字符串即可。resumeTokenData的比较则是会分别比较clusterTimedocumentKeyuuid

2.3 分片集群

分片集群相较于副本集多了mongos的分发以及最后结果的排序与聚合,Change Stream源码解读这篇文章里已经介绍地比较详细了,鉴于文章篇幅这里不再赘述,感兴趣的可以去看看。

结构与上面描述的副本集中pipeline多个stage类似,mongos特有以下两个stage:

  • MergeCursors阶段:合并各个分片返回的结果并排序,排序的规则就是按照resumeToken的大小来的
  • UpdateOnAddShard阶段:用于处理新增一个shard的情况。这里需要在新shard上也建立相应的change stream cursor,以确保change event的完整性。

值得提一下的是,如果是分片集群的话,change stream必须通过mongos来建立,那么前面描述中的CloseCursor以及LookupChangePostImage这两个阶段都会被放到mongos上来做,因为这两个阶段都只需要做一次就够了,没必要在每个分片上都做,算是一个小优化。

三、Q&A

  • 为什么changeStream要做成可恢复的?

为了更好的用户体验,毕竟是常驻的cusor。而且在处理主从同步oplog拉取cursor的故障恢复问题时已有一定的经验,直接复制过来就好。 另外mongoDB还要求所有语言版本的驱动都加上对网络问题的自行恢复尝试。

  • 为什么$changeStream要在聚合管道的第一位?

为了在恢复时可以添加或替换resumeToken。

  • $changeStream内部的几个stage顺序有严格要求么?

有的。比如resume stage就只能放在checkInvalidate stage的后面。因为如果用户希望从一个invalidate event的resumeToken进行恢复的话,先检查是否可以恢复并返回报错才是正确的行为。

  • 为什么getNext()的stage顺序和buildPipeline()的顺序不一致?

optimizePipeline()优化调整过。transformationcloseCursor阶段被提前。前者主要是因为其他后续阶段都是以change event作为入参的;后者会使得新产生的非法事件变成change event返回,在下一轮next()才会真的销毁资源。(非法事件是需要返回的)

  • 如果我db.watch()的时候不指定任何参数,changeStream的默认行为是?

maxAwaitTimeMS缺省1000ms,表示服务端最多等待1s就返回给客户端(哪怕是一个空批次)。不指定起始时间时,会使用myLastAppliedOpTime作为起始时间。

  • 为什么指定resumeAfter时要使用{"ts":{$gte:xxx},而其他情况只需要{"ts":{$gt:xxx}}?

主要是因为事务操作的关系$gt$gte的唯一区别就是边界的包含问题。前面提到过,当指定resumeAfter时传入的是resumeToken,会被转换为对clusterTime的比较。当resumeToken对应的是事务中的某个操作时,由于事务中所有操作都具有相同的clusterTime,如果使用$gt的话可能会漏掉部分操作导致无法恢复的结果。对于其他情况,指定startAtOperationTime就是从某个时间点后,符合参数语义没有问题;什么也不指定,使用myLastAppliedOpTime作为起始时间也没有问题。

  • change Stream是否支持调整Concern?

不支持,返回的change event一定是在大多数节点上已提交的文档。

  • 为什么mongos上建立的监听流要将用户自定义的管道操作符放在mongos上执行,不能下放到mongod上以获得一定的优化吗?

很遗憾,在当前的架构下是只放在mongos上执行的。确实存在一定的优化空间,但需仔细考虑可以下放到mongod的操作符类型以及具体内容。 不妨举个例子来说明。假设用户自定义的操作符为:{$match:{operationType:"insert"}},如果我们将这个阶段下放到mongod,那么所有分片上产生的invalidate事件都会被过滤掉,导致即使发生了非法事件,变更流也将永远不会失效,这是无法接受的。 事实上,如果用户自定义操作符为:{$project:"updateDescription"}(表示用户只关注更新操作到底更新了什么),那么我们将$project下放到mongod可以减少mongos和mongod之间的网络流量。当然了resumeToken也需要被传递来确保所有事件的可排序性。

  • 什么情况下使用resumeToken也无法恢复change stream?

确实存在无法恢复的情况,主要为以下几种: 1)期望恢复的resumeToken所对应的oplog条目已经不在oplog.rs表中。当中断的时间比较长时会出现这种情况。 2)使用的是非法事件(invalidate event)中的resumeToken。不过官方在4.2版本里对这里做了优化,提供了新的startAfter选项,直接传入非法事件的resumeToken,可以恢复到非法事件产生后的时间点。 3)resumeToken格式不合法(只要使用的是驱动或者change event中的resumeToken一般不会遇到此问题)

  • 拉取oplog阶段是否会拉取全量的oplog?

并不是。通过对比主从同步与change stream的cursor可以发现:主从同步只设置了时间戳过滤条件,可以认为是全量拉取,而change stream的cursor的过滤条件更为丰富(参考附录2中matchFilter示例)。 对于库表维度的监听,只会拉取部分跟指定namespace相关的操作。而如果是整集群维度的监听,则会退化为拉取除了少量未处理DDL操作外的大多数oplog。

四、总结

  • Change Stream本质上是聚合命令中的特殊阶段:$changestream,它由一系列内部子阶段组成。
  • Change Stream的总体流程为:拉取oplog-->转换-->检查-->匹配-->返回事件,而且是完全串行的。从相关的JIRA来看,官方并不打算通过在stage内部并行化的方式来优化这里的性能,而是会考虑复用stage。
  • 整体上,change Stream的实现较为完整,尤其是可恢复性方面,并且官方也在性能方面做了一些优化。
  • 故障恢复对于遭遇了非法事件的情况不是特别友好,因为没办法通过resumeAfter来进行恢复。遇到非法事件导致的cursor挂掉的情况只能手动查询挂掉的时间戳后再以startAtOpeartionTime重新启动change stream。不过官方在4.2版本里对这里做了优化,提供了新的startAfter选项,直接传入非法事件的resumeToken,可以恢复到非法事件产生后的时间点。
  • 对于分片集群的情况,mongos上建立的监听流会将所有用户自定义的管道操作符(如$match/$project等)放在mongos上而不是mongod上执行,可能会导致mongos成为change stream的性能瓶颈。这里官方表示未来会优化。
  • 应尽量避免在同一个集群上建立很多个整集群纬度的监听。
  • 不支持一般的explain查询分析,需要使用聚合命令的查询分析方式,比如db.xxx.explain().aggregate([{$changeStream: {}}, {$match: {operationType: "insert"}}])

参考

  1. mongodb source code
  2. mongodb doc
  3. mongodb specifications
  4. Push down user-defined stages in a change stream pipeline where possible
  5. MongoDB Change Stream之一——上手及初体验

附录

1.主从同步的tailable cursor示例

因长度原因,同样只截取部分字段。

代码语言:txt复制
{
    "host": "TENCENT64.site:7006",
    "desc": "conn47362",
    "connectionId": 47362,
    ..."op": "getmore",
    "ns": "local.oplog.rs",
    "command": {
        "getMore": NumberLong("37596103597"),
        "collection": "oplog.rs",
        "batchSize": 13981010,
        "maxTimeMS": NumberLong(5000),
        "term": NumberLong(2),
        "lastKnownCommittedOpTime": {
            "ts": Timestamp(1606124080,1),
            "t": NumberLong(2)
        },
        "$replData": 1,
        "$oplogQueryData": 1,
        "$readPreference": {
            "mode": "secondaryPreferred"
        },
        "$clusterTime": {
            "clusterTime": Timestamp(1606124083,1),
            "signature": {
                "hash": BinData(0,"JYtEpjcKP1mUp16iAR5Ti8/7t4M="),
                "keyId": NumberLong("6846941188591714334")
            }
        },
        "$db": "local"
    },
    "originatingCommand": {
        "find": "oplog.rs",
        "filter": {
            "ts": {"$gte": Timestamp(1594362143,1)}
        },
        "tailable": true,
        "oplogReplay": true,
        "awaitData": true,
        "maxTimeMS": NumberLong(60000),
        "batchSize": 13981010,
        "term": NumberLong(2),
        "readConcern": {
            "afterClusterTime": Timestamp(1594362143,1)
        },
        "$replData": 1,
        "$oplogQueryData": 1,
        "$readPreference": {
            "mode": "secondaryPreferred"
        },
        "$db": "local"
    },
    "planSummary": "COLLSCAN",
    "numYields": 2,
    "waitingForLock": false,
}

2. oplogMatch阶段的matchFilter示例

一个表维度的监听生成的matchFilter示例:

代码语言:txt复制
{
    $and: [
        //resumeAfter和startAtOperationTime中的时间戳会在这里体现,由于我没有指定resumeAfter因此是$gt而非$gte
        {ts: {$gt: Timestamp(1608194033,1)}}, 
        {
            $or: [
                {
                    ns: /^db1.t$/, //库表的匹配是通过正则的方式
                    $or: [
                        {op: {$ne: "n"}}, //非空操作
                        {op: "n",o2.type: "migrateChunkToNewShard"} //Addshard的特殊空操作
                    ]
                },
                {
                    op: "c",  //关注所在db内的一些可能会导致非法事件的DDL,不同纬度的watch略有区别
                    $or: [
                        {	
                            $and: [
                                {ns: "db1.$cmd"},
                                {
                                    $or: [
                                        {o.drop: "t"},
                                        {o.renameCollection: "db1.t"},
                                        {o.to: "db1.t"},
                                        // 检查基于该表的create操作是否指定了非预期的collation
                                        {o.create: "t",o.collation: {$exists: true}
                                        }
                                    ]
                                }
                            ]
                        },
                        {o.to: /^db1.t$/}   // rename到这个表的操作,同样是正则匹配
                    ]
                },
                {	
                    op: "c",	//关注事务相关的DDL操作,以applyOps的形式呈现
                    lsid: {$exists: true},
                    txnNumber: {$exists: true},
                    o.applyOps.ns: /^db1.t$/
                }
            ]
        },
        {fromMigrate: {$ne: true}}	// 非balancer产生的操作,比如moveChunk,cleanupOrphan等等
    ]
}

0 人点赞