RocketMQ生产消费指南

2022-10-27 17:14:34 浏览数 (1)

RocketMQ是一款可靠性非常强的一款消息中间件,概念相比如RabbitMQ来讲也相对简单,只有一个生产消费的概念并不涉及多种消费订阅模式.

描述流程之前先简单介绍下生产者组和消费者组这两种概念

生产者组

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

消费者组

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

生产消费流程

创建一条标记着topic,tag和body的消息,消息的发送时间可以选填,这是RocketMQ实现延时消息的基础,key当然也是选填,不过我建议你为其赋予业务标识的值,因为谈到消息队列,如何避免重复消费就是一个不可躲避的话题,很遗憾RocketMQ无法保证消息不被重复消费,但是我们可以根据Message上的key在我们的业务上实现幂等性,消息可以重复,但是我们可以根据业务ID判断这条消息有没有消费的必要. 生产者携带着这条消息到broker,当然消息到了Broker之后,生产者也会根据不同的发送方式作出不同的应对.单向发送的话生产者直接把消息发送出去之后就完成了,同步消息生产者会一直等到broker的反馈,异步发送生产者发消息发送出去之后会立刻返回使用回调的方式确认broker的接收状态. 消息到达broker之后,消费者可以选择pull的方式主动拉取消息或是选择push的方式broker接收到消息之后主动推送给消费者,两种方式没有绝对的好坏,根据自己的业务场景选择就好.顺便说一下集群消费和广播消费吧:

  1. 集群订阅:一个topic9条消息 消费者组中有3个消费者 那么每个消费者平均消费3条消息
  2. 广播订阅: 一个topic9条消息 消费者组中有3个消费者 那么每个消费者都会消费9条消息

SpringBoot使用push的方式收发事务消息

先看下RocketMQ收发事务消息的流程吧.

生产消息

创建事务检查器

代码语言:javascript复制

@Component
public class DemoLocalTransactionChecker implements LocalTransactionChecker {
    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("开始回查本地事务状态");
        return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的TransactionStatus
    }
}

创建生产者

代码语言:javascript复制
@Configuration
public class TransactionProducerClient {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private DemoLocalTransactionChecker localTransactionChecker;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public TransactionProducerBean buildTransactionProducer() {
        TransactionProducerBean producer = new TransactionProducerBean();
        producer.setProperties(mqConfig.getMqPropertie());
        producer.setLocalTransactionChecker(localTransactionChecker);
        return producer;
    }

}

发送消息

代码语言:javascript复制
public void txMessage() {
    Message message = new Message("oak-test-1","tag1","hello rocketmq! Transaction Message~".getBytes(StandardCharsets.UTF_8));
    try {
        SendResult sendResult = buildTransactionProducer.send(message, (msg, arg) -> {
            // 消息ID(有可能消息体一样,但消息ID不一样,当前消息属于半事务消息,所以消息ID在消息队列RocketMQ版控制台无法查询)。
            String msgId = msg.getMsgID();
            // 消息体内容进行crc32,也可以使用其它的如MD5。
            long crc32Id = 666L;
            // 消息ID和crc32id主要是用来防止消息重复。
            // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。
            // 如果要求消息绝对不重复,推荐做法是对消息体使用crc32或MD5来防止重复消息。

            TransactionStatus transactionStatus = TransactionStatus.Unknow;
            try {
                boolean isCommit = true; // 这里的true由业务代码返回

                if (isCommit) {
                    // 本地事务已成功则提交消息。
                    transactionStatus = TransactionStatus.CommitTransaction;
                } else {
                    // 本地事务已失败则回滚消息。
                    transactionStatus = TransactionStatus.RollbackTransaction;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(msg.getMsgID());
            return transactionStatus;

        }, null);
    } catch (Exception e) {
        // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
        System.out.println(new Date()   " Send mq message failed. Topic is:"   message.getTopic());
        e.printStackTrace();
    }
}

消费消息

自定义消息监听器

代码语言:javascript复制
@Component
public class DemoMessageListener implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {

        System.out.println("Receive: "   message);
        try {
            System.out.print(new String(message.getBody(), StandardCharsets.UTF_8));
            return Action.CommitMessage;
        } catch (Exception e) {
            //消费失败
            return Action.ReconsumeLater;
        }
    }
}

创建消费者

代码语言:javascript复制
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
    ConsumerBean consumerBean = new ConsumerBean();
    //配置文件
    Properties properties = mqConfig.getMqPropertie();
    properties.setProperty(PropertyKeyConst.GROUP_ID,mqConfig.getGroupId());
    //将消费者线程数固定为20个 20为默认值
    properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
    // 订阅方式有两种集群订阅和广播订阅
    // 集群订阅:一个topic9条消息 消费者组中有3个消费者 那么每个消费者平均消费3条消息
    // 广播订阅: 一个topic9条消息 消费者组中有3个消费者 那么每个消费者都会消费9条消息
    // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)。 订单这种肯定选择集群订阅啦
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
    consumerBean.setProperties(properties);
    //订阅关系
    Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
    Subscription subscription = new Subscription();
    subscription.setTopic(mqConfig.getTopic());
    subscription.setExpression(mqConfig.getTag());
    subscriptionTable.put(subscription, messageListener);
    //订阅多个topic如上面设置

    consumerBean.setSubscriptionTable(subscriptionTable);
    return consumerBean;
}

0 人点赞