面试系列之-rocketmq长轮询模式

2022-12-29 20:05:56 浏览数 (2)

Consumer消费两种模式

pull模式
代码语言:javascript复制
public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("rocketmq-consumer");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.start();
    try {
         MessageQueue mq = new MessageQueue();
         mq.setQueueId(0);
         mq.setTopic("mq-test");
         mq.setBrokerName("broker-a");
        long offset = 26;
        PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);
        System.out.printf("%s%n", pullResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
    consumer.shutdown();
}

Consumer主动从Broker获取消息,可以设置多久拉取一次、可以设置一次拉取多少条消息等参数;

  • 好处:是如果Broker消息特别多的话,消费端按照自身的消费能力匀速消费消息,不至于被大量消息打死;
  • 缺陷:消息超时时间可以配置,设置短则会轮训频率过快服务端会承担压力,甚至导致空转,设置长则导致消息接收不及时;
push模式
代码语言:javascript复制
public static void main(String[] args) throws InterruptedException,    MQClientException {// 构造方法
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-consumer");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("mq-test", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,        ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n",        Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

Push模式服务端主动向客户端发送消息,Push方式下,消息队列RocketMQ版还支持批量消费功能,可以将批量消息统一推送至Consumer进行消费;

  • 好处:可以及时收到新的消息,消费端不会产生额外的延迟;
  • 缺陷:当有大量的推送消息会加重消费端的负载甚至将消费端打死,同时Broker会维护所有建连的客户端连接;

RocketMQ实现长轮询

长轮询本质上也是客户端发起定时轮训请求,会保持请求到服务端,直到设置的时长(该hold时长要小于HTTP超时时间)到期或者服务端收到消息,进行返回数据,consumer收到响应后根据状态判断是否有消息;

Consumer端处理
Consumer启动
代码语言:javascript复制
MQClientInstance#start
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 前后省略
                this.pullMessageService.start();
                     break;
            case START_FAILED:
throw new MQClientException("The Factory object["              this.getClientId()   "] has been created before,            and failed.", null);default:
                break;
        }
    }
}

Consumer启动,启动过程会执行各种定时任务和守护线程。其中一个pullMessageService 定时发起请求拉取消息服务,一个MQClientInstance 只会启动一个消息拉取线程,就是push模式使用pull封装一下;

Consumer请求
代码语言:javascript复制
PullMessageService# 客户端发起拉取消息请求
public void run() {
    while (!this.isStopped()) {
try {      // 将返回结果添加到QueuePullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);
        }
}

private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.    selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } 
}
DefaultMQPushConsumerImpl#pullMessage
try {
    // 真正拉取消息的地方,首先获取Broker信息
this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),     subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(), pullRequest.getNextOffset(),     this.defaultMQPushConsumer.getPullBatchSize(),sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS    , CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC, pullCallback);
}

启动后Consumer则不断轮询 Broker 获取消息。Rocketmq将每次请求参数放入pullRequestQueue进行缓冲。这样做的好处:consumer可能对应很多topic。当拉取到消息或者长轮询请求到期后进行回调PullCallback进行下一轮拉取消息;

Consumer处理的逻辑包括:

  • 判断 Consumer 处于运行中状态、Consumer 处于暂停中;
  • 消息处理队列持有消息最大量和消息体最大量;
  • 根据 consumeOrderly 判断是否为顺序消息;
  • 根据topic获取订阅组信息;
  • 真正拉取消息,发起netty请求。请求参数包含 messageQueue(可以认为标识当前客户端该topic具有唯一性)、当前 Consumer 最大偏移量、每次拉取数量、拉取方式(同步||异步)、回调函数PullCallback。还有netty连接的超时时长 timeoutMillis = 30s 和 broker端hold时长 brokerSuspendMaxTimeMillis =15s;
Consumer响应处理
代码语言:javascript复制
DefaultMQPushConsumerImpl#pullMessage
 // 当拉取的请求有响应时
PullCallback pullCallback = new PullCallback() {
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper
			.processPullResult(pullRequest.getMessageQueue(),            pullResult			,subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    // 统计消费组下消息主题拉取耗时
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager()
					.incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null 
					|| pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this
				.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0)
						.getQueueOffset();
                        // 提交拉取到的消息到消息处理队列
boolean dispatchToConsume = processQueue.                putMessage(pullResult.getMsgFoundList());// 提交消费请求  ConsumeRequest#run 拉取消息响应listener
		//.consumeMessage最终返回给客户端,同时也包括执行前和执行后逻辑
                        DefaultMQPushConsumerImpl.this.consumeMessageService
						.submitConsumeRequest(pullResult.                        getMsgFoundList(),						processQueue, pullRequest                      .getMessageQueue(), dispatchToConsume);
                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer
						.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(
							pullRequest, DefaultMQPushConsumerImpl.this
							.defaultMQPushConsumer.getPullInterval());
                        } else {
                            // 消费者拉取完消息后,立马就有开始下一个拉取任务
                            DefaultMQPushConsumerImpl
							.this.executePullRequestImmediately(pullRequest);
                        }
                    }
                    if (pullResult.getNextBeginOffset() < prevRequestOffset 
					|| firstMsgOffset < prevRequestOffset) {
                    }
                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    //消费者没有消息,立马就有开始下一个拉取任务
                    DefaultMQPushConsumerImpl.
					this.executePullRequestImmediately(pullRequest);
                    break;
                default:
                    break;
            }
        }
    }

PullCallback则根据pullStatus状态判断是否有消息。不管何种状态最终会调用 executePullRequestImmediately 将拉取请求放入队列中进行下一轮消息请求:

  • FOUND:有消息则进行处理结果和统计、更新最新的偏移量(本地或者远程),完成后将请求添加到pullRequestQueue队列里继续轮训;
  • NO_NEW_MSG:拉取请求没有新消息但超过hold时长返回, 会进行下一轮消息拉取请求;
  • NO_MATCHED_MSG:有新消息但是没有匹配;
  • OFFSET_ILLEGAL:拉取的消息队列位置不合法,需要更新消费进度再进行下一轮消息拉取;
Broker收到Consumer请求
Broker没有收到消息如何hold请求
代码语言:javascript复制
Consumer发起拉取消息请求,Broker端无消息
Broker端
PullMessageProcessor#processRequest
// broker端没有拉取到消息
case ResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig()
				.getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, 
			pollingTimeMills,this.brokerController.getMessageStore().now(), 
			 offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService()
			.suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }
    
// 先将拉取请求放在this.pullRequestTable中,进行挂载起来
public void suspendPullRequest(final String topic, final int queueId, 
							   final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}

如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving-->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。 hold的请求存放在 ConcurrentHashMap<String, ManyPullRequest> 中,key 为 topic@queueId ,value 是 ManyPullRequest 实际是List<PullRequest> 可以理解对应的多个相同的topic客户端;

hold请求超时处理
代码语言:javascript复制
PullRequestHoldService  轮训遍历是否阻塞请求快到超时时间,进行唤醒    
public void run() {
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) { }
        }  
    }
}
// 
private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    this.notifyMessageArriving(topic, queueId, offset);
                }  
            }
        }
}

Broker端启动线程 PullRequestHoldService 不断轮训检测hold请求是否超时,然后唤醒请求并返回给consumer端。其中轮训时间设置可以是5s一次或者设定时长,进行定期检测;

服务端收到Producer消息
代码语言:javascript复制
DefaultMessageStore#doReput
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset()   1,
        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
NotifyMessageArrivingListener#arriving
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties);
}

PullRequestHoldService
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();  // 克隆挂起的请求列表
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();
                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));                       
                       // 从挂起的请求列表中找到当前新的消息的匹配的,匹配到了则唤起请求立即给客户端返回。
                       if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
                            }  
                            continue;
                        }
                     }
                    // 如果列表中挂起的请求快超时了则立即唤醒返回给客户端
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp()   request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
                        }  
                        continue;
                    }
                    replayList.add(request);
                }
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
}

Producer写入消息,Broker端有消息通知Consumer端; 当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。当拉取消息请求获取不到消息则进行阻塞。当有消息或者或者阻塞超时,重新执行获取消息逻辑,主要是NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(…) 方法通知消费端有消息到达。这时候克隆hold的请求列表,从挂起的请求列表中找到当前新的消息的匹配的,匹配到然后在reput这个操作中顺带激活了长轮询休眠的PullRequest;

总结

当生产者发送最新消息过来后,首先持久化到commitLog文件,通过异步方式同时持久化consumerQueue和index。然后激活consumer发送来hold的请求,立即将消息通过channel写入consumer客户; 如果没有消息到达且客户端拉取的偏移量是最新的,会hold住请求。其中hold请求超时时间 < 请求设定的超时时间。同时Broker端也定时检测是否请求超时,超时则立即将请求返回,状态code为NO_NEW_MESSAGE;

0 人点赞