RocketMQ 消费进度持久化

2024-09-20 11:50:31 浏览数 (1)

消费进度文件内容

消息消费完毕,如何保持消费进度呢?带着这个疑问,来看下 RocketMQ 的实现。

RocketMQ 保存消费偏移的文件位置在 ${user.home}/store/config 目录下。

consumerOffset.json

保存正常消费的消费进度,来看下,文件里面的内容。

代码语言:javascript复制
json 代码解读复制代码{
    "offsetTable":{

        "%RETRY%TestConsumer@TestConsumer":{0:0},
        
        "TopicTest@TestConsumer":{0:1,1:2,2:1,3:0}
    }
}

${Topic}@${ConsumerGroup} 格式是正常的消费者。

%Retry%${Topic}@${ConsumerGroup} 格式是重试主题。

{1:2} 则表示 1 号队列已经消费完 2 条消息。

在讲解 ConsumerQueue 时,我提到过它的子条目是定长的,所以在存储消费偏移时,可以存储消费到第几个子条目。

delayOffset.json

保存延迟消息的消费进度,文件的内容如下

代码语言:javascript复制
json 代码解读复制代码{
"offsetTable":{1:14,3:10,4:10,5:10,6:10,7:10,8:10,9:0}
}

{4:10} 表示:延迟等级为 4 的消费完了 10 个。

讲解完,消费偏移在文件上怎么存储的,我们看下 RocketMQ 在什么时候会消费进度持久化。

消费进度持久化代码实现

正常的普通消息

Broker 每隔 5s 持久化消费偏移。

代码位置:

BrokerController#initialize()

代码语言:javascript复制
java 代码解读复制代码public boolean initialize() throws CloneNotSupportedException {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.consumerOffsetManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumerOffset error.", e);
            }
        }
    }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval()/* 默认5000 */, TimeUnit.MILLISECONDS);
}

延迟消息

Broker 每隔 10s 就会将 延迟消息消费偏移 持久化。

代码位置:ScheduleMessageService#start()

代码语言:javascript复制
java 代码解读复制代码public class ScheduleMessageService extends ConfigManager {
    public void start() {
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            // 获取配置的延迟等级
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }

                // 初始化延迟调度任务
                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }

            // 默认每隔 10s,执行一次持久化
            this.timer.scheduleAtFixedRate(new TimerTask() {

                @Override
                public void run() {
                    try {
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()/*默认 10000 */);
        }
    }
}

0 人点赞