RabbitMQ高可用集群方案
RabbitMQ的Cluster模式分为两种:
- 普通模式
- 镜像模式
Cluster普通模式:
元数据包含以下内容:
- 队列元数据:队列的名称及属性
- 交换器:交换器的名称及属性
- 绑定关系元数据:交换器与队列或者交换器与交换器
- vhost元数据:为vhost内的队列,交换器和绑定提供命名空间及安全属性之间的绑定关系
Cluster多机多节点部署:多机多节点是指在每台机器中部署一个RabbitMQ服务节点,进而由多个机器组成一个RabbitMQ集群
Cluster单机多节点部署:由于某些因素的限制,有时候不得不在单台物理机器上去创建一个多RabbitMQ服务节点的集群。或者只想要实验性的验证集群的某些特性,也不需要浪费过多的物理机器去实现。需要为每个RabbitMQ服务节点设置不同的端口号和节点名称来启动相应的服务
Cluster镜像模式:
镜像模式的集群是在普通模式的基础上,通过policy来实现,使用镜像模式可以实现RabbitMQ的高可用方案
ha-sync-mode 队列中消息的同步方式,有效值为automatic和manual,默认为automatic
RabbitMQ集群搭建
- 环境准备
准备3台机器,并在这三台机器上安装RabbitMQ
代码语言:javascript复制192.168.0.22 node1
192.168.0.23 node2
192.168.0.24 node3
- 修改配置文件
依次修改对应主机的hostname
代码语言:javascript复制hostname node1
hostname node2
hostname node3
依次修改主机的/etc/hosts文件,添加以下内容
代码语言:javascript复制192.168.0.22 node1
192.168.0.23 node2
192.168.0.24 node3
将node1节点上的 /var/lib/rabbitmq/.erlang.cookie 文件复制到其他节点(Erlang语言要求必须有相同的cookie才能进行集群通信)
代码语言:javascript复制scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
- 添加防火墙端口
给每台机器的防火墙添加端口
代码语言:javascript复制firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
重启防火墙
代码语言:javascript复制firewall-cmd --reload
- 启动RabbitMQ集群
启动每台机器的RabbitMQ
代码语言:javascript复制systemctl start rabbitmq-server
操作节点node2,将node2加入到集群
代码语言:javascript复制rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1 --ram
rabbitmqctl start_app
查看集群的状态
代码语言:javascript复制rabbitmqctl cluster_status
在node3节点上重复上述命令,将node3加入到集群。完成后查看集群状态
RabbitMQ Web端查看集群状态
同时在Web端还可以看到每个节点的详细信息,如内存情况,IO情况,数据的存储等等
镜像队列模式集群
镜像队列属于RabbitMQ 的高可用方案,见:https://www.rabbitmq.com/ha.html#mirroring-arguments 通过前面的步骤搭建的集群属于普通模式集群,是通过共享元数据实现集群 开启镜像队列模式需要在管理页面添加策略,添加方式: 进入管理页面 -> Admin -> Policies(在页面右侧)-> Add / update a policy 在表单中填入:
参数说明:
name: 策略名称,如果使用已有的名称,保存后将会修改原来的信息
Apply to:策略应用到什么对象上
Pattern:策略应用到对象时,对象名称的匹配规则(正则表达式)
Priority:优先级,数值越大,优先级越高,相同优先级取最后一个
Definition:策略定义的内容,对于镜像队列的配置来说,只需要包含3个部分: ha-mode
、ha-params
和 ha-sync-mode
。其中,ha-sync-mode
是同步的方式,自动还是手动,默认是自动。ha-mode
和 ha-params
组合使用。组合方式如下:
镜像队列模式相比较普通模式,镜像模式会占用更多的带宽来进行同步,所以镜像队列的吞吐量会低于普通模式。但普通模式不能实现高可用,某个节点挂了后,这个节点上的消息将无法被消费,需要等待节点启动后才能被消费。
集群测试代码示例:
Producer示例:
代码语言:javascript复制import com.rabbitmq.client.*;import java.io.IOException;import java.time.Instant;import java.util.concurrent.TimeUnit;public class Producer { public static void main(String[] args) { // 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置连接属性
connectionFactory.setUsername("test-user");
connectionFactory.setPassword("test-user");
connectionFactory.setVirtualHost("v1"); // 设置每个节点的链接地址和端口
Address[] addresses = new Address[] { new Address("192.168.0.22", 5672), new Address("192.168.0.23", 5672), new Address("192.168.0.24", 5672)
};
Connection connection = null;
Channel channel = null; try { // 开启/关闭连接自动恢复,默认是开启状态
connectionFactory.setAutomaticRecoveryEnabled(true); // 设置每100毫秒尝试恢复一次,默认是5秒
connectionFactory.setNetworkRecoveryInterval(100);
connectionFactory.setTopologyRecoveryEnabled(false); // 从使用连接集合里面的地址获取连接
connection = connectionFactory.newConnection(addresses, "Producer"); // 添加重连监听器
((Recoverable) connection)
.addRecoveryListener( new RecoveryListener() { // 重连成功后的回调
public void handleRecovery(Recoverable recoverable) {
System.out.println(Instant.now().toString() " 已重新建立连接");
} // 开始重连时的回调
public void handleRecoveryStarted(Recoverable recoverable) {
System.out.println(Instant.now().toString() " 开始尝试重连");
}
}); // 从链接中创建通道
channel = connection.createChannel(); // 声明队列,如果队列不存在,会创建
// RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
channel.queueDeclare("queue-test", true, false, false, null); for (int i = 0; i < 100; i ) {
String message = "Hello Rabbit " i; try { // 发送消息
channel.basicPublish("", "queue-test", null, message.getBytes());
} catch (Exception e) { // 可能连接已关闭,等待重连
System.out.println("消息 " message " 发送失败!");
i--;
TimeUnit.SECONDS.sleep(2); continue;
}
System.out.println("消息" i "已发送!");
TimeUnit.SECONDS.sleep(2);
}
} catch (Exception e) {
e.printStackTrace();
} finally { // 关闭通道
if (channel != null && channel.isOpen()) { try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
} // 关闭连接
if (connection != null && connection.isOpen()) { try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Consumer示例:
代码语言:javascript复制import com.rabbitmq.client.*;import java.io.IOException;import java.time.Instant;public class Consumer { public static void main(String[] args) { // 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置连接属性
connectionFactory.setUsername("test-user");
connectionFactory.setPassword("test-user");
connectionFactory.setVirtualHost("v1"); // 设置每个节点的链接地址和端口
Address[] addresses = new Address[] { new Address("192.168.0.22", 5672), new Address("192.168.0.23", 5672), new Address("192.168.0.24", 5672)
};
Connection connection = null;
Channel channel = null; try { // 开启/关闭连接自动恢复,默认是开启状态
connectionFactory.setAutomaticRecoveryEnabled(true); // 设置每100毫秒尝试恢复一次,默认是5秒
connectionFactory.setNetworkRecoveryInterval(100); // 从连接工厂获取连接
connection = connectionFactory.newConnection(addresses, "Consumer"); // 添加重连监听器
((Recoverable) connection)
.addRecoveryListener( new RecoveryListener() { // 重连成功后的回调
public void handleRecovery(Recoverable recoverable) {
System.out.println(Instant.now().toString() " 已重新建立连接");
} // 开始重连时的回调
public void handleRecoveryStarted(Recoverable recoverable) {
System.out.println(Instant.now().toString() " 开始尝试重连");
}
}); // 从链接中创建通道
channel = connection.createChannel(); // 声明队列,如果队列不存在,会创建
// RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
channel.queueDeclare("queue-test", true, false, false, null); // 定义收到消息后的回调
final Channel finalChannel = channel;
DeliverCallback deliverCallback = new DeliverCallback() {
@Override public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息:" new String(message.getBody(), "UTF-8"));
finalChannel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
}; // 监听队列
channel.basicConsume( "queue-test", false,
deliverCallback, new CancelCallback() {
@Override public void handle(String consumerTag) throws IOException {}
});
System.out.println("开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally { // 关闭通道
if (channel != null && channel.isOpen()) { try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
} // 关闭连接
if (connection != null && connection.isOpen()) { try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
RabbitMQ常用管理命令
rabbitmqctl status 查看节点状态
rabbitmqctl stop [pid_file] 停止运行的RabbitMQ的Erlang虚拟机和RabbitMQ服务应用 如果指定了pid_file,还需要等待指定进程的结束。pid_file是通过调用rabbitmq-server命令启动RabbitMQ服务时创建的,默认情况下存放于Mnesia目录中。 如果使用rabbitmq-server -detach这个带有-detach后缀的命令来启动RabbitMQ服务则不会生成pid_file文件。
rabbitmqctl stop_app 停止RabbitMQ服务应用,但是Erlang虚拟机还是处于运行状态 此命令的执行优先于其他管理操作(这些操作需要先停止RabbitMQ应用,如rabbitmqctl reset)
rabbitmqctl start_app 启动RabbitMQ应用,此命令典型的用途就是执行了其他管理操作之后,重新启动之前停止的RabbitMQ应用。
rabbitmqctl reset 将RabbitMQ节点重置还原到最初状态 包括从原来的集群中删除此节点,从管理数据库中删除所有的配置数据,如已配置的用户,vhost等,以及删除所有的持久化数据 执行rabbitmqctl reset 命令前必须停止RabbitMQ应用
rabbitmqctl force_reset 强制将RabbitMQ节点重置还原到最初状态。此命令不论当前管理数据库的状态和集群配置是什么,都会无条件的重置节点,只能在数据库或集群配置已损坏的情况下使用
rabbitmqctl [-n nodename] join_cluster {cluster_node} [—ram] 将节点加入指定的集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。 -n nodename:指定需要操作的目标节点,如rabbit@node1 cluster_node:需要加入的集群节点名,如rabbit@node2 —ram:集群节点类型,有ram,disc两种,默认为disc
- ram 内存节点,所有元数据都存储在内存中
- disc 磁盘节点,所有元数据都存储在磁盘中
rabbitmqctl cluster_status 查看集群状态
rabbitmqctl change_cluster_node_type {disc|ram} 修改集群节点的类型,使用此命令前要停止RabbitMQ应用
rabbitmqctl forget_cluster_node [—offline] 将节点从集群中删除,允许离线执行
rabbitmqctl update_cluster_nodes {clusternode} 在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群
rabbitmqctl force_boot 确保节点可以启动,即使它不是最后一个关闭的节点
rabbitmqctl set_cluster_name {name} 设置集群名称。集群名称在客户端连接时会通报给客户端 集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置
Federation插件
Federation插件的设计目标是使RabbitMQ在不同Broker节点之间进行消息传递而无需建立集群,该功能在以下场景下非常有用:
- 各个节点运行在不同版本的Erlang和RabbitMQ上
- 网络环境不稳定,如广域网当中
Federation的作用:
Shovel插件
Shovel与Federation具备的数据转发功能类似。Shovel能够可靠,持续的从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker的交换器(作为目的端,即destination)
Shovel的主要优势: 松耦合,shovel可以移动位于不同管理域中的Broker或者集群上的消息,这些Broker或者集群可以包含不同的用户和vhost,也可以使用不同的RabbitMQ和Erlang版本 支持广域网,Shovel插件同样基于AMQP协议在Broker之间进行通信,被设计成可以容忍时断时续的连通情形,并且能够保证消息的可靠性 高度定制,当Shovel成功连接后,可以对其进行配置以执行相关的AMQP命令