RabbitMQ集群和高可用方案

2020-05-21 12:08:41 浏览数 (1)

RabbitMQ高可用集群方案

RabbitMQ的Cluster模式分为两种:

  • 普通模式
  • 镜像模式

Cluster普通模式:

元数据包含以下内容:

  • 队列元数据:队列的名称及属性
  • 交换器:交换器的名称及属性
  • 绑定关系元数据:交换器与队列或者交换器与交换器
  • vhost元数据:为vhost内的队列,交换器和绑定提供命名空间及安全属性之间的绑定关系

Cluster多机多节点部署:多机多节点是指在每台机器中部署一个RabbitMQ服务节点,进而由多个机器组成一个RabbitMQ集群

Cluster单机多节点部署:由于某些因素的限制,有时候不得不在单台物理机器上去创建一个多RabbitMQ服务节点的集群。或者只想要实验性的验证集群的某些特性,也不需要浪费过多的物理机器去实现。需要为每个RabbitMQ服务节点设置不同的端口号和节点名称来启动相应的服务

Cluster镜像模式:

镜像模式的集群是在普通模式的基础上,通过policy来实现,使用镜像模式可以实现RabbitMQ的高可用方案

ha-sync-mode 队列中消息的同步方式,有效值为automatic和manual,默认为automatic

RabbitMQ集群搭建

  1. 环境准备

准备3台机器,并在这三台机器上安装RabbitMQ

代码语言:javascript复制
192.168.0.22 node1
192.168.0.23 node2
192.168.0.24 node3
  1. 修改配置文件

依次修改对应主机的hostname

代码语言:javascript复制
hostname node1
hostname node2
hostname node3

依次修改主机的/etc/hosts文件,添加以下内容

代码语言:javascript复制
192.168.0.22 node1
192.168.0.23 node2
192.168.0.24 node3

将node1节点上的 /var/lib/rabbitmq/.erlang.cookie 文件复制到其他节点(Erlang语言要求必须有相同的cookie才能进行集群通信)

代码语言:javascript复制
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
  1. 添加防火墙端口

给每台机器的防火墙添加端口

代码语言:javascript复制
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent

重启防火墙

代码语言:javascript复制
firewall-cmd --reload
  1. 启动RabbitMQ集群

启动每台机器的RabbitMQ

代码语言:javascript复制
systemctl start rabbitmq-server

操作节点node2,将node2加入到集群

代码语言:javascript复制
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1 --ram
rabbitmqctl start_app

查看集群的状态

代码语言:javascript复制
rabbitmqctl cluster_status

在node3节点上重复上述命令,将node3加入到集群。完成后查看集群状态

RabbitMQ Web端查看集群状态

同时在Web端还可以看到每个节点的详细信息,如内存情况,IO情况,数据的存储等等

镜像队列模式集群

镜像队列属于RabbitMQ 的高可用方案,见:https://www.rabbitmq.com/ha.html#mirroring-arguments 通过前面的步骤搭建的集群属于普通模式集群,是通过共享元数据实现集群 开启镜像队列模式需要在管理页面添加策略,添加方式: 进入管理页面 -> Admin -> Policies(在页面右侧)-> Add / update a policy 在表单中填入:

参数说明: name: 策略名称,如果使用已有的名称,保存后将会修改原来的信息 Apply to:策略应用到什么对象上 Pattern:策略应用到对象时,对象名称的匹配规则(正则表达式) Priority:优先级,数值越大,优先级越高,相同优先级取最后一个 Definition:策略定义的内容,对于镜像队列的配置来说,只需要包含3个部分: ha-modeha-paramsha-sync-mode。其中,ha-sync-mode是同步的方式,自动还是手动,默认是自动。ha-modeha-params 组合使用。组合方式如下:

镜像队列模式相比较普通模式,镜像模式会占用更多的带宽来进行同步,所以镜像队列的吞吐量会低于普通模式。但普通模式不能实现高可用,某个节点挂了后,这个节点上的消息将无法被消费,需要等待节点启动后才能被消费。

集群测试代码示例:

Producer示例:

代码语言:javascript复制
import com.rabbitmq.client.*;import java.io.IOException;import java.time.Instant;import java.util.concurrent.TimeUnit;public class Producer {  public static void main(String[] args) {    // 创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();    // 设置连接属性
    connectionFactory.setUsername("test-user");
    connectionFactory.setPassword("test-user");
    connectionFactory.setVirtualHost("v1");    // 设置每个节点的链接地址和端口
    Address[] addresses =        new Address[] {          new Address("192.168.0.22", 5672),          new Address("192.168.0.23", 5672),          new Address("192.168.0.24", 5672)
        };

    Connection connection = null;
    Channel channel = null;    try {      // 开启/关闭连接自动恢复,默认是开启状态
      connectionFactory.setAutomaticRecoveryEnabled(true);      // 设置每100毫秒尝试恢复一次,默认是5秒
      connectionFactory.setNetworkRecoveryInterval(100);

      connectionFactory.setTopologyRecoveryEnabled(false);      // 从使用连接集合里面的地址获取连接
 connection = connectionFactory.newConnection(addresses, "Producer");      // 添加重连监听器
      ((Recoverable) connection)
          .addRecoveryListener(              new RecoveryListener() {                // 重连成功后的回调
                public void handleRecovery(Recoverable recoverable) {
                  System.out.println(Instant.now().toString()   " 已重新建立连接");
                }                // 开始重连时的回调
                public void handleRecoveryStarted(Recoverable recoverable) {
                  System.out.println(Instant.now().toString()   " 开始尝试重连");
                }
              });      // 从链接中创建通道
      channel = connection.createChannel();      // 声明队列,如果队列不存在,会创建
      // RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
      channel.queueDeclare("queue-test", true, false, false, null);      for (int i = 0; i < 100; i  ) {
        String message = "Hello Rabbit "   i;        try {          // 发送消息
          channel.basicPublish("", "queue-test", null, message.getBytes());

        } catch (Exception e) {          // 可能连接已关闭,等待重连
          System.out.println("消息 "   message   " 发送失败!");
          i--;
          TimeUnit.SECONDS.sleep(2);          continue;
        }

        System.out.println("消息"   i   "已发送!");
        TimeUnit.SECONDS.sleep(2);
      }

    } catch (Exception e) {
      e.printStackTrace();
    } finally {      // 关闭通道
      if (channel != null && channel.isOpen()) {        try {
          channel.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }      // 关闭连接
      if (connection != null && connection.isOpen()) {        try {
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

Consumer示例:

代码语言:javascript复制
import com.rabbitmq.client.*;import java.io.IOException;import java.time.Instant;public class Consumer {  public static void main(String[] args) {    // 创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();    // 设置连接属性
    connectionFactory.setUsername("test-user");
    connectionFactory.setPassword("test-user");
    connectionFactory.setVirtualHost("v1");    // 设置每个节点的链接地址和端口
    Address[] addresses =        new Address[] {          new Address("192.168.0.22", 5672),          new Address("192.168.0.23", 5672),          new Address("192.168.0.24", 5672)
        };

    Connection connection = null;
    Channel channel = null;    try {      // 开启/关闭连接自动恢复,默认是开启状态
      connectionFactory.setAutomaticRecoveryEnabled(true);      // 设置每100毫秒尝试恢复一次,默认是5秒
      connectionFactory.setNetworkRecoveryInterval(100);      // 从连接工厂获取连接
 connection = connectionFactory.newConnection(addresses, "Consumer");      // 添加重连监听器
      ((Recoverable) connection)
          .addRecoveryListener(              new RecoveryListener() {                // 重连成功后的回调
                public void handleRecovery(Recoverable recoverable) {
                  System.out.println(Instant.now().toString()   " 已重新建立连接");
                }                // 开始重连时的回调
                public void handleRecoveryStarted(Recoverable recoverable) {
                  System.out.println(Instant.now().toString()   " 开始尝试重连");
                }
              });      // 从链接中创建通道
      channel = connection.createChannel();      // 声明队列,如果队列不存在,会创建
      // RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
      channel.queueDeclare("queue-test", true, false, false, null);      // 定义收到消息后的回调
      final Channel finalChannel = channel;
      DeliverCallback deliverCallback =          new DeliverCallback() {

            @Override            public void handle(String consumerTag, Delivery message) throws IOException {
              System.out.println("收到消息:"   new String(message.getBody(), "UTF-8"));
              finalChannel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
          };      // 监听队列
      channel.basicConsume(          "queue-test",          false,
          deliverCallback,          new CancelCallback() {
            @Override            public void handle(String consumerTag) throws IOException {}
          });

      System.out.println("开始接收消息");
      System.in.read();

    } catch (Exception e) {
      e.printStackTrace();
    } finally {      // 关闭通道
      if (channel != null && channel.isOpen()) {        try {
          channel.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }      // 关闭连接
      if (connection != null && connection.isOpen()) {        try {
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

RabbitMQ常用管理命令

rabbitmqctl status 查看节点状态

rabbitmqctl stop [pid_file] 停止运行的RabbitMQ的Erlang虚拟机和RabbitMQ服务应用 如果指定了pid_file,还需要等待指定进程的结束。pid_file是通过调用rabbitmq-server命令启动RabbitMQ服务时创建的,默认情况下存放于Mnesia目录中。 如果使用rabbitmq-server -detach这个带有-detach后缀的命令来启动RabbitMQ服务则不会生成pid_file文件。

rabbitmqctl stop_app 停止RabbitMQ服务应用,但是Erlang虚拟机还是处于运行状态 此命令的执行优先于其他管理操作(这些操作需要先停止RabbitMQ应用,如rabbitmqctl reset)

rabbitmqctl start_app 启动RabbitMQ应用,此命令典型的用途就是执行了其他管理操作之后,重新启动之前停止的RabbitMQ应用。

rabbitmqctl reset 将RabbitMQ节点重置还原到最初状态 包括从原来的集群中删除此节点,从管理数据库中删除所有的配置数据,如已配置的用户,vhost等,以及删除所有的持久化数据 执行rabbitmqctl reset 命令前必须停止RabbitMQ应用

rabbitmqctl force_reset 强制将RabbitMQ节点重置还原到最初状态。此命令不论当前管理数据库的状态和集群配置是什么,都会无条件的重置节点,只能在数据库或集群配置已损坏的情况下使用

rabbitmqctl [-n nodename] join_cluster {cluster_node} [—ram] 将节点加入指定的集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。 -n nodename:指定需要操作的目标节点,如rabbit@node1 cluster_node:需要加入的集群节点名,如rabbit@node2 —ram:集群节点类型,有ram,disc两种,默认为disc

  • ram 内存节点,所有元数据都存储在内存中
  • disc 磁盘节点,所有元数据都存储在磁盘中

rabbitmqctl cluster_status 查看集群状态

rabbitmqctl change_cluster_node_type {disc|ram} 修改集群节点的类型,使用此命令前要停止RabbitMQ应用

rabbitmqctl forget_cluster_node [—offline] 将节点从集群中删除,允许离线执行

rabbitmqctl update_cluster_nodes {clusternode} 在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群

rabbitmqctl force_boot 确保节点可以启动,即使它不是最后一个关闭的节点

rabbitmqctl set_cluster_name {name} 设置集群名称。集群名称在客户端连接时会通报给客户端 集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置

Federation插件

Federation插件的设计目标是使RabbitMQ在不同Broker节点之间进行消息传递而无需建立集群,该功能在以下场景下非常有用:

  • 各个节点运行在不同版本的Erlang和RabbitMQ上
  • 网络环境不稳定,如广域网当中

Federation的作用:

Shovel插件

Shovel与Federation具备的数据转发功能类似。Shovel能够可靠,持续的从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker的交换器(作为目的端,即destination)

Shovel的主要优势: 松耦合,shovel可以移动位于不同管理域中的Broker或者集群上的消息,这些Broker或者集群可以包含不同的用户和vhost,也可以使用不同的RabbitMQ和Erlang版本 支持广域网,Shovel插件同样基于AMQP协议在Broker之间进行通信,被设计成可以容忍时断时续的连通情形,并且能够保证消息的可靠性 高度定制,当Shovel成功连接后,可以对其进行配置以执行相关的AMQP命令

Federation/Shovel与Cluster的区别与联系

0 人点赞