3分钟白话RocketMQ系列—— 如何存储消息

2023-10-22 17:31:59 浏览数 (2)

白话3分钟,快速了解RocketMQ如何存储消息。 看完如果不了解,欢迎来打我。

我们知道RocketMQ主要分为消息 生产、存储(消息堆积)、消费 三大块领域。

那接下来,我们白话一下,RocketMQ是如何存储消息的,揭秘消息存储全过程。

注意,如果白话中不小心提到相关代码配置与类名,请参考RocketMQ 4.9.4版本

关键字摘要

  • 存储模型与存储类型
  • 如何保证存储消息不丢失
  • 如何提高写入性能
  • 如何清理过期消息

存储模型是什么?有哪些存储类型?

RocketMQ使用了一种基于日志的存储方式,将消息以顺序写入的方式追加到文件中,从而实现高性能的消息存储和读取。

RocketMQ的消息存储方式可以分为两个类型:CommitLogConsumeQueue

还有一个文件类型是indexfile,主要用于控制台消息检索,不影响消息的写入与消费,我们就不展开了。

CommitLog

CommitLog文件存储了Producer端写入的消息主体内容,它以追加写入的方式将消息存储到磁盘上的文件中。

单个文件大小默认1G ,文件名长度为20位(左边补零,剩余为起始偏移量),当文件满了,写入下一个文件。

比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。

它的主要特点是:顺序写,但是随机读(被ConsumeQueue读取)。

虽然是随机读,但是利用package机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。

Broker单个实例下所有的队列共用一个日志数据文件CommitLog来存储。而Kafka采用的是独立型的存储结构,每个队列一个文件。

ConsumeQueue

ConsumeQueue文件是用于支持消息消费的存储结构。保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。

消费者 通过 顺序读取 ConsumeQueue文件,可以快速定位到消息在CommitLog中的物理存储位置,从而实现快速消息的拉取和消费。

从实际物理存储的角度来看,每个主题Topic下的每个队列Queue对应一个ConsumeQueue文件。

生产者端的消息是顺序写入CommitLog,消费者端是顺序读取ConsumeQueue。 但是根据ConsumeQueue的起始物理位置偏移量offset读取消息真实内容,实际是随机读取CommitLog。 实现了 消息生产与消息消费数据存储和数据索引 相互分离。

怎么保证存储消息不丢失?

刷盘机制

Broker在把消息写入日志文件的过程中,如果在刚收到消息时,Broker异常宕机了,那么内存中尚未写入磁盘的消息就会丢失了。

因此,RocketMQ持久化消息分为两种:同步刷盘和异步刷盘(默认配置)。

异步刷盘是指Broker收到消息后先存储到PageCache,然后立即通知Producer消息已存储成功,可以继续处理业务逻辑。 此后,Broker会启动一个异步线程将消息持久化到磁盘。然而,如果Broker在持久化到磁盘之前发生故障,消息将会丢失。

代码语言:javascript复制
## 刷盘策略配置
flushDiskType = ASYNC_FLUSH 

注意,写入PageCache后,应用服务宕机消息不丢失,只有机器断电或宕机会有少量消息丢失。

相比之下,同步刷盘的方式是在消息存储到缓存后不立即通知Producer,而是等待消息被持久化到磁盘后再通知Producer。 这种方式确保了消息不会丢失,但性能不如异步刷盘高。一般用于金融业务。

代码语言:javascript复制
## 刷盘策略配置
flushDiskType = SYNC_FLUSH 

在选择刷盘方式时,需要根据业务场景进行权衡。

主从同步机制

即使Broker采用同步刷盘策略,但如果刷盘完成后磁盘损坏,会导致所有存储在磁盘上的消息丢失。

即使采用了主从复制,如果主节点在刷盘完成后还没有来得及将数据同步给从节点就发生了磁盘故障,同样会导致数据丢失。

所以我们可以配置同步机制,等待从节点复制完成主节点的消息后,才去通知Producer完成了消息存储。

代码语言:javascript复制
## 主从同步策略配置
brokerRole=SYNC_MASTER

怎么提高存储写入性能?

零拷贝技术

RocketMQ通过使用内存映射文件(包括CommitLog、 ConsumeQueue等文件)来提高IO访问性能,也就是我们常说的零拷贝技术。

Java在NIO包里,引入了sendFile(FileChannel类)和MMAP(MappedByteBuffer类)两种实现方式的零拷贝技术。

主流的MQ都会使用零拷贝技术,来提升IO:

  • Kafka:record 的读和写都是基于 FileChannel。index 的读写则基于 MMAP。
  • RocketMQ:读取数据基于 MMAP,写入数据默认使用 MMAP。但可以通过修改配置transientStorePoolEnable参数将其配置为使用 FileChannel。作者之所以这样设计,是为了避免 PageCache 的锁竞争,并通过两层架构实现读写分离。

缓冲池写入增强

在不开启RocketMQ的内存映射增强方案时,RocketMQ的读和写都只会简单直接使用MMAP。

但是,MappedByteBuffer也存在一些缺陷:

  • 使用虚拟内存,超过物理内存会导致内存交换,引起磁盘IO(可能非顺序IO)速度较慢。
  • 虚拟内存交换是受操作系统控制的,所以其他进程活动也会触发RocketMQ内存映射的交换。
  • 文件内存映射写入PageCache时存在锁竞争,直接写入内存可避免竞争,在异步刷盘场景下速度更快。

为此,RocketMQ通过transientStorePoolEnable参数控制,对写入进行了优化。

如果开启了这个参数,会将写入拆分为两步, 写入缓冲区 异步刷盘 的增强策略。

代码语言:javascript复制
## 刷盘策略配置
flushDiskType = ASYNC_FLUSH 
transientStorePoolEnable = true

MappedFile会提前申请一块直接内存用作缓冲区,放弃使用mmap直接写文件。

数据先写入缓冲区,然后异步线程每200ms(且脏数据达到16K,commitCommitLogLeastPages = 4)将缓冲区的数据commit写入FileChannel

再唤醒定时服务(FlushRealTimeService类)将FileChannel里的数据持久化到磁盘。flush函数和commit一样也可以传入一个刷盘页数,当脏页数量达到16K时(flushLeastPages = 4),会进行刷盘操作,调用FileChannelforce将内存中的数据持久化到磁盘。

开启transientStorePoolEnable参数后,性能最好,但是相对来说持久化最不可靠

如何处理消息的过期和删除?

RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。

需要注意的是,在RocketMQ中,消息存储时长并不能完整控制消息的实际保存时间。

因为消息存储仍然使用本地磁盘,本地磁盘空间不足时,为保证服务稳定性消息仍然会被强制清理,导致消息的实际保存时长小于设置的保存时长。

建议在存储成本可控的前提下,尽可能延长消息存储时长。延长消息存储时长,可以为紧急故障恢复、应急问题排查和消息回溯带来更多的可操作空间。

总结

  • 存储模型与存储类型:commitLog文件存储消息物理文件,consumeQueue文件夹存储逻辑队列索引
  • 如何保证存储消息不丢失:同步&异步刷盘、主从消息同步
  • 如何提高写入性能:零拷贝技术MMAP和FileChannel、缓冲区增强 异步刷盘 策略
  • 如何清理过期消息:按存储时长清理消息

3分钟到了吗?应该对RocketMQ如何存储消息有全面了解了吧。 如果还想了解更多,欢迎关注下一期内容。

0 人点赞