导语
我们在之前的《深入解析Apache Pulsar系列之一 —— 客户端消息确认》中介绍过Apache Pulsar客户端的多种消息确认模式。这篇文章中,我们将介绍Broker侧对于消息确认的管理。
作者简介
林琳
腾讯云中间件专家工程师
Apache Pulsar PMC,《深入解析Apache Pulsar》作者。目前专注于中间件领域,在消息队列和微服务方向具有丰富的经验。负责TDMQ的设计与开发工作,目前致力于打造稳定、高效和可扩展的基础组件与服务。
客户端通过消息确认机制通知Broker某些消息已经被消费,后续不要再重复推送。Broker侧则使用游标来存储当前订阅的消费位置信息,包含了消费位置中的所有元数据,避免Broker重启后,消费者要从头消费的问题。Pulsar中的订阅分为持久订阅和非持久订阅,他们之间的区别是:持久订阅的游标(Cursor)是持久化的,元数据会保存在ZooKeeper,而非持久化游标只保存在Broker的内存中。
游标的简介
Pulsar中每个订阅都会包含一个游标,如果多个消费者拥有相同的订阅名(消费组),那这些消费者们会共享一个游标。游标的共享又和消费者的消费模式有关,如果是Exclusive或者FailOver模式的订阅,那同一时间只有一个消费者使用这个游标。如果是Shared或者Key_Shared模式的订阅,那多个消费者会同时使用这个游标。
每当消费者Ack一条消息,游标中指针的位置都有可能会变化,为什么说是有可能呢?这涉及到我们在客户端章节介绍的Acknowledge的方式:单条消息确认(Acknowledge)、批消息中的单个消息确认(Acknowledge)、累积消息确认(AcknowledgeCumulative)。否定应答(NegativeAcknowledge)不涉及游标的变化,因此不在讨论范围之内。
我们先看单条消息的确认,如果是独占式的消费,每确认一条消息,游标位置都会往后移动一个Entry,如下图所示:
累积消息确认,只需要确认一条消息,游标可以往后移动多个Entry,如:Consumer-1累积确认了Entry-4,则从0开始的Entry都会被确认,如下图所示:
对于共享式的消费,因为有多个消费者同时消费消息,因此消息的确认可能会出现空洞,空洞如下图所示:
这里也解释了为什么MarkeDeletePosition指针的位置可能发生变化,我们可以从共享式的消费中看到,消息确认是可能出现空洞的,只有当前面所有的Entry都被消费并确认,MarkeDeletePosition指针才会移动。如果存在空洞,MarkeDeletePosition指针是不会往后移动的。那这个MarkeDeletePosition指针和游标是什么关系呢?游标是一个对象,里面包含了多个属性,MarkeDeletePosition指针只是游标的其中一个属性。正如上面所说的Ack空洞,在游标中有另外专门的方式进行存储。如果我们不单独存储空洞,那Broker重启后,消费者只能从MarkDeletePosition开始消费,会存在重复消费的问题。如果以上图为例,Broker重启后Entry-4就会被重复消费。当然,Pulsar中对空洞信息是有单独存储的。
然后,我们看看游标里到底记录了什么元数据,此处只列出一些关键的属性:
属性名 | 描述 |
---|---|
Bookkeeper | Bookkeeper Client的引用,主要用来打开Ledger,例如:读取历史数据,可以打开已经关闭的Ledger;当前Ledger已经写满,打开一个新的Ledger。 |
MarkDeletePosition | 标记可删除的位置,在这个位置之前的所有Entry都已经被确认了,因此这个位置之前的消息都是可删除状态。 |
PersistentMarkDeletePosition | MarkDeletePosition是异步持久化的,这个属性记录了当前已经持久化的MarkDeletePosition。当MarkDeletePosition不可用时,会以这个位置为准。这个位置会在游标Recovery时初始化,后续在持久化成功后不断更新。 |
ReadPosition | 订阅当前读的位置,即使有多个消费者,读的位置肯定是严格有序的,只不过消息会分给不同的消费者而已。读取的位置会在游标恢复(Recovery)时初始化,消费时会不断更新 |
LastMarkDeleteEntry | 最后被标记为删除的Entry,即MarkDeletePosition指向的Entry。 |
CursorLedger | Cursor在Zookeeper中只会保存索引信息,具体的Ack数据会比较大,因此会保存到Bookkeeper中,这个属性持有了对应Ledger的引。 |
ndividualDeletedMessages | 用于保存Ack的空洞信息。 |
BatchDeletedIndexes | 用于保存批量消息中单条消息Ack信息。 |
看到CursorLedger,说明数据被保存到了Bookkeeper中。有的读者可能会有疑问,既然数据都保存到Bookkeeper中了,那ZooKeeper中保存的Cursor信息有什么用呢?我们可以认为ZooKeeper中保存的游标信息只是索引,包含了以下几个属性:
- 当前的cursorLedger名以及ID,用于打开Bookkeeper中的Ledger;
- LastMarkDeleteEntry,最后被标记为删除的Entry信息,里面包含了LedgerId和EntryId;
- 游标最后的活动时间戳。
游标保存到ZooKeeper的时机有几个:
- 当cursor被关闭时;
- 当发生Ledger切换导致cursorLedger变化时;
- 当持久化空洞数据到Bookkeeper失败并尝试持久化空洞数据到ZooKeeper时。
我们可以把ZooKeeper中的游标信息看作Check Point,当恢复数据时,会先从ZooKeeper中恢复元数据,获取到Bookkeeper Ledger信息,然后再通过Ledger恢复最新的LastMarkDeleteEntry位置和空洞信息。
既然游标不会实时往ZooKeeper中写入数据,那是如何保证消费位置不丢失的呢?
Bookkeeper中的一个Ledger能写很多的Entry,因此高频的保存操作都由Bookkeeper来承担了,ZooKeeper只负责存储低频的索引更新。
消息空洞的管理
在游标对象中,使用了一个IndividualDeletedMessages容器来存储所有的空洞信息。得益于Java中丰富的轮子生态,Broker中直接使用了Guava Range这个库来实现空洞的存储。举个例子,假设在Ledger-1中我们的空洞如下:
则我们存储的空洞信息如下,即会用区间来表示已经连续Ack的范围:
[ (1:-1, 1:2] , (1:3, 1:6] ]
使用区间的好处是,可以用很少的区间数来表示整个Ledger的空洞情况,而不需要每个Entry都记录。当某个范围都已经被消费且确认了,会出现两个区间合并为一个区间,这都是Guava Range自动支持的能力。如果从当前MarkDeletePosition指针的位置到后面某个Entry为止,都连成了一个区间,则MarkDeletePosition指针就可以往后移动了。
记录了这些消息空洞之后,是如何用来避免消息重复消费的呢?
当Broker从Ledger中读取到消息后,会进入一个清洗阶段,如:过滤掉延迟消息等等。在这个阶段,Broker会遍历所有消息,看消息是否存在于Range里,如果存在,则说明已经被确认过了,这条消息会被过滤掉,不再推送给客户端。Guava Range提供了Contains接口,可以快速查看某个位置是否落在区间里。这种Entry需要被过滤的场景,基本上只会出现在Broker重启后,此时游标信息刚恢复。当ReadPosition超过了这段空洞的位置时,就不会出现读到重复消息要被过滤的情况了。
然后,我们来看看IndividualDeletedMessages这个容器的实现。
IndividualDeletedMessages 的类型是LongPairRangeSet,默认实现是DefaultRangeSet,是一个基于Google Guava Range包装的实现类。另外一个Pulsar自己实现的优化版:ConcurrentOpenLongPairRangeSet。优化版的RangeSet和Guava Range的存储方式有些不一样,Guava Range使用区间来记录数据,优化版RangeSet对外提供的接口也是Range,但是内部使用了BitSet来记录每个Entry是否被确认。
优化版RangeSet在空洞较多的情况下对内存更加友好。我们可以假设一个场景,有100W的消息被拉取,但是只有50W的消息已经被Ack,并且每隔一条消息Ack一条,这样就会出现50W个空洞。此时的Range就无法发挥区间的优势了,会出现50W个Range对象,如下图所示。而优化版的RangeSet使用了BitSet,每个ack只占一位。
我们可以在broker.conf中,通过配置项managedLedgerUnackedRangesOpenCacheSetEnabled=true来开启使用优化版的RangeSet。
也正因如此,如果整个集群的订阅数比较多,游标对象的数据量其实并不小。所以在Pulsar中,MetaDataStore中只保存了游标的索引信息,即保存了游标存储在哪个Ledger中。真正的游标数据会通过上面介绍的cursorLedger写入到Bookkeeper中持久化。整个游标对象会被写入到一个Entry中,其Protobuf的定义如下:
message PositionInfo { required int64 ledgerId = 1; required int64 entryId = 2; repeated MessageRange individualDeletedMessages = 3; repeated LongProperty properties = 4; repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;}
看到这里,其实Batch消息中单条消息确认的实现也清晰了,BatchDeletedIndexes是一个ConcurrentSkipListMap,Key为一个Position对象,对象里面包含了LedgerId和EntryId。Value是一个BitSet,记录了这个Batch里面哪些消息已经被确认。BatchDeletedIndexes会和单条消息的空洞一起放在同一个对象(PositionInfo)中,最后持久化到Bookkeeper。
空洞数据如果写入Bookkeeper失败了,现在Pulsar还会尝试往ZooKeeper中保存,和索引信息一起保存。但是ZooKeeper不会保存所有的数据,只会保存一小部分,尽可能的让客户端不出现重复消费。我们可以通过broker.conf中的配置项来决定最多持久化多少数据到ZooKeeper,配置项名为:managedLedgerMaxUnackedRangesToPersistInZooKeeper,默认值是1000。
消息空洞管理的优化
空洞存储的方案看起来已经很完美,但是在海量未确认消息的场景下还是会出现一些问题。首先是大量的订阅会让游标数量暴增,导致Broker内存的占用过大。其次,有很多空洞其实是根本没有变化的,现在每次都要保存全量的空洞数据。最后,虽然优化版RangeSet在内存中使用了BitSet来存储,但是实际存储在Bookkeeper中的数据MessageRange,还是一个个由LedgerId和EntryId组成的对象,每个MessageRange占用16字节。当空洞数量比较多时,总体体积会超过5MB,而现在Bookkeeper能写入的单个Entry大小上限是5MB,如果超过这个阈值就会出现空洞信息持久化失败的情况。
对于这种情况,已经有专门的PIP在解决这个问题,笔者在写这篇文章的时候,这个PIP代码已经提交,正在Review阶段,因此下面的内容可能会和最终代码有一定差距。
新的方案中主要使用LRU 分段存储的方式来解决上述问题。由于游标中空洞信息数据量可能会很大,因此内存中只保存少量热点区间,通过LRU算法来切换冷热数据,从而进一步压缩内存的使用率。分段存储主要是把空洞信息存储到不同的Entry中去,这样能避免超过一个Entry最大消息5MB的限制。
如果我们把空洞信息拆分为多个Entry来存储,首先面临的问题是索引。因为使用单个Entry记录时,只需要读取Ledger中最后一个Entry即可,而拆分为多个Entry后,我们不知道要读取多少个Entry。因此,新方案中引入了Marker,如下图所示:
当所有的Entry保存完成后,插入一个Marker,Marker是一个特殊的Entry,记录了当前所有拆分存储的Entry。当数据恢复时,从后往前读,先读出索引,然后再根据索引读取所有的Entry。
由于存储涉及到多个Entry,因此需要保证原子性,只要最后一个Entry读出来不是Marker,则说明上次的保存没有完成就中断了,会继续往前读,直到找到一个完整的Marker。
空洞信息的存储,也不需要每次全量了。以Ledger为单位,记录每个Ledger下的数据是否有修改过,如果空洞数据被修改过会被标识为脏数据,存储时只会保存有脏数据的部分,然后修改Marker中的索引。
假设Entry-2中存储的空洞信息有修改,则Entry-2会被标记为脏数据,下次存储时,只需要存储一个Entry-2,再存储一个Marker即可。只有当整个Ledger写满的情况下,才会触发Marker中所有Entry复制到新Ledger的情况。如下图所示:
ManagedLedger在内存中通过LinkedHashMap实现了一个LRU链表,会有线程定时检查空洞信息的内存占用是否已经达到阈值,如果达到了阈值则需要进行LRU换出,切换以Ledger为单位,把最少使用的数据从Map中移除。LRU数据的换入是同步的,当添加或者调用Contains时,发现Marker中存在这个Ledger的索引,但是内存中没有对应的数据,则会触发同步数据的加载。异步换出和同步换入,主要是为了让数据尽量在内存中多待一会,避免出现频繁的换入换出。
后记
Pulsar中的设计细节非常多,由于篇幅有限,作者会整理一系列的文章进行技术分享,敬请期待。如果各位希望系统性地学习Pulsar,可以购买作者出版的新书《深入解析Apache Pulsar》。
扫描上方二维码了解详情
One More Thing
目前,腾讯云消息队列 TDMQ Pulsar版(TDMQ for Pulsar,简称 TDMQ Pulsar 版)已开始正式商业化。消息队列 Pulsar 版是一款基于 Apache Pulsar 自研的消息中间件,具备极好的云原生和 Serverless 特性,兼容 Pulsar 的各个组件与概念,具备计算存储分离,灵活扩缩容的底层优势。
各位想要了解的请点击阅读原文。
★
福利时间
您对Apache Pulsar有什么了解?
评论区留言并分享文章至朋友圈
我们将在精选留言中随机抽送
作者的新书
往期
推荐
《腾讯云消息队列 TDMQ Pulsar 版商业化首发|持续提供高性能、强一致的消息服务》
《Serverless可观测性的价值》
《喜报|CKafka荣获可信云消息队列服务稳定性先进级认证》
《RoP重磅发布0.2.0版本:架构全新升级,消息准确性达100%》
《ZooKeeper系列文章:ZooKeeper 源码和实践揭秘(二)》
《深入理解Rabbit MQ与AMQP协议》
《应用多环境部署的最佳实践》
《单元化架构在金融行业的最佳实践》
《服务器又崩了?深度解析高可用架构的挑战和实践》
《Kratos技术系列|从Kratos设计看Go微服务工程实践》
扫描下方二维码关注本公众号,
了解更多微服务、消息队列的相关信息!
解锁超多鹅厂周边!
戳原文,查看更多TDMQ Pulsar版信息!
点个在看你最好看