大家好,我是小菜。一个希望能够成为 吹着牛X谈架构 的男人!如果你也想成为我想成为的人,不然点个关注做个伴,让小菜不再孤单!
本文主要介绍
RabbitMQ的消息丢失问题
如有需要,可以参考 如有帮助,不忘 点赞 ❥ 微信公众号已开启,小菜良记,没关注的同学们记得关注哦!
是的,最终是对 RabbitMQ 下手了!
面试中常见的RabbitMQ面试题也是多了去了,常见的如下:
- 消息可靠性问题:如何确保发送的消息至少被消费一次?
- 延迟消息问题:如何实现消息的延迟投递?
- 高可用问题:如何避免单点的MQ故障而导致的不可用问题?
- 消息堆积问题:如何解决数百万级以上消息堆积,无法及时消费问题?
这几个问题又得让你脑壳疼一阵子,是不是也在网上看了挺多博文介绍这方面的解决方案,但是却看了又忘,实际便是因为缺少实操,这篇小菜便重点讲述下 RabbitMQ 如何解决消息丢失问题~
一、消息可靠性问题
消息可靠性问题我们又可能将其理解为如何防止消息丢失?那为什么消息会丢失呢?我们可以先看看消息投递的整个过程:
我们从图中可以从三个阶段分析可能造成消息丢失:
- publisher 发送消息到 exchange
- exchange 分发到 queue
- queue 投递到 customer
既然我们知道了哪些阶段可能造成数据丢失,那我们就可以从源头防范于未然~!
工程结构
工程结构很简单,就是一个简单的 Spring Boot 项目,里面有个 消费者
和 生产者
两个模块
1、生产者发送丢失
RabbitMQ 中提供了 publisher confirm
机制来避免消息发送到 MQ 的过程中丢失的问题。消息发送到 MQ 以后,会返回一个确认结果给生产者,用于表示消息是否确认成功。该确认结果存在两种请求:
publisher-confirm
该类型是 发送者确认 ,存在两种情况
- 消息成功投递到交换机,返回
ack
- 消息未投递到交换机,返回
nack
publisher-return
该类型是 发送者回执 ,存在两种情况
- 消息投递到交换机,且成功分发到队列,返回
ack
- 消息投递到交换机,但未成功分发到队列,返回
nack
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一ID,以区分不同消息,避免ack冲突
接下来我们用代码来说明具体的操作方式
1)配置文件
我们首先看下 生产者
的配置文件
前面几个配置 RabbitMQ 的连接信息没啥好讲的,我们来看几个比较陌生的配置
publisher-confirm-type
开启发送确认,这里可以支持两种类型
- simple:同步等待 confirm 结果,直到超时
- correlated:异步回调,定义 ConfirmCallback,MQ返回结果时会回调这个 ConfirmCallback
publisher-returns
开启 public-return,同样是基于 CallBack 机制,不过是定义 ReturnCallback
template.mandatory
定义路由失败时的策略。
- true:调用 ReturnCallback
- false:直接丢弃消息
2)定义回调事件
每个 RabbitTemplate 只能配置一个 ReturnCallback
image-20211024171022583
3)发送消息
执行发送代码之前,我们确保已经创建了(一个直连交换机
direct-exchange
,一个队列direct-queue
,且绑定的 key 为direct
正常情况下,我们执行代码肯定是发送成功的,可以看到控制台绿色输出
且我们在消息队列中也成功接收到了消息:
到这步是没有任何问题的,那我们就需要手动给它制造点问题~ 我们可以修改 交换机名称
,这个时候发送消息的时候找不到交换机,那么交换机肯定就会返回 nack
,再看是否可以进入到我们代码中的判断:
代码执行虽然是绿色的,但因为rabbitMQ找不到正确的交换机,而导致消息发送失败,也就是下图的这个过程:
这一个是 publish -> exchange
失败我们顺利的捕获到了,那么 exchange -> queue
这步的失败是我们是否能够正常捕获?我们可以通过修改 路由 key
使交换机路由不到对应的 queue
可以发现当交换机没有路由到相对应的 queue 时,也成功触发了我们自定义的回调函数,然后看 rabbitMQ 控制台是可以发现消息已经成功投递到交换机
到这里,我们通过两种简单的错误模拟,使程序都能顺利的进入到我们预先定义的回调中,如果遇到发送失败的情况,我们可以在失败的回调中自定义消息重发
机制,最大程度上避免消息丢失的问题
4)总结
我们可以通过 publisher-confirm
和 publisher-return
两种错误捕获机制,来避免 生产者 -> exchange -> queue
这条链路的消息丢失
publisher-confirm
- 消息成功发送到 exchange,返回 ack
- 消息未能成功发送到 exchange,返回 nack
- 消息发送过程中出现异常,没有收到回执,则进入 failureCallback 回调
publisher-return
- 消息成功发送到 exchange,但没有路由到 queue,调用自定义回调函数 returnCallback
2、消息存储丢失
消息存储丢失是啥意思?其实就是持久化
的概念,当消息已经成功发送到 queue 时,这个时候如果消费者没有及时进行消费,rabbitMQ 又刚好宕机重启了,那么这个时候就会发现消息丢失了。
这是因为 MQ 默认是内存存储消息,我们可以通过开启持久化的功能来确保在 MQ 中的消息不丢失
其实我们通过 RabbitMQ 提供的 GUI 创建交换机或队列的时候就可以发现有持久化的这个选项
如果将 durability
设为 durable 后,我们可以发现无论如何重启 MQ,重启后交换机和队列依然存在。
但是很多时候我们交换机
和 队列
的创建并非在 GUI 上创建,而是通过应用代码的方式创建
- 交换机持久化
- 队列持久化
- 消息持久化
默认情况下,AMQP 发出的消息都是持久化的,不用特意指定
3、消费者消费丢失
RabbitMQ 采取的机制是当确认消息被消费者消费后就会立即删除
那么如何确认消息已被消费者消费?那就还得依靠回执来确认,消费者获取消息后,需要向 RabbitMQ 发送 ack
回执,表明自己已经处理消息。其中 ack
在 AMQP 中有三种确认模式:
- manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack
- auto:自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack,反之返回 nack
- none:关闭 ack,MQ 在消息投递后会立即删除消息
上述三种方式都是通过修改配置文件:
1)manual
该方式需要用户自己手动确认,灵活性较好
这个时候如果执行逻辑是正常的,那么在 RabbitMQ 上就会将该消息删除,但是如果执行的逻辑抛出了异常,没有进入到手动确认的环节,RabbitMQ 将会把该消息保留:
2)auto
该方式在没有异常发生时会自动进行消息确认
我们在配置文件中将确认方式改为 auto
进行测试:
正常情况下接收消息是没有任何问题的,那我们同样制造些非正常情况:
我们手动制造了点异常,发现消息没有被 RabbitMQ 删除的同时,而且控制台一直在报错,无止境的在尝试重新消费,这如果放在线上环境难免有些令人崩溃。
当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,就会导致 MQ 的消息处理飙升
而发生这种情况的原因所在便是因为 RabbitMQ的消息失败重试机制
,但很多时候我们可能不想一直重试,只需要经过几次尝试,如果失败就放弃处理,这个时候我们就需要在配置文件中配置失败重试机制:
开启该配置后,我们重启项目进行观察
通过控制台可以看到在重试 3 次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException
,说明本地重试机制生效了。而且我们回到 RabbitMQ 控制台可以看到对应消息被删除了,说明最后 SpringAMQP 返回的是 ack,导致消息被 MQ 删除
但是这种处理方式并不优雅
,重试后直接删除消息过于 暴力,那么有没有更好的处理方式?答案是有的!
我们可以利用 AMQP 提供的 MessageRecovery
接口来实现,该接口有三种不同的实现方式:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢失消息。默认方式,以上就是采用这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
三种方式可以根据不同场景进行采用,分析一下,不难发现第三种 RepublishMessageRecoverer
是比较优雅的~ 当重试失败后会将消息投递到一个指定专门存放异常消息的队列,后续由人工集中进行处理!具体使用方式如下:
通过自定义异常处理后,我们重启项目查看控制台:
可以发现重试3次后,我们的异常消息进入到了我们自定义的异常队列中
3)none
该方式没啥好讲的~ 无论消息异常与否 MQ 都会进行删除!
4、总结
假如这个时候面试再问你,如何确保 RabbitMQ消息的可靠性?那你可得好好唠嗑唠嗑
如何保证消息不丢失?
1)首先分析丢失的场景有哪些?
消息丢失可能发生在 发送时丢失(未送达 exchange / 未路由到 queue)
、消息未持久化而MQ宕机
、消费者接收消息未能正确消费
2)然后如何预防
- 开启生产者确认机制,确保生产者的消息能到达队列
确认机制包括 publisher-confirm
和 publisher-return
当未送达到 交换机 我们可以通过 publisher-confirm 返回的 ack
和 nack
来确认
当 交换机 未成功路由到 队列,我们可以通过 publisher-return
自定义的回调函数来确认,每个 RabbitTemplate 只能配置一个 ReturnCallback
- 开启持久化功能,确保消息未消费前在队列中不会丢失
持久化功能分为 交换机持久化
、队列持久化
和 消息持久化
,我们都需要将 durable 设置为 true
- 开启消费者确认机制最低为
auto
级别
消费者确认机制有三种类型:manual (手动确认)
、auto (自动确认)
、none (关闭 ack)
- 失败重试机制
我们手动设置 MessageResoverer
为 RepublishMessageRecoverer 方式,将投递失败的消息转到异常队列中,交由人工处理
这一套组合拳回答下来,面试官还不得默默承认你有点东西?
当然这只是 RabbitMQ 的问题之一,我们下篇继续其他几个问题的解决方式~
不要空谈,不要贪懒,和小菜一起做个吹着牛X做架构
的程序猿吧~点个关注做个伴,让小菜不再孤单。咱们下文见!
今天的你多努力一点,明天的你就能少说一句求人的话!我是小菜,一个和你一起变强的男人。