RocketMQ-消息存储设计

2023-10-20 10:22:50 浏览数 (2)

消息存储是RocketMQ中最为复杂和最为重要的一部分

消息存储整体架构

消息存储是RocketMQ中最为复杂和最为重要的一部分,将分别从RocketMQ的消息存储整体架构PageCacheMmap内存映射以及RocketMQ中两种不同的刷盘方式三方面来分别展开叙述。

先看这个图,这个图看着复杂,但是多理解几遍历,不要着急跳过去

从这个图理梳理出几个关键词

  1. CommitLog
  2. ConsumeQueue
  3. CommitLogOffset
  4. IndexFile
  5. producer send

CommitLog和producer发送消息有关,ConsumeQueue和consumer消费端有关。

消息存储相关的文件

消息都是存储在 Broker服务器上的以文件形式存储分:Producer端和Consumer端,消息查询也是通过Broker节点查询。

CommitLog 发送端消息主体---Producer端

CommitLog:消息真正的存储文件,所有消息都存储在 CommitLog 文件中。

CommitLog 文件是存放消息数据的地方,所有的消息都将存入到 CommitLog 文件中。 生产者将消息发送到 RocketMQ 的 Broker 后,Broker 服务器会将消息顺序写入到 CommitLog 文件中,这也就是 RocketMQ 高性能的原因,因为我们知道磁盘顺序写特别快,RocketMQ 充分利用了这一点,极大的提高消息写入效率。

CommitLog: 消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如: 00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

ConsumeQueue 消息消费队列---Consume端

consumequeue文件可以看成是基于topic的commitlog索引文件

ConsumeQueue 引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。

Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的:

  1. 起始物理偏移量offset
  2. 消息大小size
  3. 消息Tag的HashCode值

CommitLog 文件是存放消息数据的地方,所有的消息都将存入到 CommitLog 文件中。

但是消费者消费消息的时候,可能就会遇到麻烦,每一个消费者只能订阅一个主题,消费者关心的是订阅主题下的所有消息,但是同一主题的消息在 CommitLog 文件中可能是不连续的,那么消费者消费消息的时候,需要将 CommitLog 文件加载到内存中遍历查找订阅主题下的消息,频繁的 IO 操作,性能就会急速下降。

为了解决这个问题,RocketMQ 引入了 Consumequeue 文件。Consumequeue 文件可以看作是索引文件,类似于 MySQL 中的二级索引。在存放了同一主题下的所有消息,消费者消费的时候只需要去对应的 Consumequeue 组中取消息即可。Consumequeue 文件不会存储消息的全量信息,了解 MySQL 索引的话,应该好理解这里,具体存储的字段,我在上图已经标注。这样做可以带来以下两个好处:

由于 Consumequeue 文件内容小,可以尽可能的保证 Consumequeue 文件全部读入到内存,提高消费效率。 Consumequeue 文件也是会持久化的,不存全量信息可以节约磁盘空间。

consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构, 具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。 同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量4字节的消息长度8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条,每个ConsumeQueue文件大小约5.72M

IndexFile

IndexFile(索引文件)提供了一种可以通过key或时间区间在CommitLog中查询消息,找到offset结的方法。

Index文件的存储位置是$HOMEstoreindex${fileName},文件名fileName是以创建时的时间戳命名的。

固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

在上面的RocketMQ的消息存储整体架构图中可以看出,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。

RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构。

Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。即使ConsumeQueue中的数据丢失,也可以通过CommitLog来恢复。正因为如此,Consumer也就肯定有机会去消费这条消息。

Consume端当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

页缓存(page cache)与内存映射(mmap)

用于加速对文件的读写。

页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。 一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。 对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。

在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。

内存映射 (mmap) 另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

消息刷盘

分两种刷盘模式,没有好坏之分,实际应用中根据业务进行调整:

  1. 同步刷盘
  2. 异步刷盘

(1) 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

(2) 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

总结

RocketMQ 在存储设计的一概念性的东西,熟练应用,并了解这些概念,对后续深入学习RocketMQ和排查问题很有帮助。

0 人点赞