RocketMQ消费处理hold过程学习

2023-12-25 11:05:15 浏览数 (2)

一、拉取消息处理的结果情况

通过前面学习,我们知道rocketmq消费消息的过程中,会有一个拉取的动作,而这个拉取的动作中又会涉及到对拉取消息的处理。而这里又分为好几种情况。

代码语言:javascript复制
ResponseCode.SUCCESS 拉取成功,响应成功的情况
ResponseCode.PULL_NOT_FOUND  拉取消息没找到
ResponseCode.PULL_RETRY_IMMEDIATELY 立即拉取回复
ResponseCode.PULL_OFFSET_MOVED  拉取位点被移除

拉取成功,我们很好理解,此时就是拉取成功了,进行正常响应。

二、拉取消息hold的情况

什么情况下会出现hold,然后wakeup呢?

一种情况是拉取处理的时候,没有找到拉取的消息,此时会做hold,另一种情况是broker启动的时候,同步broker成员组的时候会做hold操作。

本质是将请求放入到ManyPullRequest和pullRequestTable,然后取出,进行处理。

那么消息处理的过程中,如果当前没有消息可拉的时候,会怎么处理呢?

代码语言:javascript复制
   case ResponseCode.PULL_NOT_FOUND:
                final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
                final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

                if (brokerAllowSuspend && hasSuspendFlag) {
                    long pollingTimeMills = suspendTimeoutMillisLong;
                    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                    }

                    String topic = requestHeader.getTopic();
                    long offset = requestHeader.getQueueOffset();
                    int queueId = requestHeader.getQueueId();
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                    // 将拉取请求进行hold操作
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    return null;
                }

可以看到这个过程中主题和队列id通过@拼接起来作为key,然后通过key拿到拉取请求,如果没有的话,说明pullRequestTable没有,此时会将拉取请求放入到pullRequestTable中,然后设置成suspended为true的标识。最终将拉取请求添加到ManyPullRequest中。

三、ManyPullRequest的hold检查处理

通过运行的标识,可以看到,如果开启了长轮询的标识的话,就会等待5000毫秒钟之后会执行后面的hold检查操作,否则等待执行短轮询操作,执行hold检查。

代码语言:javascript复制
  public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                // 检查是否hold请求了
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.warn("PullRequestHoldService: check hold pull request cost {}ms", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName()   " service has exception. ", e);
            }
        }

        log.info("{} service end", this.getServiceName());
    }

hold检查的过程实质就是wakeup拉取处理器执行拉取消息的过程。可以看到会根据

代码语言:javascript复制
ManyPullRequest mpr = this.pullRequestTable.get(key)

拿到请求,从而wakeup:

代码语言:javascript复制
 this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());

从而实现拉取从而处理数据。

除此之外,我们可以wakeup出现的情况,还有一种情况是onMinBrokerChange,此时会通过broker上线的情况notifyMasterOnline,进行消息拉取。而本质是在broker启动的时候,会启动定时任务,同步broker成员组信息。

代码语言:javascript复制
 BrokerController.this.syncBrokerMemberGroup();

可以看到1秒一次。

参考:https://rocketmq.apache.org/ rocketmq官网

https://github.com/apache/rocketmq

0 人点赞