RabbitMQ知识点整理总结

2022-06-23 15:24:06 浏览数 (1)

什么是RabbitMQ?为什么使用RabbitMQ?

答:采用AMQP高级消息队列协议的一种消息队列技术,最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦

可以用它来:解耦、异步、削峰。

为什么要使用rabbitmq?

在分布式系统下具备异步,削峰,负载均衡等一系列高级功能

拥有持久化的机制,进程消息,队列中的信息也可以保存下来

实现消费者和生产者之间的解耦

对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作

可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单

使用rabbitmq的场景

服务间异步通信

顺序消费

定时任务

请求削峰

1、为什么要引入MQ系统,直接读写数据库不行吗? 其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么?

面试官问你这个问题,期望的一个回答是说,你们公司有个什么业务场景,这个业务场景有个什么技术挑战,如果不用 MQ 可能会很麻烦,但是你现在用了 MQ 之后带给了你很多的好处。

先说一下消息队列常见的使用场景吧,其实场景有很多,但是比较核心的有 3 个:解耦、异步、削峰。 解耦:多系统多进程的数据交换,用pub/sub 异步:把大数据量的同步处理改为异步 削峰:一般的A 系统使用 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。如果使用 MQ, 每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最 大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉,这又设计请求排队的问题。

2、消息队列有什么优缺点? 优点:解耦、异步、削峰 缺点: 系统可用性降低 系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套 系统崩溃的,你不就完了?

系统复杂度提高 硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。

一致性问题 A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

3.消息在什么时候会变成死信?

  • 消息拒绝并且没有设置重新入队
  • 消息过期
  • 消息堆积,并且队列达到最大长度,先入队的消息会变成DL

4、RabbitMQ 的高可用性如何保证? RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式 单机模式不存在高可用。 普通集群模式也不存在高可用性,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。但是你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上 拉取数据过来。这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实 例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。 镜像集群模式的策略是高可用策略,指定的时候可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的 节点上去了。

5、如何解决消息队列的延时以及过期失效问题? 其实本质针对的场景,都是说,可能你的消费端出了问题,不消费了;或者消费的速度极其慢,造成消息堆积了,MQ存储快要爆了,甚至开始过期失效删除数据了。

针对这个问题可以有事前、事中、事后三种处理

  • 事前:开发预警程序,监控最大的可堆积消息数,超过就发预警消息(比如短信),不要等出生产事故了再处理。
  • 事中:看看消费端是不是故障停止了,紧急重启。
  • 事后:中华石杉老师就是说的这一种(https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/mq-time-delay-and-expired-failure.md),需要对消费端紧急扩容 ,增加处理消费者进程,如扩充10倍处理,但其实这也有个问题,即数据库的吞吐是有限制的,如果是消费到数据库也是没办法巨量扩容的,所以还是要在吞吐能力支持下老老实实的泄洪消 费。所以事前预防还是最重要的。否则出发删除过期数据,那就需要再重写生产消息的程序,重新产生消息。

6、RabbitMQ如何保证不丢数据? 需要考虑3个可能丢数据的地方:生产端、队列本身、消费端

  • 6.1生产端:开启事务(不推荐,太耗性能降低吞吐),推荐开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而 且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
  • 6.2队列本身:就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。

设置持久化有两个步骤:

    •  创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
    •  第二个是发送消息的时候将消息的 deliveryMode 设置为 2。就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
  • 6.3消费端:其实和kafka的原理很类似,kafka即手动提交offsize。用RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,通过自己的一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别 的 consumer 去处理,消息是不会丢的。

7、如何保证队列的消息不被重复消费? 这个需要灵活作答,考察的是思考力,因为消费的场景有很多,有数据库、有缓存、有第三方接口

  • 1.比如针对数据库,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键(或者UUID),那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
  • 2.再比如redis缓存,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
  • 3.再比如第三方接口,需要确定两点,第三方接口程序是有去重能力的,那么脏一点直接丢数据过去,如果没有去重能力,还是需要我们来写程序去重,就是第2点的办法。

8、集群节点类型都有什么? 节点的存储类型分为两种:

  • 磁盘节点
  • 内存节点

磁盘节点就是配置信息和元信息存储在磁盘上,内存节点把这些信息存储在内存中,当然内次节点的性能是大大超越磁盘节点的。 单节点系统必须是磁盘节点,否则每次你重启RabbitMQ之后所有的系统配置信息都会丢失。 RabbitMQ要求集群中至少有一个磁盘节点,当节点加入和离开集群时,必须通知磁盘节点。

9.AMQP是什么?

AMQP的基本概念

消息(Message):由有效载荷(playload)和标签(label)组成。其中有效载荷即传输的数据。 生产者(producer):创建消息,发布到代理服务器(Message Broker) 代理服务器(Message Broker):接收和分发消息的应用,RabbitMQ Server就是消息代理服务器,其中包含概念很多,以RabbitMQ 为例:信道(channel)、队列(queue)、交换器(exchange)、路由键(routing key)、绑定(binding key)、虚拟主机(vhost)等 消费者(consumer):连接到代理服务器,并订阅到队列(queue)上,代理服务器将发送消息给一个订阅的/监听的消费者,消费者其只能接收消息的一部分:有效载荷(playload)

RabbitMQ就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。

10.AMQP协议3层?

Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。

Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。

TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。

11.AMQP模型的几大组件?
  • 交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。
  • 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
  • 绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。
12.生产者消息运转?

1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)。

2.Producer声明一个交换器并设置好相关属性。

3.Producer声明一个队列并设置好相关属性。

4.Producer通过路由键将交换器和队列绑定起来。

5.Producer发送消息到Broker,其中包含路由键、交换器等信息。

6.相应的交换器根据接收到的路由键查找匹配的队列。

7.如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。

8.关闭信道。

9.管理连接。

13.消费者接收消息过程?

1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)。

2.向Broker请求消费响应的队列中消息,可能会设置响应的回调函数。

3.等待Broker回应并投递相应队列中的消息,接收消息。

4.消费者确认收到的消息,ack。

5.RabbitMq从队列中删除已经确定的消息。

6.关闭信道。

7.关闭连接。

14. 如何确保消息正确地发送至RabbitMQ?

RabbitMQ使用发送方确认模式,确保消息正确地发送到RabbitMQ。

发送方确认模式:将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

15. 如何确保消息接收方消费了消息?

接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。

下面罗列几种特殊情况:

  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要根据bizId去重)
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

16. 如何避免消息重复投递或重复消费?

在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。

这个问题针对业务场景来答分以下几点:

1.比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

2.再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

3.如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

17. RabbitMQ的集群模式和集群节点类型

普通模式:默认模式,以两个节点(rabbit01,rabbit02)为例来进行说明,对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01,rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer,所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么等到rabbit01节点恢复,然后才可被消费。如果没有消息持久化,就会产生消息丢失的现象。

镜像模式:把需要的队列做成镜像队列,存在与多个节点属于RabibitMQ的HA方案,该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取,该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉,所以在对可靠性要求比较高的场合中适用

节点分为内存节点(保存状态到内存,但持久化的队列和消息还是会保存到磁盘),磁盘节点(保存状态到内存和磁盘),一个集群中至少需要一个磁盘节点.

rabbitmq常用的5种消息模型

1.基本消息模型

生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

2.work消息模型

工作队列或者竞争消费者模式 让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消 费,就会消失,因此任务是不会被重复执行的

订阅模型(三类)

1、1个生产者,多个消费者 2、每一个消费者都有自己的一个队列 3、生产者没有将消息直接发送到队列,而是发送到了交换机 4、每个队列都要绑定到交换机 5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的 Exchange类型有以下几种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Headers:当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配,headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。(不常用) 根据交换机的类型将订阅模型又细分为三类。
订阅模型-Fanout

主要特点是广播模式, 队列的消费者都能拿到消息。实现一条消息被多个消费者消费.

订阅模型-Direct

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。 X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列 C1:消费者,其所在队列指定了需要routing key 为 error 的消息 C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

订阅模型-Topic

Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符。 通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好 1 个词

0 人点赞