消息的可靠性传输,如何处理消息丢失问题?

2022-11-30 15:03:31 浏览数 (1)

用MQ时,要注意消息数据:

  • 不能多,牵涉重复消费处理和幂等性问题
  • 不能少,消息不能搞丢呀

若这是用MQ传递非常核心的消息,如计费系统,就是很重的业务,操作很耗时,设计上经常将计费做成异步化,就是用MQ。

为确保MQ传递过程中不会弄丢计费消息。广告主投放个广告,说好用户点击一次扣1块。结果要是用户动不动点击了一次,扣费时搞的消息丢了,公司就会不断少几块。

MQ丢数据,一般分两种:

  • MQ自己弄丢了
  • 消费时弄丢了

1.1 生产者丢数据

生产者将数据发送到MQ时,因为网络等问题,数据在半路丢了。

解决方案
事务功能
  • 生产者发数据前,开启事务(channel.txSelect),然后发送消息
  • 若消息未成功被MQ接收到,则Pro会收到异常报错,此时即可回滚事务(channel.txRollback),然后重试发送消息
  • 若收到消息,则可提交事务(channel.txCommit)

但MQ事务机制一搞,因为太耗性能,吞吐量就会降低。所以一般若你要确保写RabbitMQ的消息别丢,可开启confirm模式。

confirm模式

在Pro开启confirm模式后,你每次写的消息都会分配一个唯一id,然后若写入RabbitMQ,RabbitMQ会给你回传一个ack消息,告诉你说这个消息ok了。

若RabbitMQ未能处理该消息,就会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。可结合该机制,自己在内存里维护每个消息id的状态,若超过一定时间还没接收到该消息的回调,你就能重发。

事务机制 V.S cnofirm机制
  • 事务机制是同步的,你提交一个事务后,会阻塞在那儿
  • confirm机制是异步的,你发送个消息后,即可发送下个消息,然后那个消息RabbitMQ接收后,会异步回调你一个接口,通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都用confirm机制。

RabbitMQ丢数据

RabbitMQ自己丢了数据,这必须开启RabbitMQ持久化:消息写入后会持久化到磁盘,哪怕是RabbitMQ自己挂了,恢复后会自动读取之前存储的数据。

罕见的是,RabbitMQ还没持久化,自己就挂了,可能导致少量数据会丢失的。

设置持久化
  • 创建queue时,将其设置为持久化,保证RabbitMQ持久化queue的元数据,但不会持久化queue里的数据
  • 发送消息时,将消息的deliveryMode设为2:将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘

必须同时设置这两个持久化,RabbitMQ哪怕是挂了,再次重启后,也会从磁盘上重启恢复queue,恢复该queue里的数据。

而且持久化可跟Pro的confirm机制配合,只有消息被持久化到磁盘后,才会通知Pro ack,所以哪怕是在持久化到磁盘前,rabbitmq挂了,数据丢了,生产者收不到ack,也可以自己重发。

Con弄丢数据

你消费到了数据之后,消费者会自动通知RabbitMQ说ok ,我已经消费完这条消息了。

然而可能刚消费到消息,还没处理,Con进程挂了,重启后,RabbitMQ认为你都消费了,这数据就丢了。

解决方案

用RabbitMQ提供的ack机制,关闭RabbitMQ自动ack,可通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack。

这样若你还没处理完,就不会ack,RabbitMQ就认为你还没处理完,这时RabbitMQ会把这个消费分配给别的consumer处理,不会丢消息。

2 Kafka

消费端丢数据

唯一可能导致Con丢数据case:消费到了该消息,然后Con自动提交了offset,让kafka以为你已消费完该消息,然而其实你刚准备处理这消息,你还没处理完,你就挂了,这条消息就丢了。 因此,得关闭自动提交offset,在处理完后Con手动提交offset,即可保证数据不会丢。

但此时确实会重复消费,如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己设计保证幂等即可。

Broker弄丢数据

kafka某broker宕机,然后重新选举partiton的leader时。若此时其他follower刚好还有些数据没同步,结果此时leader挂了,然后选举某个follower成leader后,就丢了一些数据。

一般要求起码设置如下参数:

  • 给topic设置replication.factor参数:必须大于1,要求每个partition必须有至少2个副本
  • 在kafka Broker设置min.insync.replicas参数:必须大于1,要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower
  • Pro端设置acks=all:要求每条数据,必须写入所有replica后,才能认为是写成功
  • Pro端设置retries=MAX(无限次重试):一旦写入失败,就无限重试,卡在这里

配置后,至少在kafka Broker端能保证在leader所在broker发生故障,进行leader切换时,数据不会丢失。

Pro丢数据

如按上述思路设置ack=all,一定不会丢,要求是,你的leader接收到消息,所有follower都同步到了消息之后,才认为本次写成功了。 如果没满足这条件,生产者会自动不断重试,重试无限次。

3 RocketMQ

RocketMQ 导致数据丢失的原因与前面的 RabbitMQ 和 Kafka 都很类似。生产者就是因为网络抖动等原因消息投递失败,或者 RocketMQ 自身的 Master 节点故障,主备切换故障之类的,消费者则有可能是异步处理导致还未处理成功就给 RocketMQ 提交了 offset 标识消息已处理了。

在 RocketMQ 中,事务消息可以保证消息零丢失。RocketMQ 的事务消息流程大致如下图所示:

在上面的事务消息流程中,基于这三个业务流程:发送 half 消息 -> 处理其他业务 -> commit/rollback。我们来讨论下面的几种情况:

万一生产者发送 half 消息失败,怎么办?

可以做重试或记录消息到如文件、数据库等地方,直接给用户返回失败,本次请求失败。

  • 万一生产者发送 half 消息成功,但是处理其他业务失败,又该怎么办呢? 生产者发送 rollback 请求回滚 RocketMQ 中该条消息,本次请求失败。
  • 万一生产者发送 half 消息成功,但是 RocketMQ 由于某些原因如网络超时等导致没有响应,怎么处理? 由于 half 消息已发送成功,此时 RocketMQ 中已经有该条消息了,RocketMQ 会有一个补偿机制,补偿机制会回调你开发好的一个接口,询问你这条消息是要 commit 还是 rollback。
  • 万一生产者发送 half 消息成功,但是请求 commit 或 rollback 的时候失败了呢? 这个问题与上面的问题一样,都是通过 RocketMQ 的补偿机制来处理。

4 总结

本文分别从生产者、MQ 自身、消费者介绍了导致消息丢失的原因,消息丢失问题是一个比较常见但又必须解决的问题。

不同的 MQ 如何解决消息丢失问题的。消费端导致的消息丢失都是由于数据还未处理成功确提前通知 MQ 消息已经处理成功了,禁止自动提交或异步操作即可,处理起来比较简单;生产者和 MQ 自身导致的消息丢失则比较难处理,RabbitMQ 使用了 Confirm 模式避免消息丢失;Kafka 则配置所有 follower 同步成功才给生产者响应推送消息成功;RocketMQ 则使用事务消息来保证消息的零丢失,针对不同的异常情况还提供了补偿机制进行处理。

0 人点赞