消息模型三大部分
因此,如何保证消息不丢失,也是从这三个环节来考虑。
关键字摘要
- 生产、存储(消息堆积)、消费 三个环节保证消息不丢失
- 生产环节:消息类型,消息确认机制、失败重试机制
- 存储环节:同步/异步刷盘、同步/异步复制slave
- 消费环节:消息确认机制(至少消费成功一次)、失败重试机制、死信队列机制
Q1: 如何保证「消息生产」不丢失?
先想想什么情况下,消息生产会丢失消息呢?
生产者将发送消息时,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失。
那怎么解决这个问题?
其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」。
消息发送成功返回确认消息,那就能确保消息不丢失。如果发送失败了,mq-client就尝试自动重试,避免网络抖动导致发送丢失。
如果超过一定超时时间还是失败,那就抛出异常,由开发者自己在应用层面进行处理,手动重试发送 或者 记录失败消息后续补偿。
不过我们需要特别注意是,RocketMQ支持多种「消息类型」,但是并不是对所有「消息类型」 都会有「消息确认机制」和「失败重试机制」。
RocketMQ生产消息时,支持多种「消息类型」和「消息发送模式」。咱们白话为主,就不展开源码了,有兴趣同学可以参考org.apache.rocketmq.client.producer.MQProducer
这个接口定义即可。
消息类型:
- 普通消息:发送普通消息,异常时默认重试。
- 普通有序消息:发送普通有序消息,通过指定「消息筛选器selector」,动态决定发送哪个队列。异常默认不重试,可以用户自己重试,并发送到其他队列。
- 严格有序消息:发送严格有序消息,通过指定队列,保证严格有序,异常默认不重试。
消息发送模式:
- 同步:调用发送消息方法后,同步阻塞,直到返回SendResult。配置
retryTimesWhenSendFailed
重试次数。 - 异步:调用发送消息方法后,立即返回,发送结果会通过开发者自己注册的回调函数SendCallback进行处理。配置
retryTimesWhenSendAsyncFailed
重试次数。 - 单向发送:这种方法完全不关心发送后的返回结果。显然,它具有最大吞吐量,但也存在消息丢失的潜在风险。
消息类型 和 消息发送模式 是 N * M
的关系,所以聪明的你一定已经想到了,存在9种不同组合,RocketMQ也是定义了9种不同接口方法。
这9种方法里面,涉及到「单向发送」模式的3种方法,都是不可靠的,存在丢失消息的风险。其他发送消息的模式和消息类型,可以通过 消息确认、mq-client自动「失败重试机制」、业务自定义重试 等方式,确保消息发送不丢失。
注意,
org.apache.rocketmq.client.producer.MQProducer
还定义了「事务消息」的发送模式,是属于分布式事务范畴了,跟我们这里讨论的消息不丢失不太一样,就不展开讨论了。后面单独写一篇针对「事务消息」的分析。
Q2: 如何保证「消息存储」不丢失?
先想想什么情况下,消息存储会丢失呢?
场景1,消息保存到内存中,还没来得及刷盘到磁盘,机器宕机或者重启,导致内存中消息丢失。 场景2,为了提高可用性,Broker通常采用一主多从的部署方式,为了确保消息不丢失,消息需要被复制到从节点。当消息发送到master但是还没同步到slave broker时,master broker磁盘损坏,导致消息数据丢失。或者master宕机,consumer切换到slave消费数据,消息丢失。
针对场景1,默认情况下,消息在到达 Broker 端后会首先被保存在内存中,并立即向生产者返回确认响应。随后,Broker 会定期批量将一组消息异步刷入磁盘。这种方式减少了 I/O 操作次数,提高了性能。
然而,如果发生机器掉电、异常宕机等情况,未及时将消息刷入磁盘,就可能导致消息丢失的情况。
如果要确保 Broker 端不丢失消息并保证消息的可靠性,我们需要修改消息保存机制为同步刷盘方式,即只有当消息成功存储到磁盘后才返回响应。可以通过flushDiskType = SYNC_FLUSH
参数进行控制。
针对场景2,在默认方式下,当消息成功写入主节点时,就会返回确认响应给生产者,并异步将消息复制到从节点。然而,如果主节点突然宕机且无法恢复,尚未复制到从节点的消息将会丢失。
为了进一步提高消息的可靠性,我们可以采用同步复制方式。主节点将会同步等待从节点完成复制,然后才返回确认响应。这样可以确保消息的可靠性。可以通过brokerRole=SYNC_MASTER
参数进行控制。
注意,同步刷盘 和 同步复制 虽然能够保证消息不丢失,但是会严重降低性能,生产实践中需要根据实际情况综合评估。
Q3: 如何保证「消息消费」不丢失?
先想想什么情况下,消息存储会丢失呢?
因为各种原因消费失败,但是还是提交了消费位点,这条消息从业务角度来说就“丢失”了。
那怎么解决这个问题?
跟消息生产一样,其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」。
消费者从RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"来表示业务方已经正常完成消费。只有返回"CONSUME_SUCCESS"才算作消费完成。这就是消费时的「消息确认机制」。
如果返回"CONSUME_LATER",则会按照不同的消息延迟级别进行再次消费,延迟级别从秒到小时不等,最长延迟时间为2个小时后再次尝试消费。这就是消费时的「失败重试机制」。
重试消息会被存入名为 "%RETRY% 消费组名称" 的Topic
中,原始主题Topic
会存入属性中。然后会基于定时任务机制,在到期时将任务再次拉取出来。
另外,RocketMQ跟kafka不同的是,天然支持了 「死信队列机制」。
如果在尝试消费的过程中达到了最大重试次数(通常为16次),仍然无法成功消费,则消息将被发送到死信队列,以确保消息存储的可靠性。后续业务可以根据死信队列,来做相关补偿措施。
关键字总结
- 生产、存储(消息堆积)、消费 三个环节保证不丢失
- 生产环节:消息类型,消息确认机制、失败重试机制
- 存储环节:同步/异步刷盘、同步/异步复制slave
- 消费环节:消息确认机制(至少消费成功一次)、失败重试机制、死信队列机制
3分钟到了吗?应该对RocketMQ如何生产消息有全面了解了吧。 如果还想了解更多,欢迎关注下一期内容。
往期热门笔记合集推荐:
- HBase原理与实战笔记合集
- MySQL实战笔记合集
- Canal/Otter源码与实战笔记合集
- Java实战技巧笔记合集
原创:阿丸笔记,欢迎 分享,转载请保留出处。