面试官:生产环境中使用RocketMQ常见问题

2023-11-26 17:52:28 浏览数 (2)

使用RocketMQ如何保证消息不丢失?

哪些环节会有丢消息的可能?

其中,1,2,4三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。

然后关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。

这个是MQ场景都会面对的通用的丢消息问题。那我们看看用RocketMQ时要如何解决这个问题.

RocketMQ消息零丢失方案

生产者使用事务消息机制保证消息零丢失

1、为什么要发送个half消息?有什么用?

这个half消息是在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。那这个消息的作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下RocketMQ服务是否正常,并且通知RocketMQ,我马上就要发一个很重要的消息了,你做好准备。

2.half消息如果写入失败了怎么办?

如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ。这时候写入消息到MQ如果失败就会非常尴尬了。而half消息如果写入失败,我们就可以认为MQ的服务是有问题的,这时,就不能通知下游服务了。我们可以在下单时给订单一个状态标记,然后等待MQ服务正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务。

3.订单系统写数据库失败了怎么办?

这个问题我们同样比较下没有使用事务消息机制时会怎么办?如果没有使用事务消息,我们只能判断下单失败,抛出了异常,那就不往MQ发消息了,这样至少保证不会对下游服务进行错误的通知。但是这样的话,如果过一段时间数据库恢复过来了,这个消息就无法再次发送了。当然,也可以设计另外的补偿机制,例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。而如果使用事务消息机制,就可以有一种更优雅的方案。

如果下单时,写数据库失败(可能是数据库崩了,需要等一段时间才能恢复)。那我们可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式),然后给RocketMQ返回一个UNKNOWN状态。这样RocketMQ就会过一段时间来回查事务状态。我们就可以在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失。

4.half消息写入成功后RocketMQ挂了怎么办?

我们需要注意下,在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的。也就是说如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务。这时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息没有丢失,RocketMQ就会再次继续状态回查的流程。

5.下单成功后如何优雅的等待支付成功?

在订单场景下,通常会要求下单完成后,客户在一定时间内,例如10分钟,内完成订单支付,支付完成后才会通知下游服务进行进一步的营销补偿。

如果不用事务消息,那通常会怎么办?

最简单的方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超过时间的订单回收。这种方式显然是有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小的压力。

那更进一步的方案是什么呢?是不是就可以使用RocketMQ提供的延迟消息机制。往MQ发一个延迟1分钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。而如果没有支付,就再发一个延迟1分钟的消息。最终在第十个消息时把订单回收。这个方案就不用对全部的订单表进行扫描,而只需要每次处理一个单独的订单消息。

那如果使用上了事务消息呢?我们就可以用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。我们只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。

6、事务消息机制的作用

整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业务的分布式事务一致性问题。而事务一致性问题一直以来都是一个非常复杂的问题。而RocketMQ的事务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件的事务一致性,而对下游服务的事务并没有保证。但是即便如此,也是分布式事务的一个很好的降级方案。目前来看,也是业内最好的降级方案。

RocketMQ配置同步刷盘 Dledger主从架构保证MQ主从同步时不会丢消息

同步刷盘和异步刷盘

我们可以简单的把RocketMQ的刷盘方式 flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了。

如上图所示,在同步刷盘中需要等待一个刷盘成功的 ACK ,同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是 性能上会有较大影响 ,一般地适用于金融等特定业务场景。

而异步刷盘往往是开启一个线程去异步地执行刷盘操作。消息刷盘采用后台异步线程提交的方式进行, 降低了读写延迟 ,提高了 MQ 的性能和吞吐量,一般适用于如发验证码等对于消息保证要求不太高的业务场景。

一般地,异步刷盘只有在 Broker 意外宕机的时候会丢失部分数据,你可以设置 Broker 的参数 FlushDiskType 来调整你的刷盘策略(ASYNC_FLUSH 或者 SYNC_FLUSH)。

同步复制和异步复制

上面的同步刷盘和异步刷盘是在单个结点层面的,而同步复制和异步复制主要是指的 Borker 主从模式下,主节点返回消息给客户端的时候是否需要同步从节点。

  • 同步复制: 也叫 “同步双写”,也就是说,只有消息同步双写到主从节点上时才返回写入成功
  • 异步复制: 消息写入主节点之后就直接返回写入成功

然而,很多事情是没有完美的方案的,就比如我们进行消息写入的节点越多就更能保证消息的可靠性,但是随之的性能也会下降,所以需要程序员根据特定业务场景去选择适应的主从复制方案。

那么,异步复制会不会也像异步刷盘那样影响消息的可靠性呢?

答案是不会的,因为两者就是不同的概念,对于消息可靠性是通过不同的刷盘策略保证的,而像异步同步复制策略仅仅是影响到了 可用性 。为什么呢?其主要原因RocketMQ 是不支持自动主从切换的,当主节点挂掉之后,生产者就不能再给这个主节点生产消息了

比如这个时候采用异步复制的方式,在主节点还未发送完需要同步的消息的时候主节点挂掉了,这个时候从节点就少了一部分消息。但是此时生产者无法再给主节点生产消息了,消费者可以自动切换到从节点进行消费(仅仅是消费),所以在主节点挂掉的时间只会产生主从结点短暂的消息不一致的情况,降低了可用性,而当主节点重启之后,从节点那部分未来得及复制的消息还会继续复制。

在单主从架构中,如果一个主节点挂掉了,那么也就意味着整个系统不能再生产了。那么这个可用性的问题能否解决呢?一个主从不行那就多个主从的呗,别忘了在我们最初的架构图中,每个 Topic 是分布在不同 Broker 中的。

但是这种复制方式同样也会带来一个问题,那就是无法保证 严格顺序 。在上文中我们提到了如何保证的消息顺序性是通过将一个语义的消息发送在同一个队列中,使用 Topic 下的队列来保证顺序性的。如果此时我们主节点A负责的是订单A的一系列语义消息,然后它挂了,这样其他节点是无法代替主节点A的,如果我们任意节点都可以存入任何消息,那就没有顺序性可言了。

而在 RocketMQ 中采用了 Dledger 解决这个问题。他要求在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客⼾端返回写⼊成功,并且它是⽀持通过选举来动态切换主节点的。这里我就不展开说明了,读者可以自己去了解。

也不是说 Dledger 是个完美的方案,至少在 Dledger 选举过程中是无法提供服务的,而且他必须要使用三个节点或以上,如果多数节点同时挂掉他也是无法保证可用性的,而且要求消息复制半数以上节点的效率和直接异步复制还是有一定的差距的。

Dledger的文件同步

在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。

简单来说,数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。

Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。

接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。

再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。

消费者端不要使用异步消费机制

正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。但是也会有下面这种情况会造成服务端消息丢失:

代码语言:java复制
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name_4");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                new Thread(){
                    public void run(){
                        //处理业务逻辑
                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    }
                };
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

这种异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。

RocketMQ特有的问题,NameServer挂了如何保证消息不丢失?

NameServer在RocketMQ中,是一个路由中心的角色,提供到Broker的路由功能。但是其实路由中心这样的功能,在所有的MQ中都是需要的。kafka是用zookeeper和一个作为Controller的Broker一起来提供路由服务,整个功能是相当复杂纠结的。而RabbitMQ是由每一个Broker来提供路由服务。而只有RocketMQ把这个路由中心单独抽取了出来,并独立部署。

这个NameServer之前都了解过,集群中任意多的节点挂掉,都不会影响他提供的路由功能。那如果集群中所有的NameServer节点都挂了呢?

有很多人就会认为在生产者和消费者中都会有全部路由信息的缓存副本,那整个服务可以正常工作一段时间。其实这个问题大家可以做一下实验,当NameServer全部挂了后,生产者和消费者是立即就无法工作了的。至于为什么,可以去源码中找找答案。

那再回到我们的消息不丢失的问题,在这种情况下,RocketMQ相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的。

消息零丢失方案总结

  • 生产者使用事务消息机制。
  • Broker配置同步刷盘 Dledger主从架构
  • 消费者不要使用异步消费。
  • 整个MQ挂了之后准备降级方案

这整套的消息零丢失方案,在各个环节都大量的降低了系统的处理性能以及吞吐量。在很多场景下,这套方案带来的性能损失的代价可能远远大于部分消息丢失的代价。在设计RocketMQ使用方案时,要根据实际的业务情况来考虑。

使用RocketMQ如何保证消息顺序

MQ的顺序问题分为全局有序和局部有序。

  • 全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费。
  • 局部有序:只保证一部分关键消息的消费顺序。

对于大部分场景,我们只需要保证局部有序就可以,例如电商订单场景,也只要保证一个订单的所有消息是有序的就可以了。

Producer 生产消息的时候会进行轮询(取决你的负载均衡策略)来向同一主题的不同消息队列MessageQueue发送消息。那么如果此时我有几个消息分别是同一个订单的创建、支付、发货,在轮询的策略下这 三个消息会被发送到不同队列 ,因为在不同的队列此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了。消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,他们之间的消息都是互相隔离的,在这种情况下,是无法保证消息全局有序的。

那么,怎么解决呢?

只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。

怎么解决重复消费问题

RocketMQ 生产也好,消费也好,有重试机制、重发队列等等,所以在网络情况不太好的情况下, RocketMQ 避免不了消息的重复。

首先分析下为什么会重复消费?

「当producer发送topic消息时,应该往topic下的哪个queue来发送呢?」

producer会采用轮询的策略发送

「那么consumer应该消费哪个queue下的消息呢?」

当有一个消费者时当然是消费所有的queue

imgimg

「如果有多个消费者呢?」

只需要根据各种负载均衡策略将队列分配给消费者即可,如下图是两种负载均衡的方式

imgimg
imgimg

「如果消费者数量超过队列的数量会发生什么?」

多出来的消费者将不会消费任何队列

imgimg

「为什么一个consumer只能消费一个queue呢?」

多个消费者消费一个queue肯定会有并发问题,所以得加锁,这样还不如把topic下的队列数量设置的多一点

「我在运行的过程中可以设置topic下queue的数量吗?」

当然可以。不仅可以重新设置queue的数量,还可以实时增减consumer,以应对不同流量的场景

「那这样说当queue或者consumer的数量发生变化的时候,需要重新执行负载均衡吧?」

是的,大家一般把这个过程叫做重平衡

下面我们来分享一下详细的细节

消息发送流程

消息发送主要有3种方式单向发送(只发送,不管结果),同步发送和异步发送

imgimg
消息消费流程
消息是基于推还是拉?

消息消费的模式有两种方式:

  1. 拉取:Consumer不断从Broker拉取
  2. 推送:Broker向Consumer推送

这两种方式都有各自的缺点:

  1. 拉取:拉取的间隔不好确定,间隔太短没消息时会造成带宽浪费,间隔太长又会造成消息不能及时被消费
  2. 推送:「推送和速率难以适配消费速率」,推的太快,消费者消费不过来怎么办?推的太慢消息不能及时被消费

「看起来拉取和推送难以抉择」

然后就有大佬把拉取模式改了一下,即不会造成带宽浪费,也能基于消费的速率来决定拉取的频率!

「你猜怎么改的?」

其实很简单,Consumer发送拉取请求到Broker端,如果Broker有数据则返回,Consumer端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(例如5s)。

  1. 如果在等待的这段时间,有要拉取的消息,则将消息返回,Consumer端再次拉取。
  2. 如果等待超时,也会直接返回,不会将这个请求一直hold住,Consumer端再次拉取

「对了,这种策略就叫做长轮询」

「RocketMQ中有拉和推两种消费方式,但是推是基于长轮询做的」

具体消费流程
imgimg

「拉取到消息后是怎么处理的呢?」

PullRequest类的成员变量如下图

imgimg

当拉取到消息后,消息会被放入msgTreeMap,其中key为消息的offset,value为消息实体

「另外还有一个重要的属性dropped,和重平衡相关,重平衡的时候会造成消息的重复消费,具体机制不分析了,看专栏把」

msgCount(未消费消息总数)和msgSize(未消费消息大小)是和流控相关的

「什么是流控呢?」

就是流量控制,当消费者消费的比较慢时,减缓拉取的速度。如下图

imgimg

当从阻塞队列中获取PullRequest时,并不会直接发起网络请求,而是先看看是否触发流控的规则,比如未消费的消息总数超过一定值,未消费的消息大小超过一定值等

imgimg

接着就是收到响应,处理消息,并键PullRequest再次放入阻塞队列.

「是不是落了一个步骤?就是Consumer告诉Broker这部分消息我消费了?」

嗯嗯,你是不是以为提交offset的过程是同步的?其实并不是,「是异步的」

Consumer怎么提交offset?
imgimg

当consumer消费完消息只是将offset存在本地,通过定时任务将offset提交到broker,另外broker收到提交offset的请求后,也仅仅是将offset存在map中,通过定时任务持久化到文件中

「这样就会造成消息的重复消费」

  1. Consumer消费完消息并不是实时同步到Broker的,而是将offset先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致offset没提交,下次没提交offset的这部分消息会被再次消费
  2. 即使offset被提交到了Broker,在还没来得及持久化的时候Broker宕机了,当重启的时候Broker会读取consumerOffset.json中保存的offset信息,这就会导致没持久化offset的这部分消息会被再次消费

通过 幂等解决重复消费问题。

那么如何给业务实现幂等呢?这个还是需要结合具体的业务的。你可以使用 写入 Redis 来保证,因为 Rediskeyvalue 就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。

不过最主要的还是需要 根据特定场景使用特定的解决方案 ,你要知道你的消息消费是否是完全不可重复消费还是可以忍受重复消费的,然后再选择强校验和弱校验的方式。毕竟在 CS 领域还是很少有技术银弹的说法。

而在整个互联网领域,幂等不仅仅适用于消息队列的重复消费问题,这些实现幂等的方法,也同样适用于,在其他场景中来解决重复请求或者重复调用的问题 。比如将HTTP服务设计成幂等的,解决前端或者APP重复提交表单数据的问题 ,也可以将一个微服务设计成幂等的,解决 RPC 框架自动重试导致的 重复调用问题

使用RocketMQ如何快速处理积压消息?

如何确定RocketMQ有大量的消息积压?

在正常情况下,使用MQ都会要尽量保证他的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。

产生消息堆积的根源其实就只有两个——生产者生产太快或者消费者消费太慢。

对于RocketMQ来说,有个最简单的方式来确定消息是否有积压。那就是使用web控制台,就能直接看到消息的积压情况。

在Web控制台的主题页面,可以通过 Consumer管理 按钮实时看到消息的积压情况。

如何处理大量积压的消息?

以增加多个消费者实例去水平扩展增加消费能力来匹配生产的激增。如果消费者消费过慢的话,我们可以先检查 是否是消费者出现了大量的消费错误 ,或者打印一下日志查看是否是哪一个线程卡死,出现了锁资源不释放等等的问题。

当然,最快速解决消息堆积问题的方法还是增加消费者实例,不过 同时你还需要增加每个主题的队列数量 。最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。但是如果此时再继续增加Consumer的服务节点就没有用了。

RocketMQ 中,一个队列只会被一个消费者消费

如果Topic下的MessageQueue配置得不够多的话,那就不能用上面这种增加Consumer节点个数的方法了。这时怎么办呢? 这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了。之后再根据情况恢复成正常情况。

总结

本篇文章总结了,在使用RocketMQ时候遇到的问题,以及相应的处理方法,最重要还是要根据实际情况来选择。

0 人点赞