消息消费不丢失
手动ACK
在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
spring boot中 消费消息确认
代码语言:javascript复制
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",
topic = "${carInInfo.topic}", selectorExpression = "*",
consumeMode = ConsumeMode.ORDERLY)
public class CarInParkSynThirdMQ implements RocketMQListener<AddCarInParkDTO> {
/**
* 请不要捕获异常信息,否则无法进行消息重新推送
*
* @param addCarInParkDTO
*/
@Override
public void onMessage(AddCarInParkDTO addCarInParkDTO) {
System.out.println("收到消息:" JSON.toJSONString(addCarInParkDTO));
}
指定consumeMode = ConsumeMode.ORDERLY,实现消息确认,我们看下源码:
DefaultRocketMQListenerContainer.java这个类,看下其中一个类实现
代码语言:javascript复制public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
@SuppressWarnings("unchecked")
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
只要没有异常出现,那么就会消费成功,有异常出现了就重新进行发送,那这个又是在哪里调用的呢?再看下这个private方法就明白了
代码语言:javascript复制 private void initRocketMQPushConsumer() throws MQClientException {
......
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}
}
消息重试
对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。
如何让消息进行重试
RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
代码语言:javascript复制@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(String message) {
System.out.println("Received message : " message);
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// 设置最大重试次数
consumer.setMaxReconsumeTimes(5);
}
}
一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。
每次重试的间隔时间如下:
死信队列
当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
死信队列的特征:
- 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
- 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
- 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
- 死信队列中的消息不会再被消费者正常消费。
- 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
消息幂等
在MQ系统中,消息幂等有三种实现语义:
at most once 最多一次:每条消息最多只会被消费一次
at least once 至少一次:每条消息至少会被消费一次
exactly once 刚刚好一次:每条消息都只会确定的消费一次
这三种语义都有他适用的业务场景。
at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。
at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。
而这个exactly once是MQ中最理想也是最难保证的一种语义。RocketMQ只能保证at least once,保证不了exactly once。
RocketMQ 消息重复的场景
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复消息
消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复
包括但不限于网络抖动、Broker 重启以及订阅方应用重启,当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
消息幂等解决方案
在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。
ocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。
比如我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。
代码语言:javascript复制insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';
要实现消息的幂等,我们可能会采取这样的方案:
代码语言:javascript复制select * from t_order where order_no = 'order123'
if(order != null) {
return ;//消息重复,直接返回
}
这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。
还可以通过以下方式处理:
- 使用数据库的行锁处理
- 利用分布式锁处理不同服务间的并发。
- 数据库对唯一值的入库字段设唯一索引。