对系统增加MQ对峰值写流量做削峰填谷,对次要业务逻辑做异步,对不同系统模块做解耦。 因为业务逻辑从同步代码中移除了,所以也要有相应队列处理程序处理消息、执行业务逻辑。
随着业务逻辑复杂,会引入更多外部系统和服务,就会越来越多使用MQ 与外部系统解耦合以及提升系统性能。
比如系统要加红包功能:用户在购买一定数量商品后,系统给用户发一个现金红包鼓励用户消费。由于发放红包的过程不应在购买商品的主流程,所以考虑MQ异步。 但发现一个问题:
- 若消息在投递过程丢失 用户就会因没有得到红包而投诉
- 消息在投递过程出现重复 就会因为发送两个红包而损失
消息为什么会丢失
消息从被写入到MQ到被消费者消费完成,这个链路上会有哪些地方存在丢失消息的可能呢?其实主要存在三个场景:
- 消息从生产者写入到消息队列的过程
- 消息在消息队列中的存储场景
- 消息被消费者消费的过程。
在消息生产的过程中丢失消息
两种情况。
- 首先,消息的生产者一般是业务服务器,MQ独立部署在单独服务器。二者间的网络虽是内网,但也存在抖动可能,一旦发生抖动,消息就可能因网络错误而丢失。 推荐消息重传,即当你发现发送超时后,就将消息重发一次,但也不能无限重发。一般若不是MQ故障或到MQ的网络断开了,重试2~3次即可。
但这种方案可能造成消息重复,从而在消费时重复消费同样的消息。 比方说消息生产时,由于MQ处理慢或网络抖动,导致虽最终写入MQ成功,但在生产端却超时,生产者重传这条消息就会形成重复消息,你就收到了两个现金红包!
在MQ中丢失消息
消息在Kafka是存在本地磁盘的,而为了减少消息存储时对磁盘的随机I/O,一般会将消息先写到os的Page Cache,然后再找合适时机刷盘。
比如Kafka可以配置异步刷盘时机:
- 当达到某一时间间隔
- 或累积一定消息数量
假如你经营一个图书馆,读者每还一本书你都要去把图书归位,不仅工作量大而且效率低下,但是如果你可以选择每隔3小时或者图书达到一定数量的时候再把图书归位,这样可以把同一类型的书一起归位,节省了查找图书位置的时间,可以提高效率。
不过如果发生掉电或异常重启,Page Cache中还没有来得及刷盘的消息就会丢失了。那么怎么解决呢?
你可能会:
- 把刷盘的间隔设置很短
- 或设置累积一条消息
就刷盘,但频繁刷盘会对很影响性能,而且宕机或掉电几率也不高,不推荐。
如果你的系统对消息丢失容忍度很低,可考虑集群部署Kafka,通过部署多个副本备份数据,保证消息尽量不丢失。
Kafka集群中有一个Leader负责消息的写入和消费,可以有多个Follower负责数据的备份。Follower中有一个特殊的集合叫做ISR(in-sync replicas),当Leader故障时,新选举出来的Leader会从ISR中选择,默认Leader的数据会异步地复制给Follower,这样在Leader发生掉电或者宕机时,Kafka会从Follower中消费消息,减少消息丢失的可能。
由于默认消息是异步地从Leader复制到Follower的,所以一旦Leader宕机,那些还没有来得及复制到Follower的消息还是会丢失。 为解决这个问题,Kafka为生产者提供“acks”,当这个选项被设置为“all”时,生产者发送的每一条消息除了发给Leader外还会发给所有的ISR,并且必须得到Leader和所有ISR的确认后才被认为发送成功。这样,只有Leader和所有的ISR都挂了消息才会丢失。
当设置“acks=all”时,需要同步执行1、3、4三个步骤,对于消息生产的性能来说也是有比较大的影响的,所以你在实际应用中需要仔细地权衡考量。我给你的建议是:
1.如果你需要确保消息一条都不能丢失,那么建议不要开启消息队列的同步刷盘,而是用集群的方式来解决,可以配置当所有ISR Follower都接收到消息才返回成功。
2.如果对消息的丢失有一定的容忍度,那么建议不部署集群,即使以集群方式部署,也建议配置只发送给一个Follower就可以返回成功了。
3.我们的业务系统一般对于消息的丢失有一定的容忍度,比如说以上面的红包系统为例,如果红包消息丢失了,我们只要后续给没有发送红包的用户补发红包就好了。
在消费的过程中存在消息丢失的可能
一个消费者消费消息的进度是记录在消息队列集群中的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。
这里面接收消息和处理消息的过程都可能会发生异常或者失败,比如消息接收时网络发生抖动,导致消息并没有被正确的接收到;处理消息时可能发生一些业务的异常导致处理流程未执行完成,这时如果更新消费进度,这条失败的消息就永远不会被处理了,也可以认为是丢失了。
所以,在这里你需要注意的是,一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后还会重复地消费这条消息。
如何保证消息只被消费一次 从上面的分析中你能发现,为了避免消息丢失我们需要付出两方面的代价:一方面是性能的损耗,一方面可能造成消息重复消费。
性能的损耗我们还可以接受,因为一般业务系统只有在写请求时才会有发送消息队列的操作,而一般系统的写请求的量级并不高,但是消息一旦被重复消费就会造成业务逻辑处理的错误。那么我们要如何避免消息的重复呢?
想要完全的避免消息重复的发生是很难做到的,因为网络的抖动、机器的宕机和处理的异常都是比较难以避免的,在工业上并没有成熟的方法,因此我们会把要求放宽,只要保证即使消费到了重复的消息,从消费的最终结果来看和只消费一次是等同的就好了,也就是保证在消息的生产和消费的过程是“幂等”的。
幂等
多次执行同一个操作和执行一次操作,最终得到的结果是相同的。
如果消费一条消息,要将库存数减1,那么如消费两条相同消息,库存数减2,这就非幂等。 而如果消费一条消息后处理逻辑是将库存的数置0, 或如果当前库存数是10,则减1,这样在消费多条消息时所得到的结果就是相同的,这就是幂等。
一件事儿无论做多少次都和做一次产生的结果是一样的,那么这件事儿就具有幂等性。
生产、消费过程增加消息幂等
消息在生产和消费的过程中都可能重复,所以要在生产、消费过程增加消息幂等性保证,这样就可认为从“最终结果上来看”消息实际上是只被消费一次。
消息生产过程中,在Kafka0.11和Pulsar都支持“producer idempotency”,即生产过程的幂等性,这种特性保证消息虽然可能在生产端产生重复,但最终在MQ 存储时只会存一份。
这是怎么做到的呢?
给每个生产者一个唯一ID,并为生产的每条消息赋予一个唯一ID,MQ服务端会存储<生产者ID,最后一条消息ID>
映射。
当某生产者产生新消息,MQ服务端比对消息ID是否与存储的最后一条ID一致,若一致,就认为是重复消息,服务端自动丢弃。
在消费端,幂等可从如下两方面考虑:
- 通用层 可在消息被生产时,使用发号器给它生成一个全局唯一消息ID,消息被处理后,把这个ID存储在DB,在处理下一条消息前,先从DB查询该全局ID是否被消费过,若被消费过就放弃消费。
无论是生产端的幂等保证还是消费端通用的幂等性保证,它们的共同特点都是为每个消息生成唯一ID,然后在使用这个消息时,先比对ID是否已存在,存在则认为消息已被使用。 所以这种方式是一种标准的实现幂等的方式,实战中可直接使用,伪代码如 下:
代码语言:javascript复制// 判断ID是否存在
boolean isIDExisted = selectByID(ID);
if(isIDExisted) {
// 存在则直接返回
return;
} else {
// 不存在,则处理消息
process(message);
// 存储ID
saveID(ID);
}
不过这样会有个问题:如果消息在处理之后,还没有来得及写入DB,消费者宕机了,重启后发现DB并无这条消息,还是会重复执行两次消费逻辑,这时就需要引入事务,保证消息处理和写入DB必须同时成功或失败,但这样消息处理成本更高,所以如果对消息重复没有特别严格要求,可直接使用这种通用方案,而不考虑引入事务。
- 业务层 有很多种处理方式,有一种是增加乐观锁。比如你的消息处理程序需要给一个人的账号加钱。 具体操作: 给每个人的账号数据加个版本号,在生产消息时先查询该账户的版本号,并将版本号连同消息一起发给MQ。消费端拿到消息和版本号后,在执行更新账户金额SQL的时候带上版本号:
update user
set amount = amount 20,
version=version 1
where userId=1
and version=1;
更新数据时,给数据加乐观锁,这样在消费第一条消息时,version值为1,SQL可以执行成功,并且同时把version值改为2。 在执行第二条相同消息时,由于version值不再是1,所以这条SQL不能执行成功,实现了消息幂等。
总结
消息的丢失可以通过生产端的重试、消息队列配置集群模式以及消费端合理处理消费进度三个方式来解决;
为了解决消息的丢失通常会造成性能上的问题以及消息的重复问题;
通过保证消息处理的幂等性可以解决消息的重复问题。
并不是说消息丢失一定不能被接受,毕竟你可以看到在允许消息丢失的情况下,消息队列的性能更好,方案实现的复杂度也最低。比如像是日志处理的场景,日志存在的意义在于排查系统的问题,而系统出现问题的几率不高,偶发的丢失几条日志是可以接受的。
方案设计看场景,你不能把所有的消息队列都配置成防止消息丢失的方式,也不能要求所有的业务处理逻辑都要支持幂等性,这样会给开发和运维带来额外负担。