RabbitMQ简介
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。 消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。 排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
什么是RabbitMQ?
RabbitMQ是由Erlang语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也可称为 面向消息的中间件)。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言.
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
RabbitMQ应用场景
代码语言:javascript复制RabbitMQ使用场景:
# 1)异步处理;
# 2)应用解耦;
# 3)流量削峰;
# 4)消息通讯;
RabbitMQ中概念
消息模型概述
所有MQ产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中
上面只是最简单的描述,具体到RabbitMQ则有更详细的概念解释,上面介绍过RabbitMQ是AMQP协议的一个开源实现,所以其内部实际上也是AMQP中的基本概念:
- Message消息, 消息是不具名的,他由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
路由键:
routing_key,是一条特定的规则,决定了消息将要发送到那个队列,每条消息在发布的时间都需要指定自己的routing_key
RabbitMQ 通过路由键实现了队列和交换器之间的绑定.
- Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
- Exchange: 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
- Binding: 绑定, 用于消息队列和交换器之间的关联,一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则, 所以可以将交换器理解成一个由绑定构成的路由表。
- Queue: 消息队列,用来保存消息直到发送给消费者,他是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Connection: 网络连接,比如一个TCP连接。
- Channel信道: 多路复用连接中的一条独立的双向数据流通道。通道是建立在真实的TCP连接内地虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接受消息,这些动作都是通过信道完成,因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
- Consumer: 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
- Virtual Host: 虚拟主机, 表示一批交换器、消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器械。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
- Broker: 表示消息队列服务器实体。
一条消息的一生
如果看完上面概述,看这里还有点吃力,可以看一下下面详细描述
当生产者发布一条消息时,首先跟RabbitMQ建立连接(channel),通过该连接将想要发布的消息发送到交换器(exchange)上。交换器通过特定的路由规则(routing_key),将消息发送到某个队列(queue)。RabbitMQ会监控该队列,一旦发现有消费者订阅了该队列,就将消息发送给消费者进行处理,然后将该消息从队列中删除。
需要注意的是,这里提到的生产者和消费者只是消息发送和接受的概念体现,每个客户端都可以是消费者或生产者。
Channel(信道)
如果项目需要发布消息,那么必须要链接到 RabbitMQ,而项目于 RabbitMQ之间使用 TCP 连接,加入每次发布消息都要连接TCP,这不仅会造成连接资源严重浪费,会造成服务器性能瓶颈,所以 RabbitMQ 为所有的线程只用一条 TCP 连接,怎么实现的呢? RabbitMQ 引入了信道的概念,所有需要发布消息的线程都包装成一条信道在 TCP 中传输,理论上 一条 TCP 连接支持无限多个信道,模型如下:
Queue(队列)
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
接受消息
消费者主要通过两种方式从队列中接受消息: 使用basic.consume和basic.get命令 当消费者使用basic.consume订阅了某个队列后,一旦有消息到达该队列,RabbitMQ就将消息立即发送给消费者,然后等待下一条消息的到来。 如果消费者使用的是basic.get命令,只会从队列中获取单条消息,无法持续获取。假如队列中堆积了5条消息,使用basic.get命令只会获得最开始的那条消息,后面的4条消息无法获取。 如果一个队列有多个消费者进行订阅,RabbitMQ采用轮训的方式将消息发送给某个消费者,每条消息只发送给一个消费者。 也就是说,如果消费者A、B、C订阅了同一个队列,那么第一条消息会发送给A,第二条会发送给B,第三条发送给C,第四条发送给A,..,以此类推。 当消息被消费者消费了之后,RabbitMQ就将改消息从队列中删除。 那么 RabbitMQ 怎么知道消息被消费者成功消费了呢?这就涉及到了消息的确认机制。
消息确认
消费者接收到的每条消息都必须进行确认,如果消费者没有对消息进行确认,那么 RabbitMQ 不会将下一条消息发送给该消费者,直到其对消息进行了确认。如果在消费者向 RabbitMQ 发送确认之前,消费者与 RabbitMQ 之间的连接断开了,那么 RabbitMQ 会将该消息发送给其他的消费者。 . 主要有两种确认方式: 使用basic.ack命令向RabbitMQ发送确认,或者在订阅队列时将auto_ack参数设置为true . 需要注意的是,如果设置了 auto_ack 为 true,那么一旦消费者接收到了消息,RabbitMQ 就认为确认了消息,从而将消息从队列中删除。但是消费者接收到消息并不等同于成功处理了消息,如果在成功处理该条消息之前出现问题或者程序崩溃,由于此时 RabbitMQ 已经将消息从队列中删除了,那么就意味着这条消息丢失了。
消息持久化
默认情况下,如果RabbitMQ进行了重启,那么队列,交换器和其中的消息都会丢失,如果想要你的数据在重启后不丢失,那么就需要对消息进行持久化设置,主要操作如下:
- 将消息的投递模式(delivery mode)设置为 2(持久)。
- 将消息发送到持久化的交换器。
- 消息必须到达持久化的队列。
RabbitMQ 是通过将消息写入磁盘中的持久化日志中的方式实现消息的持久化的。如果持久化队列中的某条消息被消费了,那么 RabbitMQ 会在持久化日志中将该消息标记为等待垃圾收集。
Binding(绑定)
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表,如下:
Exchange(交换器)类型
我们向 RabbitMQ 发送消息,实际上是把消息发到交换器了,再由交换器根据相关路由规则发到特定队列上,在队列上监听的消费者就可以进行消费了,目前 RabbitMQ 共四种类型: direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, 此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
direct交换器
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
fanout交换器
每个发送到 fanout 交换器中的消息,他不会去匹配路由键,直接把消息投递到所有绑定到 fanout 交换器中的队列上,它就像一个广播站一样,它会向所有收听广播的用户发送消息。对应到系统上,它允许你针对一个消息作不同操作,比如用户上传了一张新的图片,系统要同时对这个事件进行不同的操作,比如删除旧的图片缓存、增加积分奖励等等。这样就大大降低了系统之间的耦合度了。
topic交换器
topic 交换器有点类似于 direct 交换器,它通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
管理RabbitMQ
代码语言:javascript复制前面的部分介绍了一些 RabbitMQ 中比较重要的概念和消息的相关知识,接下来介绍如何对 RabbitMQ 进行管理。 首先需要明确一个概念,通常提到的 RabbitMQ 节点,实际上指的是 RabbitMQ 应用和所在的 Erlang 节点。RabbitMQ 是 Erlang 应用程序的一种。 启动 RabbitMQ 通常使用
rabbitmq-server
工具,但需要注意的是,使用该命令启动的包括 Erlang 节点和 RabbitMQ 应用。同时,还把 RabbitMQ 应用设置成了独立运行模式。 对于 RabbitMQ 应用的管理,通常使用rabbitmqctl
工具:
stop 参数:
# 将本地节点干净的关闭,包括 RabbitMQ 应用和 Erlang 节点。
# 同时,可以使用 `-n rabbit@hostname` 参数,关闭指定的远程节点。
stop_app 参数:
# 只关闭 RabbitMQ 应用。
start_app 参数:
# 只启动 RabbitMQ 应用。
RabbitMQ集群原理
代码语言:javascript复制# 在项目中想要 RabbitMQ 变得更加健壮,就要使得其变成高可用,所以我们要搭建一个 RabbitMQ 集群,这样你可以从任何一台 RabbitMQ 故障中得以幸免,并且应用程序能够持续运行而不会发生阻塞。而 RabbitMQ 本身是基于 Erlang 编写的,Erlang 天生支持分布式(通过同步 Erlang 集群各节点的 cookie 来实现),因此不需要像 ActiveMQ、Kafka 那样通过 ZooKeeper 分别来实现 HA 方案和保存集群的元数据.
元数据
RabbitMQ 内部有各种基础构件,包括队列、交换器、绑定、虚拟主机等,他们组成了 AMQP 协议消息通信的基础,而这些构件以元数据的形式存在,它始终记录在 RabbitMQ 内部,它们分别是:
# 队列元数据:队列名称和它们的属性
# 交换器元数据:交换器名称、类型和属性
# 绑定元数据:一张简单的表格展示了如何将消息路由到队列
# vhost元数据:为 vhost 内的队列、交换器和绑定提供命名空间和安全属性
集群中的队列
这里有个问题需要思考,RabbitMQ 默认会将消息冗余到所有节点上吗?这样听起来正符合高可用的特性,只要集群上还有一个节点存活,那么就可以继续进行消息通信,但这也随之为 RabbitMQ 带来了致命的缺点:
- 每次发布消息,都要把它扩散到所有节点上,而且对于磁盘节点来说,每一条消息都会触发磁盘活动,这会导致整个集群内性能负载急剧拉升。
- 如果每个节点都有所有队列的完整内容,那么添加节点不会给你带来额外的存储空间,也会带来木桶效应,举个例子,如果集群内有个节点存储了 3G 队列内容,那么在另外一个只有 1G 存储空间的节点上,就会造成内存空间不足的情况,也就是无法通过集群节点的扩容提高消息积压能力。
解决这个问题就是通过集群中唯一节点来负责任何特定队列,只有该节点才会受队列大小的影响,其它节点如果接收到该队列消息,那么就要根据元数据信息,传递给队列所有者节点(也就是说其它节点上只存储了特定队列所有者节点的指针)。这样一来,就可以通过在集群内增加节点,存储更多的队列数据.
分布交换器
交换器其实是我们想象出来的,它本质是一张查询表,里面包括了交换器名称和一个队列的绑定列表,当你将消息发布到交换器中,实际上是你所在的信道将消息上的路由键与交换器的绑定列表进行匹配,然后将消息路由出去。有了这个机制,那么在所有节点上传递交换器消息将简单很多,而 RabbitMQ 所做的事情就是把交换器拷贝到所有节点上,因此每个节点上的每条信道都可以访问完整的交换器了。
内存节点与磁盘节点
关于上面队列所说的问题与解决办法,又有了一个伴随而来的问题出现:如果特定队列的所有者节点发生了故障,那么该节点上的队列和关联的绑定都会消失吗?
- 如果是内存节点,那么附加在该节点上的队列和其关联的绑定都会丢失,并且消费者可以重新连接集群并重新创建队列;
- 如果是磁盘节点,重新恢复故障后,该队列又可以进行传输数据了,并且在恢复故障磁盘节点之前,不能在其它节点上让消费者重新连到集群并重新创建队列,如果消费者继续在其它节点上声明该队列,会得到一个 404 NOT_FOUND 错误,这样确保了当故障节点恢复后加入集群,该节点上的队列消息不回丢失,也避免了队列会在一个节点以上出现冗余的问题。
代码语言:javascript复制接下来说说内存节点与磁盘节点在集群中的作用,在集群中的每个节点,要么是内存节点,要么是磁盘节点,如果是内存节点,会将所有的元数据信息仅存储到内存中,而磁盘节点则不仅会将所有元数据存储到内存上, 还会将其持久化到磁盘。 在单节点 RabbitMQ 上,仅允许该节点是磁盘节点,这样确保了节点发生故障或重启节点之后,所有关于系统的配置与元数据信息都会重磁盘上恢复;而在 RabbitMQ 集群上,允许节点上至少有一个磁盘节点,在内存节点上,意味着队列和交换器声明之类的操作会更加快速。原因是这些操作会将其元数据同步到所有节点上,对于内存节点,将需要同步的元数据写进内存就行了,但对于磁盘节点,意味着还需要及其消耗性能的磁盘写入操作。 RabbitMQ 集群只要求至少有一个磁盘节点,这是有道理的,当其它内存节点发生故障或离开集群,只需要通知至少一个磁盘节点进行元数据的更新,如果是碰巧唯一的磁盘节点也发生故障了,集群可以继续路由消息,但是不可以做以下操作了:
# 创建队列
# 创建交换器
# 创建绑定
# 添加用户
# 更改权限
# 添加或删除集群节点
这是因为上述操作都需要持久化到磁盘节点上,以便内存节点恢复故障可以从磁盘节点上恢复元数据,解决办法是在集群添加 2 台以上的磁盘节点,这样其中一台发生故障了,集群仍然可以保持运行,且能够在任何时候保存元数据变更。
RabbitMQ集群部署
List
代码语言:javascript复制CentOS7.3.1611
Package:
rabbitmq-server-3.3.5-34
节点名 | IP | 软件版本 | 硬件 | 网络 | 说明 |
---|---|---|---|---|---|
rabbitmq-1 | 192.168.171.135 | list 里面 | 2C4G | Nat,内网 | 测试环境 |
rabbitmq-2 | 192.168.171.134 | list里面 | 2C4G | Nat,内网 | 测试环境 |
准备系统环境
代码语言:javascript复制init_security() {
systemctl stop firewalld
systemctl disable firewalld &>/dev/null
setenforce 0
sed -i '/^SELINUX=/ s/enforcing/disabled/' /etc/selinux/config
sed -i '/^GSSAPIAu/ s/yes/no/' /etc/ssh/sshd_config
sed -i '/^#UseDNS/ {s/^#//;s/yes/no/}' /etc/ssh/sshd_config
systemctl enable sshd crond &> /dev/null
rpm -e postfix --nodeps
echo -e "