全是干货的技术号: 本文已收录在github仓库 Java-Interview-Tutorial,欢迎 star!
0 前言
对系统增加MQ:
- 对峰值写流量做削峰填谷
- 对次要业务逻辑做异步
- 对不同系统模块做解耦
因为业务逻辑从同步代码中移除,所以也要有相应队列处理程序处理消息、执行业务逻辑。随着业务逻辑复杂,会引入更多外部系统和服务,就会越来越多使用MQ,与外部系统解耦合以及提升系统性能。
如系统要加红包功能:用户在购买一定数量商品后,系统给用户发一个现金红包激励用户消费。由于发放红包这种次要业务过程不应在购买商品的主业务流程,所以考虑MQ异步。 但引入 MQ 就必然遇到如下问题:
- 若消息在投递过程丢失 用户就会因没有得到红包而投诉到你这边
- 消息在投递过程出现重复 就会因发送两个红包而导致公司资产损失
1 消息为何会丢失?
消息从被写入到MQ,到被消费者消费完成,该链路上的如下场景可能丢失消息:
- 消息从生产者(后文简称为Pro)写入到MQ的过程
- 消息在MQ中的存储场景
- 消息被消费者(后文简称为Con)消费的过程
1.1 在消息生产的过程
消息的Pro一般是业务服务器,MQ独立部署在单独服务器。二者间网络虽是内网,但也存在抖动可能,一旦网络抖动,消息就可能因网络错误而丢失。
1.1.1 解决方案
推荐消息重传:当你发现发送超时后,重发一次消息,但也不能无限重发。一般若不是MQ故障或到MQ的网络断开了,重试2~3次即可。
但这种方案可能造成【消息重复】,从而在消费时重复消费同样的消息。 比方说消息生产时,由于MQ处理慢或网络抖动,导致虽最终写入MQ成功,但对于Pro却是超时的,于是Pro重传这条消息,导致重复消息,你收到了两个现金红包!
1.2 在MQ中
消息在Kafka是存在本地磁盘,为减少消息存储时对磁盘的随机I/O,一般会将消息先写到os的Page Cache,然后择机机刷盘。
如Kafka可配置异步刷盘时机:
- 当达到某一时间间隔
- 或累积一定消息数量
假如你经营一个图书馆,读者每还一本书你都要去把图书归位,不仅工作量大且效率低下,但若你能选择每隔3h或图书达到一定数量,再把图书归位,这就能把同一类型的书一起归位,节省查找图书位置的时间,提高人效。
不过若发生掉电或异常重启,Page Cache还没有来得及刷盘的消息就会丢失。这咋办?你可能会:
- 把刷盘的间隔设置很短
- 或设置累积一条消息
就刷盘,但频繁刷盘很影响性能,且宕机或掉电几率其实也不高,故不推荐!
若你的系统对消息丢失容忍度很低,可考虑集群部署Kafka,通过部署多个副本备份数据,保证尽量不丢消息。
ISR
Kafka集群中有个Leader,负责消息的写入和消费,可有多个Follower负责数据备份。
Follower中有个特殊集合 — ISR(in-sync replicas),当Leader故障,新选举出来的Leader会从ISR中选择。默认Leader的数据会异步复制给Follower,这样在Leader掉电或宕机时,Kafka会从Follower中消费消息,减少消息丢失的可能。
但因消息默认是异步从Leader复制到Follower,所以一旦Leader宕机,那些还没来得及复制到Follower的消息还是会丢失。为解决该问题,Kafka为生产者提供“acks”:
acks
当该选项被设置为“all”,Pro发的每条消息,除了发给Leader,还会发给所有ISR,且必须得到Leader和所有ISR的确认后,才被认为发送成功。这样,只有Leader和所有ISR都挂了消息才会丢失。
当设置“acks=all”,需同步执行1、3、4三个步骤,对消息生产的性能有很大影响,实际应用中需仔细权衡。
最佳实践
- 若你就是要确保消息一条都不能丢,就你让开启MQ的同步刷盘,而应该用集群方案,可配置当所有ISR Follower都接收到消息,才返回成功
- 若对丢消息有一定容忍度,则建议不部署集群,即使集群部署,也推荐配置只发送给一个Follower即可返回成功
- 业务系统一般对消息丢失有一定容忍度,如红包系统,若红包消息丢了,只要后续给没发送红包的用户补偿发送即可!
1.3 在消费过程
一个Con消费消息的进度是记录在MQ集群中的,消费过程分为如下步骤:
- 接收消息
- 处理消息
- 更新消费进度
接收消息,处理消息的过程都可能异常,如:
- 接收消息时网络抖动,导致消息并未被正确接收
- 处理消息时可能发生一些业务异常,导致处理流程未执行完成,这时若更新消费进度,这条失败的消息就永远不会被处理了,就算丢失了
所以,务必等到消息接收、处理完成后,才能更新消费进度,但这也会造成消息重复,比如某条消息在处理后,Con恰好宕机,就因未更新消费进度,所以当该Con重启后,还会重复消费这条消息。
2 保证消息只被消费一次
经过上面分析发现,为避免消息丢失,我们需要付出代价:
- 性能损耗
- 可能造成消息重复消费
性能损耗还能接受,因为一般业务系统只有在写请求时,才有发送MQ的操作,而一般系统的写请求的量级并不高。但消息一旦被重复消费,就会造成业务逻辑处理错误,如何避免消息重复消费问题呢?
完全避免消息重复发生真的很难,因为网络抖动、机器宕机和处理异常都难以避免,业界也并无成熟方法,只能将要求放宽,只要保证即使消费到了重复消息,从消费的最终结果来看和只消费一次是等同即可,即保证在消息的生产和消费的过程“幂等”。
2.1 幂等
多次执行同一个操作和执行一次操作,最终得到的结果是相同的。
若消费一条消息,要将库存-1,则若消费两条相同消息,库存-2,这就非幂等的。 而若:
- 消费一条消息后,处理逻辑是将库存数置0
- 或若当前库存数是10,则减1
这样消费多条消息时,所得结果相同,这就是幂等。
2.1.1 生产过程增加消息幂等
消息在生产、消费过程中都可能重复,所以要在生产、消费过程增加消息幂等性保证,这就能认为从“最终结果看”,消息实际上是只被消费一次。
消息生产过程中,Kafka0.11和Pulsar都支持“producer idempotency”,即生产过程幂等性,这保证消息虽然可能在生产端产生重复,但最终在MQ存储时只会存一份。
实现原理
给每个Pro一个唯一ID,并为生产的每条消息赋予一个唯一ID,MQ服务端会存储:
生产者ID=》最后一条消息ID的映射。 当某Pro产生新消息,Broker比对消息ID是否与存储的最后一条ID一致:
- 若一致,就认为是重复消息,Broker自动丢弃
2.1.2 消费过程增加消息幂等
消费端幂等性保证稍微复杂,可从通用层和业务*两个层面考虑:
通用层面
消息被生产时,使用发号器给其生成一个全局唯一消息ID。消息被处理后,将该ID存储在DB,在处理下一条消息前,先从DB查询该全局ID是否被消费:
- 若被消费过,就放弃消费
生产端幂等保证 && 消费端通用层面的幂等保证,都是为每个消息生成唯一ID,然后在使用该消息时,先判断ID是否已存在,若存在,则认为消息已被使用。 这其实是一种标准的幂等实现方案:
代码语言:javascript复制// 判断ID是否存在
boolean isIDExisted = selectByID(ID);
if(isIDExisted) {
// 存在则直接返回
return;
} else {
// 不存在,则处理消息
process(message);
// 存储ID
saveID(ID);
}
但若消息在处理后,还没有来得及写DB,Con宕机了,重启后发现DB并无这条消息,还是会重复执行两次消费逻辑,这就要引入事务,保证消息处理和写入DB必须同时成功或失败,但这样消息处理成本更高,所以若对消息重复无苛刻要求,可直接使用这种通用方案,而不考虑引入事务。
业务层面
有很多处理方式,有种是使用乐观锁,如你的消息处理程序要给一个人的账户加钱: 给每个人的账户数据加个版本号:
- Pro生产消息时,先查询该账户版本号,将版本号连同消息一起发给Broker
- Con拿到消息和版本号后,在执行更新账户金额SQL时,带上版本号:
update user
set amount = amount 20,
version=version 1
where userId=1
and version=1;
更新数据时,给数据加乐观锁,这样在消费第一条消息时,version值为1,SQL可执行成功且同时把version值改为2。 在执行第二条相同消息时,由于version值不再是1,所以该SQL不会再成功执行,这就实现了消息幂等。
3 总结
消息丢失可通过如下方案解决:
- 生产端重试
- 消息队列配置集群模式
- 消费端合理处理消费进度
为解决消息丢失问题,通常会造成:
- 性能损失
- 消息重复
通过保证消息处理的幂等性可以解决消息的重复问题。
并非说消息丢失一定不能接受,在允许消息丢失情况下,MQ性能更好,方案实现复杂度也最低。像是日志处理等场景,日志意义在于排查系统问题,而系统问题几率不高,偶发丢几条日志也能接受。
方案设计都看业务要求,你不能把所有MQ都配置成防止消息丢失方式,也不能要求所有业务处理逻辑都要支持幂等性,这会给开发和运维带来额外负担。