消息中间件消息丢失问题,由于本人只用过rabbitmq和kafka,就这两种中间件简单说明一下
rabbitmq中间件
生产者消息丢失
这里生产者在发送的过程中,由于网络问题导致消息没有发送到mq,有两种解决办法
- 使用事务
- 使用ack机制
Rabbitmq就是生产在发送消息的时候,开启了事物channel.txSelect,然后发消息,如果发送过程中出现异常,就是调用channel.txRollback进行回滚,如果正常发送,则调用channel.txCommit
代码语言:javascript复制//开启事务
channel.txSelect
try {
这⾥发送消息
} catch (Exception e) {
channel.txRollback
这⾥再次重发这条消息
}
channel.txCommit
众所周知,开启事务是同步操作,会导致性能问题。
另外一种就是ack,开启confirm模式,发送的每一条消息都有一个唯一的表示id,当发送到rabbitmq成功之后,rabbitmq会返回一个ack消息,告诉消息正常发送了,如果rabbitmq没有接收到消息,就会回调接口nack接口,这里也可以进行重新发送消息,或者等待超时没有回调,也可以发送消息,这样就可以保证生产者不丢失消息
rabbitmq消息丢失
这里大多数原因是因为消息接收到了mq,但是服务挂了,没有持久化到磁盘,此时这里我们可以开启持久化机制,持久化分为两种
- queue持久化
- 消息持久化
我们必须开启两种持久化,但是这里如果消息过来了还没有即使持久化到磁盘,服务就挂了,也可能导致消息丢失,因此这里要配合生产者的ack机制,等到消息持久化到磁盘之后,在响应生产者ack消息
消费者丢失消息
这种当发送消息到我们的服务中的时候,此时我们可能还没有消费,就碰到异常或者服务宕机就会导致消息丢失,因为rabbitmq中间件默认是自动ack机制,此时我们可以关闭自动ack的机制,等我消费完之后,再去ack我们的消息,这样就可以保证消息不丢失
kafka
消费者消息丢失
kafka消息丢失和rabbitmq丢失也是一样的,kafka消费者丢失是因为消息会自动提交offset,因此我们可以照样关闭自动提交offset,在我处理完消息的时候,手动提交offset消息,这样就可以保证消息不丢失了
broker消息丢失
比较常见的场景就是kafka的leader消费了消息,但是宕机了,此时还没有同步到其他的broker即follower,这样就是导致消息丢失,我们可以修改一些配置保证我们的消息不丢失
代码语言:javascript复制1.设置topic设置参数replication.factor参数,这个值大于1,
保证每个partion必须有两个副本
2.kafka服务端设置min.insync.replices参数,这个大于1,
保证leader至少有一个follower保持联系
3.producer端设置acks=all,这个就可以保证消息同步到了所有的副本
才算成功
4.producer设置retries=MAX,保证消息可以无限次重试
生产者
正产我如果设置了ack=all,一定不会丢失消息,因为他保证了消息同步了所有的副本,才认为成功,但是必须设置的有多个和leader保持同步的follower,如果只有一个副本,这个副本宕机了,你设置了无限次重试,也是没有用的