Rocketmq消费消息时不丢失不重复

2023-11-20 20:14:00 浏览数 (2)

消息消费不丢失

手动ACK

在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS

spring boot中 消费消息确认

代码语言:javascript复制
​
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",
        topic = "${carInInfo.topic}", selectorExpression = "*",
        consumeMode = ConsumeMode.ORDERLY)
public class CarInParkSynThirdMQ implements RocketMQListener<AddCarInParkDTO> {
   
    /**
     * 请不要捕获异常信息,否则无法进行消息重新推送
     *
     * @param addCarInParkDTO
     */
    @Override
    public void onMessage(AddCarInParkDTO addCarInParkDTO) {
        System.out.println("收到消息:"   JSON.toJSONString(addCarInParkDTO));
    }
​

指定consumeMode = ConsumeMode.ORDERLY,实现消息确认,我们看下源码:

DefaultRocketMQListenerContainer.java这个类,看下其中一个类实现

代码语言:javascript复制
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
 @SuppressWarnings("unchecked")
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                handleMessage(messageExt);
                long costTime = System.currentTimeMillis() - now;
                log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                log.warn("consume message failed. messageExt:{}", messageExt, e);
                context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
 
        return ConsumeOrderlyStatus.SUCCESS;
    }

只要没有异常出现,那么就会消费成功,有异常出现了就重新进行发送,那这个又是在哪里调用的呢?再看下这个private方法就明白了

代码语言:javascript复制
 private void initRocketMQPushConsumer() throws MQClientException {
       ......
 
        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }
 
        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
        }
 
    }

消息重试

对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。

如何让消息进行重试

RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列

代码语言:javascript复制
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {
​
    @Override
    public void onMessage(String message) {
        System.out.println("Received message : "  message);
​
    }
​
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
      // 设置最大重试次数
      consumer.setMaxReconsumeTimes(5);
​
    }
}

一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。

每次重试的间隔时间如下:

死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。

死信队列的特征:

  • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
  • 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
  • 死信队列中的消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。

消息幂等

在MQ系统中,消息幂等有三种实现语义:

at most once 最多一次:每条消息最多只会被消费一次

at least once 至少一次:每条消息至少会被消费一次

exactly once 刚刚好一次:每条消息都只会确定的消费一次

这三种语义都有他适用的业务场景。

at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。

at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。

而这个exactly once是MQ中最理想也是最难保证的一种语义。RocketMQ只能保证at least once,保证不了exactly once。

RocketMQ 消息重复的场景

发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

投递时消息重复消息

消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

负载均衡时消息重复

包括但不限于网络抖动、Broker 重启以及订阅方应用重启,当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

消息幂等解决方案

在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。

ocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。

但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

比如我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。

代码语言:javascript复制
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

要实现消息的幂等,我们可能会采取这样的方案:

代码语言:javascript复制
select * from t_order where order_no = 'order123'
if(order  != null) {
    return ;//消息重复,直接返回
}

这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

还可以通过以下方式处理:

  • 使用数据库的行锁处理
  • 利用分布式锁处理不同服务间的并发。
  • 数据库对唯一值的入库字段设唯一索引。

0 人点赞