大家好,我是君哥。
RocketMQ 消息消费有两种模式,PULL 和 PUSH,今天我们来看一下这两种模式有什么区别。
PUSH 模式
首先看一段 RocketMQ 推模式的一个官方示例:
代码语言:javascript复制public static void main(String[] args) throws InterruptedException, MQClientException {
Tracer tracer = initTracer();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
消费者会定义一个消息监听器,并且把这个监听器注册到 DefaultMQPushConsumer,同时也会注册到 DefaultMQPushConsumerIm-pl,当拉取到消息时,就会使用这个监听器来处理消息。那这个监听器是什么时候调用呢?看下面的 UML 类图:
消费者真正拉取请求的类是 DefaultMQPush-ConsumerImpl,这个类的 pullMessage 方法调用了 PullAPIWrapper 的 pullKernelImpl 方法,这个方法有一个参数是回调函数 Pull-Callback,当 PULL 状态是 PullStatus.FOU-ND 时,代表拉取消息成功,处理逻辑如下:
代码语言:javascript复制PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
//省略部分逻辑
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
//省略部分逻辑
break;
//省略其他case
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
//省略
}
};
这个处理逻辑调用了 ConsumeMessage-Service 类的 submitConsumeRequest 方法,我们看一下并发消费消息的处理逻辑,代码如下:
代码语言:javascript复制public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
//分批处理,跟上面逻辑一致
}
ConsumeRequest 类是一个线程类,run 方法里面调用了消费者定义的消息处理方法,代码如下:
代码语言:javascript复制public void run() {
//省略逻辑
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
//省略逻辑
try {
//调用消费方法
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
//省略逻辑
}
//省略逻辑
}
下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:
- 在 MessageListenerConcurrently 中定义消费者处理逻辑,消费者启动时注册到 DefaultMQPushConsumer 和 DefaultMQ-PushConsumerImpl;
- 消费者启动时,启动消费拉取线程 PullMessageService,里面死循环不停地从 Broker 拉取消息。这里调用了 DefaultMQPushConsumerImpl 类的 pullMessage 方法;
- DefaultMQPushConsumerImpl 类的 pullMessage 方法调用 PullAPIWrapper 的 pullKernelImpl 方法真正去发送 PULL 请求,并传入 PullCallback 的 回调函数;
- 拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageConcurrentlyService 的 submitConsumeRequest 方法,里面用 ConsumeRequest 线程来处理拉取到的消息;
- ConsumeRequest 处理消息时调用了消费端定义的消费逻辑,也就是 Message-ListenerConcurrently 的 consumeMessage 方法。
PULL 模式
下面是来自官方的一段 PULL 模式拉取消息的代码:
代码语言:javascript复制DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
这里我们看到,PULL 模式需要在处理逻辑里不停的去拉取消息,比如上面代码中写了一个死循环。那 PULL 模式中 poll 函数是怎么实现的呢?我们看下面的 UML 类图:
跟踪源码可以看到,消息拉取最终是从 DefaultLitePullConsumerImpl 类中的一个 LinkedBlockingQueue 上面拉取。那消息是什么时候 put 到 LinkedBlockingQueue 呢?
官方拉取消息的代码中有一个 subscribe 方法订阅了 Topic,这里相关的 UML 类图如下:
这个 subscribe 方法最终调用了 DefaultLite-PullConsumerImpl 类的 subscribe,代码如下:
代码语言:javascript复制public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
//省略逻辑
this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
//省略逻辑
} catch (Exception e) {
throw new MQClientException("subscribe exception", e);
}
}
这里给 DefaultLitePullConsumer 类的 messageQueueListener 这个监听器进行了赋值。当监听器监听到 MessageQueue 发送变化时,就会启动消息拉取消息的线程 Pull-TaskImpl,代码如下:
代码语言:javascript复制public void run() {
//省略部分逻辑
if (!this.isCancelled()) {
long pullDelayTimeMills = 0;
try {
PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
processQueue.putMessage(pullResult.getMsgFoundList());
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
}
break;
//省略其他 case
}
}
//省略 catch
if (!this.isCancelled()) {
//启动下一次拉取
scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
}
}
}
拉取消息成功后,调用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然后启动下一次拉取。
这样就清除了示例代码中 poll 消息的逻辑,那还有一个问题,监听器是什么时候触发监听事件呢?
在消费者启动时,会启动 RebalanceService 这个线程,这个线程的 run 方法如下:
代码语言:javascript复制public void run() {
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
下面的 UML 类图显示了 doRebalance 方法的调用关系:
可以看到最终调用了 最终调用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代码如下:
代码语言:javascript复制public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}
这里最终触发了监听器。
下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:
- 消费者启动,向 DefaultLitePullConsumer 订阅了 Topic,这个订阅过程会向 DefaultLitePullConsumer 注册一个监听器;
- 消费者启动过程中,会启动 Message-Queue 重平衡线程 Rebalance-Service,当重平衡过程发现 ProcessQueueTable 发生变化时,启动消息拉取线程;
- 消息拉取线程拉取到消息后,把消息放到 consumeRequestCache,然后进行下一次拉取;
- 消费者启动后,不停地从 consumeReq-uestCache 拉取消息进行处理。
总结
通过本文的讲解,可以看到 PUSH 模式和 PULL 模式本质上都是客户端主动拉取,RocketMQ并没有真正实现 Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的区别如下:
- PULL 模式是从 Broker 拉取消息后放入缓存,然后消费端不停地从缓存取出消息来执行客户端定义的处理逻辑,而 PUSH 模式是在死循环中不停的从 Broker 拉取消息,拉取到后调用回调函数进行处理,回调函数中调用客户端定义的处理逻辑;
- PUSH 模式拉取消息依赖死循环来不停唤起业务,而 PULL 模式拉取消息是通过 MessageQueue 监听器来触发消息拉取线程,消息拉取线程会在拉取完一次后接着下一次拉取。
·············· END ··············