本次学习主要针对运维人员,和对rabbitmq不熟悉的开发人员。通过本次学习你将掌握rabbitmq 的基本原理、集群、基本运维操作、常见故障处理。
1、原理与概念
简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
解决的问题
RabbitMQ就是当前最主流的消息中间件之一。
- 两个(多个)系统间需要通过定时任务来同步某些数据
- 异构系统的不同进程间相互调用、通讯的问题
Queue
Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。
多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
技术术语
- Broker:简单来说就是消息队列服务器实体。
- producer:消息生产者,就是投递消息的程序。
- consumer:消息消费者,就是接受消息的程序。
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作权限分离,把不同的系统使用的rabbitmq区分开,共用一个消息队列服务器,但看上去就像各自在用不用的rabbitmq服务器一样。
- Connection:一个网络连接,比如TCP/IP套接字连接。
- channel:消息通道,是建立在真实的TCP连接内的虚拟连接(是我们与RabbitMQ打交道的最重要的一个接口)。仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的,需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。AMQP的命令都是通过信道发送出去的(我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。)。每条信道都会被指派一个唯一ID。在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务,理论上无限制,减少TCP创建和销毁的开销,实现共用TCP的效果。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。注1:一个生产者或一个消费者与MQ服务器之间只有一条TCP连接 注2:RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。
- Exchange:消息交换机,生产者不是直接将消息投递到Queue中的,实际上是生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
- Exchange Types RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),之后会分别进行介绍。
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
- Routing Key:路由关键字,生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
- 在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。
- Prefetch count 前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
消息队列的使用过程
在AMQP模型中,Exchange是接受生产者消息并将消息路由到消息队列的关键组件。ExchangeType和Binding决定了消息的路由规则。所以生产者想要发送消息,首先必须要声明一个Exchange和该Exchange对应的Binding。
在Rabbit MQ中,声明一个Exchange需要三个参数:ExchangeName,ExchangeType和Durable。ExchangeName是该Exchange的名字,该属性在创建Binding和生产者通过publish推送消息时需要指定。ExchangeType,指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不同的Exchange会表现出不同路由行为。Durable是该Exchange的持久化属性,这个会在消息持久化章节讨论。
声明一个Binding需要提供一个QueueName,ExchangeName和BindingKey。
下面是消息发送的过程
- 建立连接Connection。由producer和consumer创建连接,连接到broker的物理节点上。
- 建立消息Channel。Channel是建立在Connection之上的,一个Connection可以建立多个Channel。producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。
- 发送消息。由Producer发送消息到Broker中的Exchange中。
- 路由转发。生产者Producer在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange收到消息后可以看到消息中指定的RoutingKey,再根据当前Exchange的ExchangeType,按一定的规则将消息转发到相应的queue中去。
- 消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。
- 消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。
exchange 与 Queue 的路由机制
- exchange 将消息发送到哪一个queue是由exchange type 和bing 规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。Direct exchange 直接转发路由,其实现原理是通过消息中的routkey,与queue 中的routkey 进行比对,若二者匹配,则将消息发送到这个消息队列。通常使用这个。
- 以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。
- Fanout exchange 复制分发路由,该路由不需要routkey,当exchange收到消息后,将消息复制多份转发给与自己绑定的消息队列。
- 上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。topic exchange 通配路由,是direct exchange的通配符模式,消息中的routkey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配表达式的queue。
- 以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。
需要注意的一点只有queue具有 保持消息的功能,exchange不能保存消息。
- headers headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。
durability 持久化与非持久化队列
- 如何识别?如上图,在Features字段里有一个D,就是持久化队列,英文durable(持久的)
- 持久化队列和非持久化队列的区别是什么?持久化队列会被保存在磁盘中,固定并持久的存储,当Rabbit服务重启后,该队列会保持原来的状态在RabbitMQ中被管理,而非持久化队列不会被保存在磁盘中,Rabbit服务重启后队列就会消失。
- 如何选择?如果需要队列的完整性,数据在队列中的保存是必须不允许丢失的,那么可以使用持久化。而当需要获取的信息是实时的,或者是随机的信息,不需要信息的精确性或完整性,但是追求获取性能,可以选择非持久化队列。
2、分布式集群架构和高可用性
设计集群的目的
- 允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行
- 通过增加更多的节点来扩展消息通信的吞吐量
集群配置方式
RabbitMQ可以通过三种方法来部署分布式集群系统,分别是:cluster,federation,shovel
- cluster:
- 不支持跨网段,用于同一个网段内的局域网
- 可以随意的动态增加或者减少
- 节点之间需要运行相同版本的RabbitMQ和Erlang
- federation:应用于广域网,允许单台服务器上的交换机或队列接收发布到另一台服务器上交换机或队列的消息,可以是单独机器或集群。federation队列类似于单向点对点连接,消息会在联盟队列之间转发任意次,直到被消费者接受。通常使用federation来连接internet上的中间服务器,用作订阅分发消息或工作队列。
- shovel:连接方式与federation的连接方式类似,但它工作在更低层次。可以应用于广域网
RabbitMQ cluster 集群同步原理
上面图中采用三个节点组成了一个RabbitMQ的集群,Exchange A的元数据信息在所有节点上是一致的,而Queue(存放消息的队列)的完整数据则只会存在于它所创建的那个节点上。,其他节点只知道这个queue的metadata信息和一个指向queue的owner node的指针。RabbitMQ集群元数据的同步
RabbitMQ集群会始终同步四种类型的内部元数据(类似索引):
- 队列元数据:队列名称和它的属性;
- 交换器元数据:交换器名称、类型和属性;
- 绑定元数据:一张简单的表格展示了如何将消息路由到队列;
- vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性;因此,当用户访问其中任何一个RabbitMQ节点时,通过rabbitmqctl查询到的queue/user/exchange/vhost等信息都是相同的。
为何RabbitMQ集群仅采用元数据同步的方式?
一,存储空间,如果每个集群节点都拥有所有Queue的完全数据拷贝,那么每个节点的存储空间会非常大,集群的消息积压能力会非常弱(无法通过集群节点的扩容提高消息积压能力);
二,性能,消息的发布者需要将消息复制到每一个集群节点,对于持久化消息,网络和磁盘同步复制的开销都会明显增加。
RabbitMQ cluster 集群的两种模式
- 普通模式:默认的集群模式。
- 镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案
普通模式:当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer,所以consumer应平均连接每一个节点,从中取消息。该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。如果做了队列持久化或消息持久化,那么得等A节点恢复,然后才可被消费,并且在A节点恢复之前其它节点不能再创建A节点已经创建过的持久队列;如果没有持久化的话,消息就会失丢。这种模式更适合非持久化队列,只有该队列是非持久的,客户端才能重新连接到集群里的其他节点,并重新创建队列。假如该队列是持久化的,那么唯一办法是将故障节点恢复起来。为什么RabbitMQ不将队列复制到集群里每个节点呢?这与它的集群的设计本意相冲突,集群的设计目的就是增加更多节点时,能线性的增加性能(CPU、内存)和容量(内存、磁盘)。当然RabbitMQ新版本集群也支持队列复制(有个选项可以配置)。比如在有五个节点的集群里,可以指定某个队列的内容在2个节点上进行存储,从而在性能与高可用性之间取得一个平衡(应该就是指镜像模式)。
镜像模式:其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用.
节点类型
- RAM node:内存节点将所有的队列、交换机、绑定、用户、权限和vhost的元数据定义存储在内存中,好处是可以使得像交换机和队列声明等操作更加的快速。
- Disk node:将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启RabbitMQ的时候,丢失系统的配置信息。
如果是内存结点这里就显示为RAM注意
- RabbitMQ要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。
- 如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行其他操作(包括创建队列、交换器、绑定,添加用户、更改权限、添加和删除集群结点),直到节点恢复。
- 解决方案:设置两个磁盘节点,至少有一个是可用的,可以保存元数据的更改。
Erlang Cookie
Erlang Cookie是保证不同节点可以相互通信的密钥,要保证集群中的不同节点相互通信必须共享相同的Erlang Cookie。具体的目录存放在/var/lib/rabbitmq/.erlang.cookie。
3、基本操作
rabbitmq集群必要条件
绑定实体ip,即ifconfig所能查询到的绑定到网卡上的ip,以下是绑定方法
代码语言:javascript复制#编辑配置路径 /etc/rabbitmq/rabbitmq-env.conf
NODE_IP_ADDRESS=172.16.136.133
复制代码
配置域名映射到实体ip
代码语言:javascript复制#配置文件1所在路径 /etc/rabbitmq/rabbitmq.config (如果是集群,每台机器都需要修改这个绑定本机实体ip)
#其中rabbit@master是创建集群时所配置的参数,@后面的参数为主机名,示例中为master
[
{rabbit, [
{cluster_nodes, {['rabbit@master'], disc}},
{cluster_partition_handling, ignore},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
{backlog, 128},
{nodelay, true},
{exit_on_close, false},
{keepalive, true}]}
]},
{kernel, [
{inet_dist_listen_max, 44001},
{inet_dist_listen_min, 44001}
]}
].
复制代码
#配置文件2 所在路径 /etc/hosts (如果是集群,每台机器都需要修改这个绑定本机实体ip,而且hosts文件的映射不得重复,如果重复linux系统为以最下面一条记录为准)
172.16.136.133 master
172.16.136.134 venus
172.16.136.135 venus2
启动停止
代码语言:javascript复制停止
#机器A
service rabbitmq-server stop
epmd -kill
#机器B
service rabbitmq-server stop
epmd -kill
#机器C
service rabbitmq-server stop
epmd -kill
启动方式1
#机器A
service rabbitmq-server start
#机器B
service rabbitmq-server start
#机器C
service rabbitmq-server start
启动方式2
rabbitmq-server -detached
集群重启顺序
集群重启的顺序是固定的,并且是相反的。如下所述:
启动顺序:磁盘节点 => 内存节点 关闭顺序:内存节点 => 磁盘节点 最后关闭必须是磁盘节点,不然可能会造成集群启动失败、数据丢失等异常情况。
重建集群
注1:此处的mq集群重建是比较快速和有效的方法,面向的是初次安装或者可以接受mq中所存有的数据丢失的情况下,必须先有mq的.json后缀的配置文件或者有把握写入集群中exchange、queue等配置。
按顺序停止所有机器中的rabbitmq
代码语言:javascript复制#机器A
service rabbitmq-server stop
epmd -kill
#机器B
service rabbitmq-server stop
epmd -kill
#机器C
service rabbitmq-server stop
epmd -kill
移除rabbitmq配置记录与存储文件
代码语言:javascript复制#位于 /var/lib/rabbitmq/mensia
mv /var/lib/rabbitmq/mensia /var/lib/rabbitmq/mensia.bak
按顺序启动所有机器中的rabbitmq
代码语言:javascript复制#机器C
service rabbitmq-server start
#机器B
service rabbitmq-server start
#机器A
service rabbitmq-server start
停止被加入集群节点app
代码语言:javascript复制比如A、B、C三台机器,将B和C加入到A中去,需要执行以下命令
#机器B
rabbitmqctl stop_app
#机器C
rabbitmqctl stop_app
建立集群
代码语言:javascript复制注意此处master为唯一没有执行rabbitmqctl stop_app的机器
#机器B
rabbitmqctl join_cluster rabbit@master
#机器C
rabbitmqctl join_cluster rabbit@master
启动集群
#机器B
rabbitmqctl start_app
#机器C
rabbitmqctl start_app
检查集群状态
在任意一台机器上执行rabbitmqctl cluster_status命令即可检查,输出包含集群中的节点与运行中的节点,兼以主机名标志
添加集群配置
创建用户
代码语言:javascript复制例子中创建了两个用户 添加用户add_user,设置角色set_user_tags,添加rabbitmq虚拟主机add_vhost,设置访问权限set_permissions,以下是详细用法
# 创建第一个用户
/usr/sbin/rabbitmqctl add_user 用户名 密码
/usr/sbin/rabbitmqctl set_user_tags 用户名 administrator
/usr/sbin/rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"
# 创建第二个用户
/usr/sbin/rabbitmqctl add_user 用户名2 密码
/usr/sbin/rabbitmqctl set_user_tags 用户名2 management
/usr/sbin/rabbitmqctl add_vhost sip_ext
/usr/sbin/rabbitmqctl set_permissions -p sip_ext 用户名2 '.*' '.*' '.*'
备注:RabbitMQ 虚拟主机,RabbitMQ 通过虚拟主机(vhost)来分发消息。拥有自己独立的权限控制,不同的vhost之间是隔离的,单独的。
权限控制的基本单位:vhost。
用户只能访问与之绑定的vhost。
vhost是AMQP中唯一无法通过协议来创建的基元。只能通过rabbitmqctl工具来创建。
打开15672网页管理端,访问mq
/usr/sbin/rabbitmq-plugins enable rabbitmq_management 备注:如果发现命令执行完毕没有打开此服务,15672端口没有监听,则是由于没有重启mq导致的
在底部导入.json后缀的配置文件即可
http://localhost:4000/first-blog/rabbitmq.jpg
如果覆盖了用户需要使用以下命令修改mq用户密码 /usr/sbin/rabbitmqctl change_password 用户名 密码
修改节点类型
代码语言:javascript复制rabbitmqctl stop_app
rabbitmqctl change_cluster_node_type dist
rabbitmqctl change_cluster_node_type ram
rabbitmqctl start_app
常用命令
4、常见故障
集群状态异常
- rabbitmqctl cluster_status检查集群健康状态,不正常节点重新加入集群
- 分析是否节点挂掉,手动启动节点。
- 保证网络连通正常
队列阻塞、数据堆积
- 保证网络连通正常
- 保证消费者正常消费,消费速度大于生产速度
- 保证服务器TCP连接限制合理
脑裂
- 按正确顺序重启集群
- 保证网络连通正常
- 保证磁盘空间、cpu、内存足够