导语
本文从AMQP协议(Advanced Message Queuing Protocol,高级消息队列协议)、消息功能、消费模型、金融级用法及其他功能点对比等概念介绍对RabbitMQ做了科普, 希望对各位深入理解RabbitMQ有帮助。
AMQP协议概念
AMQP协议自身定义了很多概念,下面先对这些概念进行剖析,会更侧重从每个概念实体的作用域、职责范围、从属关系等维度进行介绍。
AMQP协议概念实体图
Connection
- 对应底层一个AMQP-Client到RabbitMQ-Broker的一个TCP连接。
- 这边要考虑两个端点问题,在TCP连接建立完成后,如下图所示,连接的目标Broker就已经确定是集群中的一台了,由于是长连接,除非断连重建,否则对端节点不可变。
- 所以从这里可以看出RabbitMQ相比Pulsar、RocketMQ不一样的地方在于,其是一种服务端寻址模型,以Client的视角来看,想要连接任意Exchange、Queue,只要连上任意一台Broker就行。
Channel
- 信道,可以理解为一种逻辑连接,体现了多路复用的设计思路。
- 支持串行执行,包括收和发的指令,可以理解为一种半双工模式的“虚拟网络通道”。
- 所有Exchange、Queue、Binding的操作都是在Channel之上进行的。
Vhost
- 等价于一种租户隔离概念,不同Vhost下可以创建同名Exchange、Queue,这样可以进行业务隔离。
- RabbitMQ的权限隔离和权限控制的机制是在Vhost级别的。
- Rabbit官方原生的全局Policy控制在Vhost级别。
Exchange
- 一个虚拟实体,声明不同消息的路由策略,自身不存储消息。
- 一个路由器,基于消息头部的RoutingKey和Header将消息路由到符合条件的具体的Queue。
- 支持单播和广播。
Queue
- 消息存储实体,是消息底层存储的容器,类似Pulsar的Topic。
- 单订阅模式,其下的Consumer分别消费到一部分消息。
- 和存储关联,因此有容量上限、ttl等存储层的特性。
- 支持多消费和独占消费,取决于你订阅时设置的参数。由于它是存储消息系统的消息,所以内部基于一个消费位点控制持久化消费进度,记录最后被消费并Ack的位置。
- 面向Consumer。
Binding
- 衔接Exchange和Queue的桥梁,本质是一个规则的声明。
- 一个Exchange下可以有多个Binding。
- 一个Queue也可以被多个Binding关联。
- 一个Exchange到一个Queue也可以声明多个Binding。
消息功能
下面介绍RabbitMQ官方所提供的的开源原生功能,我们知道,AMQP协议可以看做成一种可编程式的消息队列协议,可以基于其提供的基础模型,通过自己的巧妙搭配组合,构造出多种多样的业务模型。
消息结构
每个消息分为三个部分,在网络层面即三个独立数据帧:
- PublishInfo: 消息路由声明信息,可以联想成写电子邮件时填写的目标邮箱、是否接收回执等前置声明。
- HeaderBody: 消息头部,用于存储RabbitMQ自身事先定义的声明,可以联想层HTTP协议的Header一样,此处可以放置一些对业务透明的上下文信息用于提供某种功能,比如分布式链路追踪的TraceId。
- ContentBody: 消息体,无差别二进制数据块,服务端不感知其是否压缩、是否加密等,只进行透明的存储和读取投递。
work queue 工作队列
- 它是一种模型简化,发送消息时指定Exchange为空,RoutingKey为QueueName,Broker以后会直接把这个消息发送至目标Queue,这样对用户来说相当于没Exchange,他认为是直接用Queue来消费,就比较简单。
- 工作队列只适用于单订阅的场景,因为Queue只适用于单订阅。
官方讲解:https://www.rabbitmq.com/tutorials/tutorial-two-python.html
Publish/Subscribe 发布订阅模式
Queue不支持多订阅,通过转换思路实现:
- 一个Fanout类型的Exchange:相当于多订阅场景的Topic。
- 多个不同的Queue:绑定到该Exchange,相当于多订阅场景下的Subscription。
- 多个Consumer消费同一个Queue:常规场景多订阅。
- 每个Consumer各自消费一个Queue:实例级别广播。
官方讲解:
https://www.rabbitmq.com/tutorials/tutorial-three-python.html
Routing 路由模式
- 路由模式是用Rabbit最常用的一种模式。
- Producer发布了一个Exchange,这个Exchange的类型是Direct,在Message中指定RoutingKey,并设置一个非空的值,接下来声明一些Queue,这些 Queue在声明和绑定Exchange的时候,需要指定Binding,消息在路由的时候判断消息里的RoutingKey和BindingKey是不是equal,如果是对等的就可以路由过来。
- 类似tag过滤的消息分发场景。
官方讲解:https://www.rabbitmq.com/tutorials/tutorial-four-python.html
Topic 通配符模式
- 路由模式的升级版,支持通配符匹配。
- Exchange类型为Topic。
- 匹配规则不是正则表达式,是AMQP自己的语法。
官方讲解:https://www.rabbitmq.com/tutorials/tutorial-five-python.html
Header模式
- 不常用,匹配规则不基于RoutingKey,而是基于 HeaderBody.Properties.Headers中的键值对。
- 支持完全匹配所有键值对。
- 支持只匹配一个键值对。
RPC模式
- RPC模式并不常用,基于回复队列。
- 生产者和消费者采用一问一答的模式。
- 等价于RPC的Request-Response模型。
官方讲解:https://www.rabbitmq.com/tutorials/tutorial-six-python.html
消费模型
消费模型也是使用一个消息系统所需要特别关心的一环,在业务的使用过程中,更多地会关注一条消息从生产到投递至消费者整个过程中都经历了什么,整个消息的声明周期是如何闭环的?
下面主要从TDMQ RabbitMQ版的实现来剖析RabbitMQ协议的消息生命周期。
从消息的生命周期看待消费模型
- 已投递未Ack:Consumer独占,直到Consumer触发Ack或者Consumer断开。
- Ack消息:标记已消费,位点前进。
- Nack消息:底层操作等价于Ack,会根据配置转发到死信Exchange,否则丢弃。
- Requeue:消息放回队头,待下次投递。
从内部核心组件看消费模型
- Queue:负责存储原始消息数据,按序存储。
- RedeliveryTracker:负责记录Consumer端Requeue的消息,并触发重新投递,标识投递次数。
- Dispatcher:负责管理连接Queue的所有Consumer,负责消息的负载均衡、分发、进度管理等。
- Limiter:QoS限流器,基于Unack数限流,而不是QPS,呼应上方消息生命周期。
- Unack Tracker:跟踪当前Channel中已投递未Ack的消息。
从这张图可以获取那些信息?
- 一个Queue可以被不同Connection连接、被同一个Connection的不同channel连接。
- 一个Channel中可以启动两个Consumer连接同一个Queue。
- QoS限流作用域为Channel,即一个Channel中创建的多个Consumer享有相同的配额。
- 如果BasicQoS Global设置为true,那么同一个Channel中的Consumer用尽配额,该Channel下的所有Consumer全部阻塞,无法接收新消息。
- Unack追踪器也是Channel作用域,故一个Channel关闭,被该Channel独占的所有未Ack消息全部回收到Queue级别的跟踪器,进行全局重投递。
Basic指令集: https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos
Consumer Prefetch:
https://www.rabbitmq.com/consumer-prefetch.html
金融级用法
- 消息确认:发送反馈,给予Producer发送成功的确认。
- 备选Exchange:发送成功的消息无法匹配任何binding的场景。
- 消息回退:消息无法匹配任何Binding时退回到Producer。
- 重投递:网络错误、Consumer端宕机、业务处理偶发错误等场景,重试消费恢复。
- 死信Exchange:业务多次重试、长时间无法成功,放入死信,待人工处理或者下一步的自动化修正or告警系统。
功能点对比
经过上述说明,你应该能利用RabbitMQ的功能点,结合自己的业务场景组织一个相对合理的生产消费拓扑。除了上面提到的功能点,RabbitMQ本身还提供了很多其他功能,下面主要列举一部分对比,可供参考和借鉴。
通道类
功能点 | 说明 | TDMQ支持情况 |
---|---|---|
认证和授权 | 基于User/Password的登录鉴权机制。 | 整合pulsar自身的JWT(role token)机制进行对齐 |
连接协商机制 | 连接握手协商连接通信参数。 | 完全对齐RabbitMQ原生 |
认证和授权 | Vhost维度配置和User的权限关系。 | AMQP SDK使用层面完全对齐 |
限流协商机制(QoS) | 基于Unack数进行配额限制。 | 完全对齐RabbitMQ原生 |
注意:QoS机制RabbitMQ的实现是和标准AMQP协议有出入的,我们选择对齐RabbitMQ而不是AMQP规范,我们也认为RabbitMQ的模式较合理,详见https://www.rabbitmq.com/consumer-prefetch.html
Exchange类
功能点 | 说明 | TDMQ支持情况 |
---|---|---|
Exchange绑定Exchange | RabbitMQ在AMQP协议上的扩展,使Exchange不局限于只绑定Queue,借此可以构建出更加复杂的拓扑逻辑基于User/Password的登录鉴权机制。 | 暂未支持,排期中 |
死信Exchange | Queue的扩展参数,用于Queue中丢弃消息时转发至死信Exchange。 | 完全对齐RabbitMQ原生 |
备选Exchange | Exchange的扩展参数,用于消息发送至Exchange时,无法匹配任何路由规则到下游Queue,转发至备选Exchange。 | 完全对齐RabbitMQ原生 |
Queue类
功能点 | 说明 | TDMQ支持情况 |
---|---|---|
优先队列 | 消息可设置优先级,同时到达的消息可根据优先级投递,是一种局部性破坏先入先出机制的功能。 | 暂未支持,排期中 |
独占队列 | 声明队列只能被声明的Connection实体所连接,通常和临时队列配合使用。 | 暂未支持,排期中 |
临时队列 | 随机生成一个临时队列名,可用于当前进程专用,通常配合独占队列和AutoDelete一起使用。 | 暂未支持,排期中 |
回复队列 | 用于声明消息Producer处理完成后,向Producer进行回包的队列,以此实现一问一答的通信模型。 | 暂未支持,排期中 |
TTL | 针对消息设置TTL(time to live),过期未投递的消息将会被丢弃 or 进入死信。 | 目前支持vhost级别的TTL机制 |
镜像队列 | RabbitMQ为了解决单点储存问题而引入的,为了实现队列消息多副本存储。 | TDMQ天然多副本分布式存储,不需要该功能 |
收发机制类
功能点 | 说明 | TDMQ支持情况 |
---|---|---|
消息确认 | 消息在Broker成功存储后,回包Producer,进行发送成功确认。 | 完全对齐RabbitMQ原生 |
事务消息 | 消息确认功能出现前的发送确认机制,性能很差,不建议使用。 | 暂未支持,待定 |
延迟消息 | 消息发送成功后,延迟一定时间后才进行投递。 | 完全对齐RabbitMQ原生 |
RPC | 基于回复队列封装出的一问一答模型,使用场景较少,建议用主流RPC框架。 | 暂未支持,待定 |
参考
- RabbitMQ协议官方文档:
https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf
- RabbitMQ官方功能介绍:
https://www.rabbitmq.com/admin-guide.html
- RabbitMQ协议指令集:
https://www.rabbitmq.com/amqp-0-9-1-reference.html
- RabbitMQ官方教程:
https://www.rabbitmq.com/getstarted.html
- RabbitMQ官方Demo:
https://github.com/rabbitmq/rabbitmq-tutorials
后记
通过这篇文章,希望能对RabbitMQ进行一定程度的科普,也从一个从0到1设计一个RabbitMQ Broker的开发的角度,浅析了一些RabbitMQ的一些消费模型细节,补充点当前网络上对这部分细节的缺漏,希望可以起到一些启发作用。
后续,我们将会着重分享,如何在apache pulsar生态上构建出一套完全对齐RabbitMQ协议的高性能、高可用、云原生消息队列,相比原生RabbitMQ,我们有何优势,以及我们在过程中遇到的问题,产生的思考。
敬请期待~
扫描下方二维码关注本公众号,
了解更多微服务、消息队列的相关信息!
解锁超多鹅厂周边!