一、拉取消息处理的结果情况
通过前面学习,我们知道rocketmq消费消息的过程中,会有一个拉取的动作,而这个拉取的动作中又会涉及到对拉取消息的处理。而这里又分为好几种情况。
代码语言:javascript复制ResponseCode.SUCCESS 拉取成功,响应成功的情况
ResponseCode.PULL_NOT_FOUND 拉取消息没找到
ResponseCode.PULL_RETRY_IMMEDIATELY 立即拉取回复
ResponseCode.PULL_OFFSET_MOVED 拉取位点被移除
拉取成功,我们很好理解,此时就是拉取成功了,进行正常响应。
二、拉取消息hold的情况
什么情况下会出现hold,然后wakeup呢?
一种情况是拉取处理的时候,没有找到拉取的消息,此时会做hold,另一种情况是broker启动的时候,同步broker成员组的时候会做hold操作。
本质是将请求放入到ManyPullRequest和pullRequestTable,然后取出,进行处理。
那么消息处理的过程中,如果当前没有消息可拉的时候,会怎么处理呢?
代码语言:javascript复制 case ResponseCode.PULL_NOT_FOUND:
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
// 将拉取请求进行hold操作
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
return null;
}
可以看到这个过程中主题和队列id通过@拼接起来作为key,然后通过key拿到拉取请求,如果没有的话,说明pullRequestTable没有,此时会将拉取请求放入到pullRequestTable中,然后设置成suspended为true的标识。最终将拉取请求添加到ManyPullRequest中。
三、ManyPullRequest的hold检查处理
通过运行的标识,可以看到,如果开启了长轮询的标识的话,就会等待5000毫秒钟之后会执行后面的hold检查操作,否则等待执行短轮询操作,执行hold检查。
代码语言:javascript复制 public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 检查是否hold请求了
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.warn("PullRequestHoldService: check hold pull request cost {}ms", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
hold检查的过程实质就是wakeup拉取处理器执行拉取消息的过程。可以看到会根据
代码语言:javascript复制ManyPullRequest mpr = this.pullRequestTable.get(key)
拿到请求,从而wakeup:
代码语言:javascript复制 this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
从而实现拉取从而处理数据。
除此之外,我们可以wakeup出现的情况,还有一种情况是onMinBrokerChange,此时会通过broker上线的情况notifyMasterOnline,进行消息拉取。而本质是在broker启动的时候,会启动定时任务,同步broker成员组信息。
代码语言:javascript复制 BrokerController.this.syncBrokerMemberGroup();
可以看到1秒一次。
参考:https://rocketmq.apache.org/ rocketmq官网
https://github.com/apache/rocketmq