TimerMessageStore 简略介绍
- 延迟队列
rmq_sys_wheel_timer
- 指定时间的延迟消息。会先投递到
rmq_sys_wheel_timer
队列中 - 然后由
TimerMessageStore
消费队列数据,将数据消费到timerWheel
使用时间轮算法,实现秒级任务
TimerMessageStore 操作的文件
storeconsumequeuermq_sys_wheel_timer
从队列中读取消息, 提取数据存到timerlog
与timerwheel
中storecheckpoint
对应TimerMessageStore#timerCheckpoint
lastReadTimeMs
上次消费的时间节点lastTimerLogFlushPos
最后刷新 log的 poslastTimerQueueOffset
最后一次消费的队列节点masterTimerQueueOffset
主 Broker 的队列消费节点
storetimerwheel
时间轮,内由Slot
组成 结构如下timeMs
消息到达时间firstPos
开始的 poslastPos
结束的 pos 在 timerLog 中读取数据, 后面会讲具体逻辑num
消息数量magic
no use now, just keep it
storetimerlog
对应TimerMessageStore#timerCheckpoint
里边也是由多个mappedFile
组成。 主要是存储原msg的数据, 因为从rmq_sys_wheel_timer
消费了之后, 会存到timerwheel
与timerlog
中
TimerMessageStore 启动
- enqueueGetService.start();
- enqueuePutService.start();
- dequeueWarmService.start();
- dequeueGetService.start();
- timerFlushService.start();
- dequeueGetMessageServices[getThreadNum].start();
- dequeuePutMessageServices[getThreadNum].start();
深入 TimerMessageStore 之 TimerEnqueueGetService
TimerMessageStore.this.enqueue
默认 100毫秒执行一次- 从 消息队列
rmq_sys_wheel_timer
消费数据 ps:currQueueOffset
从checkpoint
读取出来的 - 将消费出来的数据, 封装成 TimerRequest 投入到
enqueuePutQueue
中 currQueueOffset 1
进入下一个循环 消费下一个 offset 节点
深入 TimerMessageStore 之 TimerEnqueuePutService
- 消费
enqueuePutQueue
中的数据 shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs
检查消费的消息是否已到达投递时间。- 到达时间。投递到
dequeuePutQueue.put(req);
中 - 消息未到达时间
doEnqueue
->timerWheel.getSlot(delayedTime)
获取延迟时间插槽。- 构建
ByteBuffer
投入timerLog
中数据结构为: - |消息大小|前一个节点的pos|magic|log写入时间|延迟时间|offsetPy|sizePy|realTopic|0
timerLog.append
返回插入位置 ret- 构建
timerWheel
|消息到达时间戳|firstPos|ret (timerLog.append返回位置)| 消息数量| 0|
- 到达时间。投递到
深入 TimerMessageStore 之 TimerDequeueGetService
- 消费
timerWheel
中的数据 - 根据
currReadTimeMs
来获取timerWheel
插槽数据currReadTimeMs
初始化的时候timerCheckpoint.getLastReadTimeMs()
读取的是上次最后消费的数据- 假设broker 宕机了一段时间。那么
currReadTimeMs
会按照上一次宕机的时间开始搜寻数据, 这样子宕机消息也不会丢失。会在启动的那段时间被投递出去 currReadTimeMs
在moveReadTime
方法中会自增
timerWheel.getSlot(currReadTimeMs);
读取插槽数据long currOffsetPy = slot.lastPos;
读取插槽属性, 最后一个pos节点timerLog.getWholeBuffer(currOffsetPy)
根据currOffsetPy
获取SelectMappedBufferResult
- 从
timerLog
的SelectMappedBufferResult
中获取数据。prevPos
上一个节点数据enqueueTime
放入 timerLog 的时间delayedTime
消息到达时间戳offsetPy
commitLog的数据位置sizePy
commitLog的数据大小
- 构建
TimerRequest
讲消息投递到dequeueGetQueue
中 currOffsetPy = prevPos
将位置移动到前一个,进行遍历
深入 TimerMessageStore 之 TimerDequeueGetMessageService
- 默认有三个
TimerDequeueGetMessageService
实例同时消费dequeueGetQueue
getMessageByCommitOffset
从commitLog
中读取原投递的消息数据- 读取
uniqkey
判断不在deleteList
中的时候 将消息投递到dequeuePutQueue
中去
深入 TimerMessageStore 之 TimerDequeuePutMessageService
- 默认有三个
TimerDequeuePutMessageService
实例同时消费dequeuePutQueue
convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
将消息转换成原始的 topic 消息,清除无用属性doPut
->messageStore.putMessage(message)
将消息投递到指定messageQueue
中
TimerFlushService
timerLog
刷盘timerWheel
刷盘timerCheckpoint
刷盘
TimerMessageStore 初始化加载源码
timerLog.load()
加载文件timerMetrics.load
加载文件recover
->recoverAndRevise(lastFlushPos, true)
ps: (用于timerWhel
跟timerLog
的数据保持一致刷新)lastFlushPos
最后一次刷盘的位置, 其实最终是拿到timerlog -> mappedFile
的第几个文件- 遍历这个
mappedFile
的数据 timerWheel.reviseSlot
修改插槽数据。 检查这个时间的插槽是否已经有填充数据。- 如果有的话,刷新
lastPos
(顺序遍历。这里最终还是会是最后一个 lastPos) - 如果不存在插槽数据 则插入插槽数据
putSlot
- 如果有的话,刷新
reviseQueueOffset(processOffset);
读取timerLog
最后一个数据, 为了校验最后一个数据是否正常,是否能读取到消息。- 确认
currQueueOffset
数据 - 确认
currReadTimeMs
数据