Change Stream源码解读

2020-11-02 14:10:47 浏览数 (1)

MongoDB从3.6开始推出了Change Stream功能,提供实时的增量数据流功能,为同步、分析、监控、推送等多种场景使用带来福音。4.0中引入的混合逻辑时钟,可以支持分片集群在不关闭balancer的情况下,吐出的增量数据在即使发生move chunk发生的情况下,还能够保证数据的因果一致性。不但如此,随着4.0.7开始推出的High Water Mark功能,使得返回的change stream cursor包括Post Batch Resume Token,更好的解决Change Stream中ResumeToken推进的问题。关于Change Stream的功能解读,网上可以找到比较多的资料,比如张友东的这篇解读介绍了Change Stream与oplog拉取的对比以及基本的使用。本文将主要侧重从内核源码层面进行解读,主要介绍分片集群版下Change Stream在mongos和mongod上都执行了哪些操作。此外,由于4.0开始MongoDB使用了混合逻辑时钟,从而保证了move chunk的因果一致性,所以本文还会先简单介绍一下MongoDB中混合逻辑时钟的原理。

文本主要内容将会覆盖以下几块:

  1. Change Stream基本功能
  2. 混合逻辑时钟
  3. Change Stream具体处理流程
  4. 总结

名词解释:

  • oplog:MongoDB增量记录,每次修改操作都会产生一条oplog。
  • event:Change Stream中返回的一条记录,表示一次变更。
  • mongos:分片集群中的proxy层。
  • shard/mongod:本文中2个概念一致,都表示分片集群的单个分片。
  • ResumeToken:当前oplog/event的位点,Change Stream根据这个进行断点续传。
  • PBRT:Post Batch Resume Token。MongoDB中最新的ResumeToken。
  • HLC:Hybrid Logical Clock。混合逻辑时钟。
  • Pipeline stage:表示aggregate操作过程中的各个阶段。
  • 时钟/时间戳/混合逻辑时钟/HLC:本文中这几个概念一致,都表示时间戳。

1. Change Stream基本功能

Change Stream的功能主要就一个:推送实时的增量变更数据流。也就是说,MongoDB上进行的所有DML操作(插入、删除、修改)以及部分DDL操作(删表、删库)都可以被推送出来。那么,基于这个功能,就可以实现很多功能,例如:

  • 分析。比如我们需要对MongoDB的数据进行分析,不断拉出用户的更新,推送到下游的分析平台。
  • 迁移/同步/备份。比如把A数据库热迁移到B数据库,数据库形态可以是副本集、集群版。
  • 推送。比如我们用手机地图查看我们需要等待的公交离我们还有几站,我们希望每次公交位置都自动告知用户,而不是我们自己每次去不断刷新主动拉取。
  • 监控。比如有些表是敏感表,我们希望这些表的变更都能告知使用方,防止攻击&误操作。
  • ...

目前,MongoShake从v2.4版本开始支持从Change Stream对接,从而,用户可以非常灵活的基于Change Stream来实现以上多种需求。

1.1 Change Stream event各个字段解释

在使用watch开始监听整个数据库/db/表以后,一旦有符合条件的变更,Change Stream将会吐出一条event代表一次表更(插入/删除/修改/删除/删表等),下面是一条event的具体字段解析:

代码语言:javascript复制
{
    _id : { // 存储元信息
        "_data" : <BinData|hex string> // resumeToken,用于断点续传
    },
    "operationType" : "<operation>", // 包括:insert, delete, replace, update, drop, rename, dropDatabase, invalidate
    "fullDocument" : { <document> }, // 修改后的数据,出现在insert, replace, delete, update. 相当于oplog中的o字段
    "ns" : { // db和collection的信息
        "db" : "<database>",
        "coll" : "<collection"
    },
    "to" : { // 只在operationType==rename的时候有效,表示改名以后的ns
        "db" : "<database>",
        "coll" : "<collection"
    },
    "documentKey" : { "_id" : <value> }, // 相当于o2字段。出现在insert, replace, delete, update。正常只包含_id,对于sharded collection,还包括shard key。
    "updateDescription" : { // 只在operationType==update的时候出现,相当于是增量的修改,出现在$set和$uset场景,区别于replace的整个文档替换。
        "updatedFields" : { <document> }, // 更新的field的值
        "removedFields" : [ "<field>", ... ] // 删除的field列表
    }
    "clusterTime" : <Timestamp>, // 逻辑时钟,相当于ts字段
    "txnNumber" : <NumberLong>, // 相当于oplog里面的txnNumber,只在事务里面出现。事务号在一个事务里面单调递增
    "lsid" : { // logic session id,请求所在的session的id。相当于oplog的lsid字段。
        "id" : <UUID>,
        "uid" : <BinData>
    }
}

Change Stream的event本身是从oplog翻译过来的,所以其中的字段跟oplog字段比较类似。可以看到每一条change stream event都包括一个ResumeToken用于断点续传。

2. 混合逻辑时钟(Hybrid Logical Clock,简称HLC)

本小节会介绍MongoDB关于HLC的使用情况,HLC的具体概念大家感兴趣的话可以查看下面几篇论文:

  • Lamport逻辑时钟:time clock and the ordering of events in a distributed system
  • 混合逻辑时钟:Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases
  • MongoDB中的混合逻辑时钟:Implementation of Cluster-wide Logical Clock and Causal Consistency in MongoDB

混合逻辑时钟是解决分布式场景事件定序的问题。也就是2个具有happened-before关系的事件,他们的混合逻辑时钟将具有大小关系。MongoDB中的混合逻辑时钟为64bit,包括了高位的物理时钟32bit,和低位计数32bit。比如oplog中的ts字段,以及请求/回复消息中的ClusterTime字段等,都是混合逻辑时钟。下面给出了oplog含有ts字段的部分结构:

代码语言:javascript复制
{
    "ts" : Timestamp(1571389994, 1), // 混合逻辑时钟  
    "t" : NumberLong(1),
    "v" : 2,
    "op" : "i",
    ...
}

HLC的比较是64位一起比较的:

  • Timestamp(1000, 2) > Timestamp(1000, 1)
  • Timestamp(1001, 1) > Timestamp(1000, 5)

在MongoDB中,写请求是会推进HLC的,所有请求和请求的返回都是会跟踪HLC的。写请求推进举例:mongos1的HLC为Timestamp(1002, 1),mongos1发送一条写请求给mongod1。那么mongod1收到该写请求后,查看本地物理时钟(假设是1005),那么对比收到的消息的高位跟本地物理时钟,发现是本地的大,就更新mongod1时钟为Timestamp(1005, 0)。mongod1写完毕后,会把本地的时钟携带到回复消息中发送回给mongos1,然后mongos1跟踪该HLC从Timestamp(1002, 1)变成Timestamp(1005, 0)。另外一种情况,最开始mongos1的HLC分别为Timestamp(1002, 1),mongod1收到mongos1的写请求后,本地HLC是1001小于携带过来的请求的1002,则高位为1002,并且递增低位时钟到Timestamp(1002, 2);同理mongod1回复给mongos1的消息会把mongos1的HLC更新为Timestamp(1002, 2)。读请求同样会跟踪mongos1的HLC为Timestamp(1002, 1),mongos1发送一条读请求给mongod1。那么mongod1收到该请求后,如果本地时钟小于mongos1的,则更新本地时钟为Timestamp(1002, 1);如果本地时钟大,则不会进行更新。MongoDB中的HLC应用比较多,比如解决change stream全局定序、session内的read own write、跨shard访问时钟对齐、分布式事务时钟对齐等问题。这里只介绍change stream如何解决move chunk的乱序问题:

2.1 move chunk乱序问题

如果不关闭balancer,在move chunk的情况下,shard并发拉取会产生什么问题?

以上图为例,最开始{_id:1}位于shard1上面,用户执行了更新操作修改为{_id:1, a:1}。然后,发生了move chunk操作,对应{_id:1}的chunk从shard1上挪到了shard2,然后shard2上执行了更新操作:{_id:1, a:3},最后a的结果是3。但是通常情况下,对于同步工具来说,拉取不同shard是一个并发的过程,以MongoShake举例,假设此时线程1拉取shard1,线程2拉取shard2,由于shard1的cpu/带宽/内存/网络io等多种原因,导致shard2的拉取进度快于shard1了,先拉取ts=102进行回放产生的结果是{_id:1, a:3},然后是ts=100,最后{_id:1}对应的结果就是{_id:1, a:1}。这显然是不符合预期,破坏了因果序,所以MongoShake中,用户如果采用oplog进行拉取,那么对于源端MongoDB是分片集群,必须关闭balancer以规避这种情况。而在Change Stream过程中,mongos本身会对拉取到的event进行排序,从而保证了因果一致性。下面章节会具体介绍内部处理细节。

3. Change Stream具体处理流程

客户端发送的Change Stream命令是基于aggregate框架实现的,只不过添加了一个特殊的$changestream stage:

代码语言:javascript复制
"pipeline" : [
  {
    "$changeStream" : {
      "fullDocument" : "default",
      "startAtOperationTime" : Timestamp(1580867312, 3)
    }
  }
],

change stream的断点续传是根据ResumeToken来进行的,下面会具体介绍其用途,这里介绍一下ResumeToken的结构:

代码语言:javascript复制
struct ResumeTokenData {
    Timestamp clusterTime; //HLC
    int version = 0; // 版本号,4.0.7以前为0,4.0.7增加了PBRT,版本号为1
    size_t applyOpsIndex = 0; // applyOps内部的index
    Value documentKey; // 包含_id和shardKey
    boost::optional<UUID> uuid; //表的uuid
};

由于ResumeToken维持不同的版本字段version,所以在版本升级后,有可能出现不同版本 token 无法识别的问题,所以尽量要让 MongoDB Server 所有组件(各个mongod,config-server、mongos)都保持相同的内核版本。下面以分片集群为例,介绍客户端、mongos、mongod对Change Stream的处理流程,其中mongos又分为消息分发和聚合2个步骤。

3.1 客户端

客户端的行为比较简单,用户可以指定一些配置项来创建change stream:

  • ResumeAfter。根据指定的token进行断点续传。
  • StartAfter。根据指定的token进行断点续传,与ResumeAfter不同的是,StartAfter支持从invalidate event中进行恢复。例如,监听的collection被删除了就会返回invalidate event。
  • StartAtOperationTime。根据指定的时间戳进行启动/断点续传。
  • FullDocument。正常情况下对于set/unset,只返回部分修改的字段,但如果FullDocument设置为updateLookup,则会返回整个更新后的文档。
  • 其他配置:batchSize、MaxAwaitTime等。

Change Stream创建后,driver将会发送封装第一个stage为$changestream的aggreate命令给MongoDB,服务端建立成功后返回cursor给客户端。客户端调用getNext获取下一条event事件,对应driver实现是查看当前缓存队列中是否还有event,有则拉出第一条直接返回给上层,没有则发送getMore命令,然后对于返回的一批event进行缓存用于下次返回。driver缓存数据的同时还会存储PBRT(resumeToken),用于处理网络抖动等连接断开情况下的自动断点续传;同时,上层应用也可以根据这个PBRT进行位点的推进以及断点续传。下面是MongoDB对于aggregate请求和getMore请求的返回的具体消息结构:

代码语言:javascript复制
/**
 * Response to a successful aggregate.
 */
{
    ok: 1,
    cursor: {
       ns: String,
       id: Int64,
       firstBatch: Array<ChangeStreamDocument>, // 返回的batch数据数组
       /**
        * postBatchResumeToken is returned in MongoDB 4.0.7 and later.
        */
       postBatchResumeToken: Document // PBRT
    },
    operationTime: Timestamp,
    $clusterTime: Document,
}

/**
 * Response to a successful getMore.
 */
{
    ok: 1,
    cursor: {
       ns: String,
       id: Int64,
       nextBatch: Array<ChangeStreamDocument> // 返回的batch数据数组
       /**
        * postBatchResumeToken is returned in MongoDB 4.0.7 and later.
        */
       postBatchResumeToken: Document // PBRT
    },
    operationTime: Timestamp,
    $clusterTime: Document,
}

3.2 mongos分发

mongos第一次收到Change Stream命令,调用dispatchShardPipeline构建$changestream aggregate分发到所有的shard。目前mongos做法,无论shard上是否有对应客户端指定的db/collection,都会进行分发。这个是为了后续用户发起shardCollection,movePrimaryShard等操作的考虑。这个构建的mongod的$changestream aggregate命令跟mongos本身收到的基本一样,但是额外添加了几个选项方便mongod逻辑处理,例如:{fromMongos: true, needsMerge: true, mergeByPBRT: true}等。这样,mongos上就构建了到各个shard的cursor。接着,根据这些mongod对应的cursor构建mergePipeline,处理各个shard返回change stream event的合并逻辑。构建mergePipeline的过程,主要就是串行添加各个stage的过程(3.4将会详细介绍各个stage的功能):

  • DocumentSourceMergeCursors。合并各个shard返回的cursor。
  • DocumentSourceUpdateOnAddShard。处理新增shard的情况。
  • DocumentSourceCloseCursor。处理invalidate事件。
  • DocumentSourceLookupChangePostImage。如果是event类型是update,且设置了FullDocument=updateLoopup,则会执行额外的query。

对于aggregate的pipeline来说,就是一个个串行stage,上游的输出作为下游的输入。所以最后,ClusterCursorManager缓存这个mergePipeline的client cluster cursor,下一次针对这个tailable的cursor一旦用户发起了getMore请求,就相当于mergePipeline串行跑一遍上述各个stage。

3.3 mongod

shard收到mongos发送的$changestream aggregate请求后,构建shardPipeline,同样也包括多个stage:

  • DocumentSourceOplogMatch。根据指定的filter构建oplog cursor,匹配对应的oplog。
  • DocumentSourceChangeStreamTransform。将对应的oplog翻译成change stream event。
  • DocumentSourceCheckInvalidate。判断是否需要返回invalidate断开change stream cursor。
  • DocumentSourceShardCheckResumability。根据指定的ResumeToken判断是否可以恢复,以及跳过某些event。

下面依次介绍各个stage做的工作:

3.3.1 DocumentSourceOplogMatch

构建oplog query cursor,其filter比较复杂,总结来说,就是过滤moveChunk,noop oplog等无关oplog,找到当前change stream所关注的oplog。下面就是具体匹配的filter:

代码语言:javascript复制
$and:[
    {"ts": {$gte: startFrom}}, // 如果指定了resumeAfter则从这个startFrom开始,否则是$gt lastAppliedTs
    {$or: 
        [
            // opMatch
            {
                "ns": inputNamespace,
                $or: [ 
                    {"op": {$ne: "n"}}, // 不为noop, 表示是普通的CRUD操作
                    {"op": "n", "o2.type": "migrateChunkToNewShard"}, // chunk mirgrate到新的shard上
                ],
            } ,
            // commandMatch,DDL:
            {
                "op": "c",
                $or: [
                    // commandsOnTargetDb
                    $and: [
                        {"ns": inputDb.$cmd}, //如果不是指定全局则用这个,否则是正则匹配所有除local, admin, config之外的db
                        $or: [
                            {o.drop: kRegexAllCollections}, // kRegexAllCollections表示匹配所有的collection,
                            {o.dropDatabase: {$exist: true}}, //drop database操作
                            {o.renameCollection: inputNamespace},
                        ]
                    ]
                    // renameDropTarget
                    {o.to: inputNamespace},
                ]
            }
            // applyOps事务
            {
                "op": "c",
                "lsid": {$exists: true},
                "txnNumber": {$exists: true},
                "o.applyOps.ns": inputNamespace,
            }
        ]
    },
    {"fromMigrate": {$ne: true}}
]

可以看到,对于DDL来说,目前只支持dropCollection、dropDatabase、renameCollection3个操作,其他类似create/delete index,createColleciton,createDatabase,甚至更为复杂的比如convertToCapped、非事务applyOps等操作都没有支持。不过未来MongoDB有计划完善部分DDL。我们还可以看到,这里对于move chunk的oplog({fromMigrate: true})是直接过滤的,因为move chunk oplog本身只是表示挪动过程的目的端插入和源端删除操作,对于Change Stream本身来说没有实际意义,因为数据本身没有发生变更,只是位置上的变化。oplog的时钟是HLC的,从shard1挪到了shard2,则shard1和shard2的时钟都会推进,且具有因果序,所以shard1上move chunk之前的oplog时钟肯定是小于shard2上move chunk之后的oplog。从而直接对这些oplog排序即可保证因果一致性。

3.3.2 DocumentSourceChangeStreamTransform

内部通过调用applyTransformation将oplog翻译成对应的change stream event,这一步也是比较消耗资源,需要经历bson序列化/解序列化操作,而又因为整个pipeline本身就是单线程串行处理的过程,所以如果源端用户写入非常大,这一步可能会成为性能瓶颈。未来MongoDB可能会对这个stage进行优化,优化的方向可能:

  1. 多线程保序解析
  2. 整个pipeline多线程化,每个线程处理部分数据
  3. 合并DocumentSourceChangeStreamTransformDocumentSourceOplogMatch以减少反复序列化/解序列化的开销。

此外,事务的oplog跟普通oplog处理略有不同,比如要处理applyOps内部嵌套oplog的拆分,以保证事务内部的断点续传。

3.3.3 DocumentSourceCheckInvalidate

判断是否返回invalidate,以下情况需要返回invalid:

  1. 如果是collection watch,则dropCollection,renameCollection,dropDatabase都会触发invalid。
  2. 如果是db watch,则dropDatabase会触发invalid。

将invalid change stream event返回,返回之前,会进行状态的本地存储,这样下次用户在当前cursor继续getNext将会继续返回该invalid change stream event。

3.3.4 DocumentSourceShardCheckResumability

这一步主要判断给定的ResumeToken是否可以恢复/启动,以及跳过部分不需要的event,如果用户没有指定,mongos本身会生成一个ResumeToken发送到mongod。通过对比mongos输入的resumeToken和oplog query返回的ResumeToken,来决定当前拿到的event是否需要返回给客户端,这个比较会有以下3种结果:

  1. kCheckNextDoc。表示mongod拉到的event比较老,需要skip一部分才能得到client token所需要的event。
  2. kFoundToken。表示mongod拉到的event恰好是mongos/客户端需要的。所以直接返回即可。
  3. kSurpassedToken。表示mongod拉到的event大于mongos/客户端需要的,也就是说,一部分数据已经丢了。此时,就需要返回错误了。

这里,可能有读者有疑问,为什么多此一举,根据客户端/mongos指定的ResumeToken返回的oplog难道不就是mongos/客户端所需要的吗?答案是否定的。第一个阶段DocumentSourceOplogMatch中,oplog中过滤指定位点信息的只有时间戳:{"ts": {$gte: startFrom}},而没有其他更为具体的信息,比如事务applyOps中的txnOpIndex,表示事务内的偏移。举个例子,某事务包含3条语句:insert a = 1; insert a= 2; insert a = 3,返回第二条数据后cursor断开了,重连后根据时间戳恢复,如果是单机事务采用applyOps,那么时间戳都是一致的,断点续传后第一条返回是insert a=1,而客户端需要的是第三条a=3。所以需要skip前面2条。此外,还有一些情况需要处理,比如用户shardCollection,或者4.4中refine了shardKey,那么客户端携带的resumeToken跟event返回的token中documentKey是不一致的,需要一些特殊处理。

3.4 mongos聚合

mergePipeline分拆各个stage处理聚合。

3.4.1 DocumentSourceMergeCursors

这一步是用于合并各个shard返回的cursor。下面显示的mergeCursors的对于2个shard示例:

代码语言:javascript复制
{
    $mergeCursors: {
        lsid: {
            id: BinData(4, "DFFB25F3B9444D0CA4F790FADC73E965"),
            uid: BinData(0, "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855")
        },
        sort: {_id._data: 1}, // _id._data就是ResumeToken
        compareWholeSortKey: false,
        remotes: [
            {
                shardId: "shard1",
                hostAndPort: "xx.xx.xx.xx:33001",
                cursorResponse: {
                    cursor: {
                        id: 7258869757099183534,
                        ns: "zz.$cmd.aggregate",
                        firstBatch: [],
                        postBatchResumeToken: {
                            _data: "825F1BF44A000000032B0229296E04"
                        }
                    },
                    $_internalLatestOplogTimestamp: Timestamp(1595667530, 3),
                    ok: 1
                }
            },
            {
                shardId: "shard2",
                hostAndPort: "xx.xx.xx.xx:33004",
                cursorResponse: {
                    cursor: {
                        id: 6627263387104649979,
                        ns: "zz.$cmd.aggregate",
                        firstBatch: [],
                        postBatchResumeToken: {
                            _data: "825F1BF44A000000032B0229296E04"
                        }
                    },
                    $_internalLatestOplogTimestamp: Timestamp(1595667530, 3),
                    ok: 1
                }
            }
        ],
        tailableMode: "tailableAndAwaitData",
        nss: "zz.$cmd.aggregate",
        allowPartialResults: false
    }
}

可以看到这里是按照ResumeToken进行排序的(sort: {_id._data: 1}),这里的_id._data就是ResumeToken。ResumeToken的排序就是根据内部字段ts documentKey uuid等多个维度进行排序。合并各个cursor的过程就是一个队列多路归并 小顶堆排序的流程:

mongos上针对每个cursor都维持了一个docBuffer queue存放拉取的change stream event,通过比较函数,每次将会找所有docBuffer queue中队列头部ResumeToken最小的event,比如上图3个队列,队列头部最小的是队列3。然后跟minPromisedSortKey进行比较,如果小于minPromisedSortKey则返回给客户端,否则将会循环等待minPromisedSortKey的推进。那么minPromisedSortKey又是什么?推进规则又是如何?这里需要先介绍一下Post Batch Resume Token(简称PBRT)。其是4.0.7推出的,标识当前MongoDB内部最大的oplog的时间戳。也就是说,如果后面MongoDB一旦产生新的oplog,则时间戳肯定大于PBRT。对于mongos来说,每个mongod的cursor getMore请求都会返回PBRT,所以这个相当于是mongod给mongos的一个承诺,承诺以后不会返回时间戳小于PBRT的event。根据这个,Change Stream实现了排序等待的策略。此外,PBRT对于应用层还有另外一个作用:可以不断推进同步的位点,举个例子,我启动一个Change Stream监听A表,但是A表自从10:00以后没有写入,全部都是其余表的写入,那么同步的位点一直停留在10:00。18:00的时候网络断开了,我根据10:00进行恢复,但是由于我的磁盘比较小,oplog只能保留5个小时,也就是说,此时最老的oplog时间戳是13:00,那么就无法恢复了。为了防止这种无法恢复,或者即使恢复也需要大面积扫描的操作(从10:00扫描oplog到18:00),PBRT的作用就体现出来了,客户端只需要不断缓存PBRT,即使A表一直没数据,位点也被不断推进到18:00,从而解决这种oplog丢失和大面积扫描的操作。回答之前的问题,minPromisedSortKey是什么?minPromisedSortKey就是所有shard中最小的PBRTmin{PBRT(mongd1), PBRT(mongod2), ...}。关于mongos合并mongod数据的过程,下面给出了归纳的伪代码流程:

代码语言:javascript复制
while (no_data() || data_from_heap_not_less_than_minPromisedSortKey()) { // 如果没有数据,或者堆顶元素大于等于minSortKey,则会一直循环
send_getMore_to_each_shard() // 发送getMore到每个shard
    advance_minPromisedSortKey() // 根据返回的PBRT推进minPromisedSortKey
}

return ready_batch() // 返回一批符合条件的batch

首先会判断是否有数据并且存在数据小于minPromisedSortKey,如果是则不用走内部循环,直接返回符合条件的batch event即可。否则进入循环,循环内部会发送getMore到每个shard,拉取到的数据同样都存入docBuffer队列中,并且根据返回的PBRT推进minPromisedSortKey(即使shard没有数据,超时返回的response也会携带PBRT),minSortKey的值等于所有shard返回的PBRT中最小的值。一旦while循环判断有堆顶元素满足要求:小于等于minSortKey,则退出循环,聚合batch返回给客户端了。这是因为所有的shard都不会产生时间戳比minSortKey更小的event了,这个event也就可以返回了。基于这个逻辑,即使某个shard一直没有产生对应database/collection的event,也会不断推进PBRT,从而使得mongos可以顺利返回数据给客户端,而不会一直block。此外,mongos的PBRT(返回的数据中的最后一条PBRT)也会携带在getMore的response中返回给客户端。上图的例子中,本次的minPromisedSortKey已经推进到了10,则10之前的都可以聚合batch返回了,10之后的还需要继续在各个docuBuffer队列中等待下一次minPromisedSortKey的推进才能返回。

到这里,可能会有读者提问,2个shard是否会产生的时钟一致的oplog/event,是的话如何排序?答案是不同的shard是可能会产生时钟一致的oplog/event,这是因为在没有消息交互的情况下,不同shard的HLC是独立推进的。如果产生的event一致,Change Stream只需要按照resumeToken进行排序即可,将会综合ts documentKey uuid等多个维度进行排序。换句话说,如果2个shard产生的oplog/event一致,证明这2个oplog是完全并发的,谁先谁后并不重要,只要规则固定即可。细心的读者可能还会发现,这里由于需要等待minPromisedSortKey的推进才能返回数据,所以有一个“等待”的策略,会影响实时性。的确如此,不过通常来说这个延迟是秒级别的,所以对于同步场景来说,基本上是可以接受的。

3.4.2 DocumentSourceUpdateOnAddShard

这个stage用于有新增shard的时候,构建新的cursor到这个shard。判断的方式是发现返回的event的operationType==kNewShardDetected。这个operationType是在DocumentSourceChangeStreamTransform里面转换的:

代码语言:javascript复制
{
    "ns": inputNamespace,
    $or: [ 
        {"op": {$ne: "n"}}, // 不为noop, 表示是普通的CRUD操作
        {"op": "n", "o2.type": "migrateChunkToNewShard"}, // chunk mirgrate到新的shard上
    ],
}

在DocumentSourceOplogMatch的时候会返回op=n,但是o2.type=migrateChunkToNewShard以表示产生了新的shard并且发生了move chunk的操作。DocumentSourceChangeStreamTransform发现有op=n的时候,就返回operationType=kNewShardDetected

3.4.3 DocumentSourceCloseCursor

当发送invalidate事件后,则标记_shouldCloseCursor,下次再调用getNext会抛出异常,mongos上层捕获后close掉cursor。

3.4.4 DocumentSourceLookupChangeImage

用户如果设置了{``fullDocument: updateLookup},则对于event的operation类型是update还会进行一次find查找(普通update操作对应的operation是replace,只有$set/$unsert对应的才是update)。如果是位于mongos,则会设置readConcern=majority,并且配置afterClusterTime时间戳为返回的event中的resumeToken中的clusterTime,为什么副本集上就没有afterClusterTime参数?因为这一步对于副本集,读到的肯定是更新以后的,因为change stream本身吐出来的节点跟当前query的节点一致。最后,调用lookupSingleDocument发送find命令进行查找。注意,这里只能够保证最终一致性。如果用户频繁更新可能会发生丢失,例如:用户更新{_id:1, a:1}{_id:1, a:2},然后又更新为{_id:1, a:3}。则可能2个change event返回的更新后的值都是{_id:1, a:3}。这个跟DynamoDB的Stremas中吐出的event包含真实的修改后数据NewImage和修改前数据OldImage不同,MongoDB只能做到吐出修改后的数据NewImage,而这个NewImage是通过一次额外的query实现的,所以只能保证最终一致性。这个机制产生的另外一个问题就是性能问题,由于update需要进行额外的find,那么返回event的延迟就会增大。

4. 总结

从功能来说,Change Stream可以提供实时数据流,满足多种需求。对比oplog拉取,Change Stream可以解决事务、DDL等集群版比较难处理的问题。但是目前DDL的种类还不够丰富,仅支持有限的3种DDL:dropCollectiondropDatabaserenameCollection。后面MongoDB官方也会继续优化此类功能。从使用来说,Change Stream使用的门槛比较低,不像oplog拉取需要自己处理复杂的对接和断点续传。从性能来说,Change Stream跟单纯的oplog拉取还是有一定的差距,瓶颈主要位于mongod上的翻译:DocumentSourceChangeStreamTransform;此外,由于mongos对于不同shard的event有聚合“等待”的逻辑,所以实时性上来说不如oplog直接拉取,但总体来说,这个实时性是可以接受的。总的来说,使用Change Stream的优点大于缺点,而且后面MongoDB官方还会不断进行优化,推荐大家使用。最后,MongoShake对接了Change Stream,支持同步到MongoDB,以及Kafka在内的多种通道,方便大家进行迁移、同步、分析等多场景,欢迎使用!

参考:

https://lamport.azurewebsites.net/pubs/time-clocks.pdf

https://cse.buffalo.edu/tech-reports/2014-04.pdf

https://dl.acm.org/doi/pdf/10.1145/3299869.3314049

https://github.com/alibaba/MongoShake

作者介绍

陈星(花名烛昭),阿里云数据库NoSQL团队技术专家,MongoShake/NimoShake/RedisShake作者,当前主要参与阿里云MongoDB的内核以及相关MongoDB产品开发和维护工作。致力于为用户提供更好的云服务以及开源生态产品。

MongoDB中文手册翻译正在进行中,欢迎更多朋友在自己的空闲时间学习并进行文档翻译,您的翻译将由社区专家进行审阅,并拥有署名权更新到中文用户手册和发布到社区微信内容平台。点击下方图片即可领取翻译任务——

0 人点赞