云原生中间件RocketMQ-消费者消费模式之广播模式、偏移量offset解析

2022-11-28 15:38:46 浏览数 (2)

PushConsumer消费模式-广播模式

广播消费: 当使用广播消费模式时, 消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端, 保证消息至少被每台机器消费一次。

相比于集群模式,广播模式的特点为: 每个消费者都会消费所订阅的Topic Tag下的所有queue中的所有消息。 适用场景&注意事项

  1. 广播消费模式下不支持顺序消息。
  2. 广播消费模式下不支持重置消费位点。
  3. 每条消息都需要被相同逻辑的多台机器处理。
  4. 广播模式下, 消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次, 但是并不会对消费失败的消息进行失败重投, 因此业务方需要关注消费失败的情况。
  5. 广播模式下, 客户端每一次重启都会从最新消息消费。 客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择
  6. 广播模式下, 每条消息都会被大量的客户端重复处理, 因此推荐尽可能使用集群模式。
  7. 目前仅 Java 客户端支持广播模式。
  8. 广播模式下服务端不维护消费进度, 所以消息队列 RocketMQ 控制台不支持消息堆积查询、 消息堆积报警和订阅关系查询功能。
  9. 消费进度在客户端维护, 出现消息重复消费的概率稍大于集群模式。

设置成广播模式相关代码如下:

代码语言:javascript复制
//设置消费模式为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

至少一次设计理念

在集群模式下,RocketMQ 可以保证Topic Tag下的消息可以肯定会被整个集群至少消费一次。 在广播模式下,RocketMQ 可以保证至少被每台机器消费一次。 类似于数据库的事务操作,消费者未消费完成不返回ack给RocketMQ。官方对于至少一次的解释如下:

官方地址:https://github.com/apache/rocketmq/blob/master/docs/cn/features.md

消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过) msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

消息存储核心-偏移量Offset

Offset指某个topic下的一条消息在某个MessageQueue里的位置,通过Offset可以进行定位到这条消息。Offset是消息消费进度的核心。

  • message queue 是无限长的数组,一条消息进来下标就会加1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
  • message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset 1,minOffset 则是现存在的最小 offset。
  • fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费。

Offset的存储实现分为远程文件类型和本地文件类型两种方式。

集群模式-RemoteBrokerOffsetStore解析

默认集群模式clustering,采用远程文件存储Offset。 本质上因为多消费模式,每个Consumer消费所订阅主题的一部分 这种情况需要broker控制offset的值,使用RemoteBrokerOffsetStore。

广播模式-LocalFileOffsetStore解析

  1. 广播模式下,由于每个Consumer都会收到消息且消费
  2. 各个Consumer之间没有任何干扰,独立线程消费
  3. 所以使用LocalFileOffsetStore,也就是把Offset存储到本地

RocketMQ消费者拉取模式-PullConsumer使用

Pull方式主要做了三件事:

  • 获取Message Queue并遍历
  • 维护OffsetStore
  • 根据不同的消息状态做不同的处理

代码案例如下: DefaultMQPullConsumer拉取:

代码语言:javascript复制
package com.zjq.rocketmq.consumer.pull;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import com.zjq.rocketmq.constants.Const;
 

public class PullConsumer {
	//Map<key, value>  key为指定的队列,value为这个队列拉取数据的最后位置
    //实际中可以放在redis里面或者持久化记录消费的位置
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
 
    public static void main(String[] args) throws MQClientException {
    	
    	String group_name = "test_pull_consumer_name";
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        consumer.start();
        System.err.println("consumer start");
        //	从TopicTest这个主题去获取所有的队列(默认会有4个队列)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
        //	遍历每一个队列,进行拉取数据
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: "   mq);
            
            SINGLE_MQ: while (true) {
                try {
                	//	从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    System.out.println(pullResult.getPullStatus());
                    System.out.println();
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
	                    case FOUND:
	                    	List<MessageExt> list = pullResult.getMsgFoundList();
	                    	for(MessageExt msg : list){
	                    		System.out.println(new String(msg.getBody()));
	                    	}
	                        break;
	                    case NO_MATCHED_MSG:
	                        break;
	                    case NO_NEW_MSG:
	                    	System.out.println("没有新的数据啦...");
	                        break SINGLE_MQ;
	                    case OFFSET_ILLEGAL:
	                        break;
	                    default:
	                        break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
 
 
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }
 
 
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
 
}

MQPullConsumerScheduleService定时拉取:

代码语言:javascript复制
package com.zjq.rocketmq.consumer.pull;

import java.util.List;

import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.zjq.rocketmq.constants.Const;
 
public class PullScheduleService {

    public static void main(String[] args) throws MQClientException {
    	
    	String group_name = "test_pull_consumer_name";
    	
        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);
        
        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        
        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        
        scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {
 
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                System.err.println("-------------- queueId: "   mq.getQueueId()    "-------------");
                try {
                    // 获取从哪里拉取
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;
 
                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                    	List<MessageExt> list = pullResult.getMsgFoundList();
                    	for(MessageExt msg : list){
                    		//消费数据...
                    		System.out.println(new String(msg.getBody()));
                    	}
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    // 设置再过3000ms后重新拉取
                    context.setPullNextDelayTimeMillis(3000);
          
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
 
        scheduleService.start();
    }
}

参考: https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md

本文内容到此结束了, 如有收获欢迎点赞

0 人点赞