rabbitmq——索引文件的读写机制

2023-02-28 14:13:03 浏览数 (2)

【前言】

在前面的文章中,我们讲解了索引文件的格式,里面提到了针对消息主要有publish,delivery,ack三个操作,而索引文件中主要也就是存储了消息这三个操作对应的二进制数据。那么什么情况下会进行索引文件的读写,具体流程又是怎样的,有些怎样的设计考虑。本文对其相关原理进行了一些总结。

【重要数据结构】

在rabbitmq内部,rabbit_queue_index模块负责队列索引文件的读写,并在内存中维护相关信息,其内部结构如下图所示:

主要的数据结构是qistate和segment。其中qistate是queue index state的缩写,包含的几个重要字段为:

dir:该队列索引信息存储路径

segments:segment的集合

journal_handle:journal.jif文件的操作句柄

max_journal_entries:日志flush到磁盘之前,允许在内存缓存的最大条数

dirty_count:没有flush到磁盘的日志条数,即当前内存缓存的日志条数

segment包含的几个重要字段是:

num:segment的编号,也是该segment写入的idx文件对应的编号,从0开始递增(每个segment中的数据会写入一个对应的后缀为idx的文件中)

path:segment对应存储文件(*.idx)的完整路径

journal_entries:一个数组,数组中的每一项记录一条消息已执行的操作(publish,delivery,ack)

entries_to_segment:也是一个数组,数组中的每一项对应一条消息已执行的操作的二进制数据

unack:该segment中已经投递给消费者,但还未收到消费者ack的消息数

file_handle_cache模块:几乎所有的文件操作都会调用该模块进行处理。对于每个将要读写的文件,除了可以控制设置对应的打开模式(仅读、仅写、读写)之外,还可以设置缓存的大小。对于写模式,所有的写入都是先直接写到缓存中,只有等缓存大小超过了设置的值的时候,才将缓存数据真正写入文件。

【写流程】

rabbit_queue_index模块提供了publish,delivery,ack接口供调用,这三个接口的处理流程差不多都一样:首先将对消息的操作序列化成二进制数据,并追加写入日志文件journal.jif中;然后根据消息的序号,找到消息对应存储的segment,并在segment中journal_entries记录消息对应的操作,同时将二进制数据添加到entries_to_segment对应的数组项中;最后按需将segment中的数据写入相应的idx文件中。

粗略看这个流程,觉得逻辑相对简单,但仔细分析,就会有很多疑问。例如:每个操作对应的日志数据都会先写入journal.jif文件,那是否意味着每条日志数据都实时刷到磁盘上了?segment中的数据什么时候写入相应的idx文件中?segment中的数据写到idx文件后,journal.jif中的数据是否还有用,会被怎么处理?这两个文件中的数据有什么区别?如果生产者发送的一条消息立马被消费者消费了,这条消息相关的操作数据还会被写到磁盘上吗?

阅读相关源码后,让我们一个一个来看这些问题。

  • 是否消息的每个操作日志数据都实时写入journal.jif文件中了?

其实写journal.jif文件时并非真正将内容写入到文件中了。前面讲到了文件的读写操作都通过file_handle_cache模块处理。对journal.jif文件的操作,具体是以写模式打开,同时缓存大小设置为无穷大。因此,在索引模块中,对消息的每个操作日志数据进行的写(journal.jif文件)操作,最终都只是在内存中缓存,并没有真正进行文件系统级别的写操作。只有等关闭该文件,或者显示调用将内存数据同步刷到磁盘时,才进行真正的操作。

  • 什么时候将segment中的数据写入idx文件呢?

前面提到了qistate中有个dirty_count字段,表示未写入磁盘的日志数量。消息每个操作的日志数据写journal.jif时,该字段加1;当累计到指定值时,会将当前segment中的数据写入相应idx文件中。这个值默认大小为32768,也就是队列消息的publish、delivery、ack操作累计达到32768次后,将segment中的数据写入idx文件中。

可通过queue_index_max_journal_entries进行配置。

除此之外,队列会根据内存的使用情况,动态决定在内存驻留的消息数(初始为无穷大),如果dirty_count超过了内存允许驻留的消息数,也会触发将segment中的数据写入idx文件中。

另外,idx文件和journal.jif文件一样,以写模式打开并设置缓存为无穷大,写入的消息都先在缓存中,但idx文件写完后会立即关闭该文件,这样缓存中的数据会通过writev进行文件系统级别的写动作,并最终通过fsync同步刷到磁盘上。

考虑这么一种情况:当内存足够(保证队列允许在内存驻留的消息数足够大)时,仅向队列发送一条(持久化的)消息,此时上面提到的两个条件均无法满足,是否这种情况下,消息就永远不会写入到磁盘上了?

答案当然是否定的!

队列收到消息后,其对应的处理进程会设置一个定时器,定时器超时后会触发rabbit_queue_index模块将journal.jif中的数据同步刷到磁盘中(注意:这里仅仅是将journal.jif缓存中的数据写入文件中,并没有写idx文件)。另外,当队列进程持续一段时间没有收到任何消息时,会进入hibernate阶段,这个时候会触发将segment中的数据写入idx文件中。

  • 数据写入idx了后,journal.jif中的数据怎么处理?

当segment中的数据写入idx文件后,会清空journal.jif对应缓存中的数据,同时通过ftruncate对文件清空,并将文件偏移位置移动到起始位置。也就是说数据写入idx之后,journal.jif中的数据就没有用了,直接进行清除处理。

  • idx文件与journal.jif文件中的数据有什么区别呢?

两个文件中的数据几乎没有区别,都是消息操作的二进制数据。唯一的差别在于消息的ID的记录。在journal.jif中记录的是消息在队列中的序号,而在idx文件中记录的是消息在该文件中的序号(也就是journal_entries数组中的下标)。

因为journal.jif文件只有一个,里面记录了所有消息的操作,因此需要记录消息在队列中的序号,保证消息被有序消费;而每个idx文件中,固定存储16384个消息的操作日志数据,对消息在队列中的序号除16384再求余,就计算出应该存储在哪个segment中数组的哪个位置了,因此存储的时候只需要存储在该文件中的相对序号即可。

  • 生产者发送的一条消息如果立马被消费者消费了,该消息相关的操作数据是否还会写到磁盘上?

前面讲到了segment中的journal_entries是一个数组,记录该segment中每个消息已执行到操作。当消息最终被消费者ack后,rabbit_queue_index会将journal_entries中该消息相关的操作清空,同时entries_to_segment中对应的二进制数据也会被清除,这意味着将segment中的数据写入到idx文件的时候,该消息因为没有任何操作日志,所以针对该消息也就不会有任何数据写入到idx文件中。

假如某个时刻,消息还未被ack,此时触发写idx文件的话,该消息的publish,delivery对应的二进制数据会被写入idx文件中;下一时刻,如果对该消息进行了ack,再次写idx文件时,ack操作对应的二进制数据可能也会被追加写入对应的idx文件中。

那有没有完全不需要写idx文件的时候呢?或者说idx文件什么情况下会被删除呢?

segment数据结构中有个unack字段,表示该segment中未被ack的消息数。publish一条消息时,unack的值加1;delivery一条消息时,unack的值不变;ack一条消息时,unack的值减1。每当写idx文件时,会先判断unack的值,如果该值为0,意味着该segment中所有的消息都已经ack了,消息也没必要进行存储了,此时不但不会有任何的写文件动作,还会直接删除该文件(之前可能有写入)。

【读流程】

相比写流程,索引文件的读流程要简单很多。rabbit_queue_index模块对外提供的读接口是读取一个序号范围段内的所有消息。实际处理过程中,根据起始序号找到对应的segment,先从segment对应的idx文件中读取(读取整个文件的内容,并解析得到一份segment),然后与内存中segment的信息进行合并并更新,得到最终的消息集合,这样就完成了读操作。

【strace跟踪验证】

在阅读源码的过程中,也用strace分析了实际的读写情况,以下面两个场景举例

  • 发送一条消息到队列时的情况:
代码语言:javascript复制
# 打开journal.jif
[pid  1230] 09:08:03.361755 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/journal.jif", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  1230] 09:08:03.361870 <... open resumed> ) = 14 <0.000070>
[pid  1230] 09:08:03.361936 fstat(14,  <unfinished ...>
[pid  1230] 09:08:03.361997 <... fstat resumed> {st_mode=S_IFREG|0644, st_size=0, ...}) = 0 <0.000040>
[pid  1230] 09:08:03.363213 lseek(14, 0, SEEK_END <unfinished ...>
[pid  1230] 09:08:03.363298 <... lseek resumed> ) = 0 <0.000069>
# 定时器超时(200毫秒) 将缓存中的数据写入文件中
[pid  1230] 09:08:03.568104 writev(14, [...], 2 <unfinished ...>
[pid  1230] 09:08:03.568235 <... writev resumed> ) = 1249 <0.000104>
# 队列进程进入hibernate, 写idx文件
[pid  1230] 09:08:05.166167 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  1230] 09:08:05.166329 <... open resumed> ) = 15 <0.000107>
[pid  1230] 09:08:05.166465 fstat(15,  <unfinished ...>
[pid  1230] 09:08:05.166567 <... fstat resumed> {st_mode=S_IFREG|0644, st_size=0, ...}) = 0 <0.000076>
[pid  1230] 09:08:05.174817 lseek(15, 0, SEEK_END <unfinished ...>
[pid  1230] 09:08:05.174942 <... lseek resumed> ) = 0 <0.000104>
[pid  1230] 09:08:05.177466 writev(15, [...], 2 <unfinished ...>
[pid  1230] 09:08:05.177560 <... writev resumed> ) = 1243 <0.000078>
[pid  1230] 09:08:05.178717 fsync(15 <unfinished ...>
[pid  1230] 09:08:05.179367 <... fsync resumed> ) = 0 <0.000608>
[pid  1230] 09:08:05.180224 close(15 <unfinished ...>
[pid  1230] 09:08:05.180295 <... close resumed> ) = 0 <0.000049>
# idx文件写完后, 清理journal.jif文件
[pid  1230] 09:08:05.181555 lseek(14, 0, SEEK_SET <unfinished ...>
[pid  1230] 09:08:05.181644 <... lseek resumed> ) = 0 <0.000071>
[pid  1230] 09:08:05.182545 lseek(14, 0, SEEK_CUR <unfinished ...>
[pid  1230] 09:08:05.182632 <... lseek resumed> ) = 0 <0.000071>
[pid  1230] 09:08:05.182688 ftruncate(14, 0 <unfinished ...>
[pid  1230] 09:08:05.182794 <... ftruncate resumed> ) = 0 <0.000082>
  • 持续生产消费的情况(队列中几乎无堆积)
代码语言:javascript复制
# 写 0.idx
[pid  4486] 09:43:13.814236 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4486] 09:43:13.814423 <... open resumed> ) = 73 <0.000129>
[pid  4486] 09:43:13.835035 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.835263 <... writev resumed> ) = 39776 <0.000199>
[pid  4486] 09:43:13.835929 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.836065 <... writev resumed> ) = 39776 <0.000108>
[pid  4486] 09:43:13.836716 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.836869 <... writev resumed> ) = 39776 <0.000127>
[pid  4486] 09:43:13.837491 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.837619 <... writev resumed> ) = 39776 <0.000099>
[pid  4486] 09:43:13.838277 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.838402 <... writev resumed> ) = 39776 <0.000097>
......
[pid  4486] 09:43:13.899874 writev(73, [...], 2 <unfinished ...>
[pid  4486] 09:43:13.899938 <... writev resumed> ) = 1243 <0.000047>
[pid  4486] 09:43:13.900538 fsync(73 <unfinished ...>
[pid  4486] 09:43:13.914344 <... fsync resumed> ) = 0 <0.013774>
[pid  4486] 09:43:13.915335 close(73 <unfinished ...>
[pid  4486] 09:43:13.915464 <... close resumed> ) = 0 <0.000111>
# 清空journal.jif
[pid  4486] 09:43:13.917176 lseek(68, 0, SEEK_CUR <unfinished ...>
[pid  4486] 09:43:13.917358 ftruncate(68, 0 <unfinished ...>

# 写 0.idx 然后清空journal.jif(省略了中间writev)
[pid  4486] 09:43:15.358110 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4486] 09:43:15.469725 lseek(68, 0, SEEK_CUR <unfinished ...>
[pid  4486] 09:43:15.469913 ftruncate(68, 0 <unfinished ...>


# 写 0.idx 然后清空journal.jif(省略了中间writev)
[pid  4486] 09:43:17.671522 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4486] 09:43:17.789003 lseek(68, 0, SEEK_CUR <unfinished ...>
[pid  4486] 09:43:17.789243 ftruncate(68, 0 <unfinished ...>

# 写0.idx与1.idx 然后清空journal.jif(省略了中间writev)
[pid  4486] 09:43:20.321408 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/1.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4486] 09:43:20.340966 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4483] 09:43:20.473029 lseek(68, 0, SEEK_CUR <unfinished ...>
[pid  4483] 09:43:20.473272 ftruncate(68, 0 <unfinished ...>

# 写 1.idx
[pid  5082] 09:46:17.679119 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/1.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>

# 删除 0.idx
[pid  5082] 09:46:17.693002 stat("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx",  <unfinished ...>
[pid  5082] 09:46:17.693267 access("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", R_OK <unfinished ...>
[pid  5082] 09:46:17.693453 access("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", W_OK <unfinished ...>
[pid  5082] 09:46:17.694280 unlink("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx" <unfinished ...>

从strace抓到的系统调用来看,上述的分析流程与实际情况都是能对应起来的,同时也有些细节要注意,例如:

  • 在持续生产、消费时,journal.jif文件几乎不会有写入的动作,真正写入的只有idx文件。
  • 在每次dirty_count达到指定次数触发将segment中的数据写入idx文件时,可能会一次写多个idx文件。
  • 每次写idx文件的数据量是不确定的

【总结】

本文主要对索引文件的读写流程进行了说明,同时也通过strace进行了验证,了解这些原理后会有助于进行性能调优。

0 人点赞