分布式--RabbitMQ集成SpringBoot、消息可靠性、死信队列、延迟交换机、集群

2022-09-19 15:39:04 浏览数 (1)

接着上篇分布式--RabbitMQ入门

一、SpringBoot中使用RabbitMQ

1. 导入依赖
代码语言:javascript复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
2. yml配置
代码语言:javascript复制
spring:
  rabbitmq:
    host: 192.168.42.4
    port: 5672
    username: aruba
    password: aruba
    virtual-host: /
    listener:
      direct:
        acknowledge-mode: manual # 手动ack
      simple:
        prefetch: 1 # 流控
        concurrency: 10 # 多线程监控
3. 配置交换机和队列
代码语言:javascript复制
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "MY-MQ-EX";
    public static final String QUEUE_NAME = "MY-MQ-QUEUE";
    public static final String ROUTING_KEY = "key.#";

    /**
     * 注入交换机
     *
     * @return
     */
    @Bean
    public Exchange exchangeProvider() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 注入队列
     *
     * @return
     */
    @Bean
    public Queue queueProvider() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 注入交换机队列绑定关系
     *
     * @return
     */
    @Bean
    public Binding bootBinding(Exchange exchangeProvider, Queue queueProvider) {
        return BindingBuilder.bind(queueProvider).to(exchangeProvider).with(ROUTING_KEY).noargs();
    }
}
4. 发送消息

SpringBoot中使用RabbitTemplate自动注入,即可发送消息,并对方法都进行了封装

代码语言:javascript复制
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    @Autowired
    public RabbitTemplate rabbitTemplate;

    @Test
    void send() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "key.send", "发送消息");
    }

    /**
     * 携带信息的消息
     */
    @Test
    void sendWithProps() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "发送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                        return message;
                    }
                });
    }
    
}
5. 订阅消息

在方法上使用@RabbitListener注解,即可指定订阅队列。

入参添加Channel,就可以和之前一样发送ack

将消息封装成了Message,可以获取其携带信息。

代码语言:javascript复制
@Component
public class MQListener {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("队列的消息为:"   msg);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("唯一标识为:"   correlationId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

运行结果:

二、消息可靠性

由于RabbitMQ在发送消息和订阅消息时,都是通过网络传输,其间必然会出现由网络问题产生的消息丢失情况,要保证消息的可靠性从下面四点出发:

  • 保证消息发送到交换机
  • 保证消息路由到队列
  • 保证队列中消息的持久化
  • 保证消费者正常消费消息
1. Client-API方式
1.1 保证消息发送到交换机

Publisher Confirms就是为了保证消息发送到交换机的机制,一般使用异步的方式:

代码语言:javascript复制
        //4. 开启confirm
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("没有送达交换机");
            }
        });
1.2 保证消息路由到队列

addReturnListener方法可以确认消息是否路由到了队列,如果回调了说明没有路由到队列

发送消息时,指定mandatory参数为true

代码语言:javascript复制
        //5. 设置return回调,确认消息是否路由到了队列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("交换机没有路由到队列");
            }
        });

        //参数: 交换机 routing-Key mandatory 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, true, null, message.getBytes());
1.3 保证队列中消息的持久化

首先保证队列的持久化,再保证消息的持久化

代码语言:javascript复制
        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //6. 发送消息
        String message = "hello confirm";
        AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
                .deliveryMode(2) //2:消息持久化 1: 不持久化
                .build();
        //参数: 交换机 routing-Key mandatory 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
1.4 保证消费者正常消费消息

保证消费者正常消费消息只需要手动ack即可,生产者完整代码:

代码语言:javascript复制
public class Publisher {

    private static final String QUEUE_NAME = "confirm";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4. 开启confirm
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("没有送达交换机");
            }
        });

        //5. 设置return回调,确认消息是否路由到了队列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("交换机没有路由到队列");
            }
        });

        //6. 发送消息
        String message = "hello confirm";
        AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
                .deliveryMode(2) //2:消息持久化 1: 不持久化
                .build();
        //参数: 交换机 routing-Key mandatory 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
    }

}
2. SpringBoot方式
2.1 配置Confirm

yml中开启confirm

代码语言:javascript复制
spring:
  rabbitmq:
    publisher-confirm-type: correlated

RabbitTemplate设置回调:

代码语言:javascript复制
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息成功送达到交换机");
                } else {
                    System.out.println("消息没有送达到交换机");
                }
            }
        });
2.2 配置Return

yml中开启return

代码语言:javascript复制
spring:
  rabbitmq:
    publisher-returns: true

RabbitTemplate设置回调:

代码语言:javascript复制
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println(String.format("交换机:%s 路由消息失败", returned.getExchange()));
            }
        });
2.3 消息持久化

设置Message的携带信息:

代码语言:javascript复制
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "发送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        return message;
                    }
                });

完整代码:

代码语言:javascript复制
    /**
     * 携带信息的消息
     */
    @Test
    void sendWithProps() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息成功送达到交换机");
                } else {
                    System.out.println("消息没有送达到交换机");
                }
            }
        });
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println(String.format("交换机:%s 路由消息失败", returned.getExchange()));
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "发送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                        return message;
                    }
                });
    }

三、死信队列

死信队列是存放本来应该死亡的消息的队列,用于对这些消息的特殊处理(如:重新入队、持久化到数据库),具体有以下几种消息会被存放进死信队列:

  • 消费者拒绝的消息,并requeue设置为false(不重新入队列)
  • 消息的生存时间到了,还在队列中的信息
  • 队列设置了整体的消息生存时间,到了生存时间的消息
  • 到达队列中消息最大数,再路由过来的消息
1. 构建交换机

死信队列需要一个死信交换机,并把正常消息的队列绑定死信交换机:

代码语言:javascript复制
@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE_NAME = "normal-ex";
    public static final String NORMAL_QUEUE_NAME = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";

    public static final String DEAD_EXCHANGE_NAME = "dead-ex";
    public static final String DEAD_QUEUE_NAME = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";

    @Bean
    public Exchange normalExchange() {
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") //准备入死信队列的消息重新设置routin-key
                .build();
    }

    @Bean
    public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }

    @Bean
    public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }

}
2. 死信队列的实现方式
2.1 拒绝消息入死信队列

对正常队列消息进行监听,来做相应的处理,首先是拒绝消息,并且要把requeue设为false

代码语言:javascript复制
@Component
public class DeadListener {

    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE_NAME)
    public void normalListener(Message msg, Channel channel) throws IOException {
        System.out.println("接收到正常队列消息:"   new String(msg.getBody()));
        channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
//        channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
    }

}

尝试发送一个消息:

代码语言:javascript复制
    @Test
    public void sendNormal() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME, "normal.msg", "哈喽");
    }

运行结果:

2.2 消息生存时间

发送消息时,通过消息的额外参数MessagePropertiessetExpiration方法设置过期时间:

代码语言:javascript复制
    @Test
    public void sendExpire() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // 该消息10s后过期
                        message.getMessageProperties().setExpiration("10000");
                        return message;
                    }
                });
    }

记得把上面消息的监听注释掉,否则会消费消息

运行结果:

2.3 队列消息的整体生存时间

管理页面把之前的正常队列删除,在重新创建时,为正常队列设置ttl

设置ttl

代码语言:javascript复制
    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
                .ttl(5000) // 整体消息过期时间为5s
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
                .build();
    }

发送正常消息,运行结果:

2.4 达到队列最大数

同样先删除正常队列,后调用maxLength为队列设置最大消息数:

代码语言:javascript复制
    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
//                .ttl(5000) // 整体消息过期时间为5s
                .maxLength(1) // 设置消息最大数
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
                .build();
    }

发送两次正常消息,运行结果:

四、延迟交换机

死信队列的问题:由于死信队列只会监听队列头的过期时间,一旦队列头的消息过期时间比后面排队的消息过期时间长,那么后面消息的过期时间并不会生效,而是等待队列头的过期时间到了后,才一并进入死信队列

删除正常队列,恢复配置:

代码语言:javascript复制
    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
//                .ttl(5000) // 整体消息过期时间为5s
//                .maxLength(1) // 设置消息最大数
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
                .build();
    }

发送两次消息,第一次过期时间为30s,第二次为2s:

代码语言:javascript复制
    @Test
    public void sendExpire30() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration("30000");
                        return message;
                    }
                });
    }

    @Test
    public void sendExpire2() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration("2000");
                        return message;
                    }
                });
    }

结果,过了几秒后,队列中还是两个消息:

解决方法:根据时间创建多个队列或者使用延迟交换机

延迟交换机是一个插件,默认并不带,原理就是将消息暂时放在交换机中,由交换机根据消息过期时间的先后来路由到队列,缺点:由于消息在交换机中,重启会导致消息的丢失

1. 插件下载和使用

根据自己的RabbitMQ版本进行下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/

代码语言:javascript复制
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
mv rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez /usr/local/rabbitmq/rabbitmq_server-3.8.35/plugins

启动插件:

代码语言:javascript复制
cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启服务或系统后,多了一个x-delayed-message的交换机类型:

2. 配置延迟交换机

使用CustomExchange构造x-delayed-message类型交换机,并使用其他参数x-delayed-type指定使用哪种原型交换机类型,这边使用的是topic

代码语言:javascript复制
@Configuration
public class DelayExchangeConfig {

    public static final String EXCHANGE_NAME = "delay-exchange";
    public static final String DELAY_QUEUE = "delay_queue";
    public static final String DELAY_ROUTIN_KEY = "delay.#";

    @Bean
    public Exchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        // 使用哪种原型交换机类型
        args.put("x-delayed-type", "topic");
        Exchange exchange = new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
        return exchange;
    }

    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }


    @Bean
    public Binding delayBinding(Queue delayQueue, Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTIN_KEY).noargs();
    }
    
}
3. 发送消息

MessageProperties使用setDelay方法为消息设置延迟:

代码语言:javascript复制
    @Test
    public void sendDelay30() {
        rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
                "delay.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(30000);
                        return message;
                    }
                });
    }

    @Test
    public void sendDelay5() {
        rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
                "delay.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(5000);
                        return message;
                    }
                });
    }

消息在交换机进行等待后,首先入队列的为5秒延迟的,后面入队列的为30秒延迟的:

五、集群

1. 配置主机名

RabbitMQ集群的搭建要配置主机名:HOSTNAME,先修改network配置文件

代码语言:javascript复制
vi /etc/sysconfig/network

追加HOSTNAME:

代码语言:javascript复制
HOSTNAME=rabbit1

再修改hosts文件:

代码语言:javascript复制
vi /etc/hosts

追加内容:

代码语言:javascript复制
192.168.42.4 rabbit1

重启系统后,RabbitMQ先前配置的管理账号会丢失,需要重新配置

2. 克隆虚拟机
2.1 从机主机名配置

克隆后,对从机进行主机名的配置,network配置文件:

hosts文件,中需要添加集群主节点的ip和hostname:

2.2 建立集群关联

启动RabbitMQ服务后,管理界面的节点会带上主机名:

接下来,配置从机加入到主节点集群中,执行以下命令即可:

代码语言:javascript复制
cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin/ 
./rabbitmqctl stop_app
./rabbitmqctl reset 
./rabbitmqctl join_cluster rabbit@rabbit1
./rabbitmqctl start_app

加入成功后,管理界面中就会出现多个节点:

3. 配置镜像模式

目前集群是普通模式,队列中的消息只会存在于一个节点上,而不会同步到其他队列,一旦该节点宕机,其他节点将无法访问消息。

镜像模式是指,集群中所有节点都有一份单独的拷贝,即使单一节点宕机,其他节点中依然存在消息的拷贝,这样才能实现高可用

在管理界面进行配置镜像策略:

新建一个队列,并查看详情:

项目地址:

https://gitee.com/aruba/rabbit-mqstudy.git

0 人点赞