RabbitMQ高级特性之消费端限流

2021-03-08 12:45:33 浏览数 (1)

前言

假设我们现在有这么一个场景,我们的消费端由于某些原因导致全部宕机等不可用,导致RabbitMQ服务器队列中存储了大量消息未被消费掉,那么等恢复消费端服务器后,就会有巨大量的消息全部推送过来,但是我们单个客户端无法同事处理这么多消息,就是导致消费端一些不可预测错误,甚至又会重复发生宕机,所以在实际业务场景中,限流保护还是非常重要的。

消费端限流

什么是消费端限流

rabbitMQ 提供了一种 qos (服务质量保证)功能,规定消费端每次只能接收多少条消息,消费端在没有将接收到的消息全部确认之前,是不会在进行接收消息的。

代码演示

本文是基于SpringBoot框架去集成的RabbitMQ,所以最好会SpringBoot基础,再跟着本文一起搭建消费端限流Demo

创建一个简单的maven项目

导入依赖

首先在我的父工程 pom.xml 导入maven依赖

代码语言:javascript复制
<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.0.0.RELEASE</version></parent>
<dependencies>        <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-amqp</artifactId>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <version>1.18.8</version>    </dependency></dependencies>

生产者

生产者项目结构

pom文件

代码语言:javascript复制
<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-web</artifactId></dependency>

yml文件

代码语言:javascript复制
server:  port: 8081
spring:  rabbitmq:    ####连接地址    host: 192.168.137.5    ####端口号    port: 5672    ####账号    username: sunny    ####密码    password: sunny    ### 交换机    virtual-host: /sunny_vm    # publisher-confirms和publisher-returns是对于消息生产端的配置    publisher-confirms: true # 开启发送消息确认 对应RabbitTemplate.ConfirmCallback接口    publisher-returns: true  # 开启发送消息失败返回 对应RabbitTemplate.ReturnCallback接口

生产者配置类

代码语言:javascript复制
@Configurationpublic class RabbbitMqConfig {

    public static final String EXCHANGE_NAME = "sunny_prefetch_exchange";

    /**     * prefetch队列名称     */    public static final String PREFETCH_QUEUE_NAME = "routing_prefetch_queue";

    /**     * prefetch 路由key名称     */    public static final String PREFETCH_ROUTING_KEY = "routing.prefetch.key";

    /**     * 声明prefetch队列     *     * @return     */    @Bean    public Queue prefetchQueue() {        return new Queue(PREFETCH_QUEUE_NAME, true);    }
    /**     * 声明一个Direct类型的交换机     *     * @return     */    @Bean    public DirectExchange directExchange() {        return new DirectExchange(EXCHANGE_NAME);    }

    /**     * 将上面的prefetch队列绑定到Direct交换机     *     * @param prefetchQueue     * @param directExchange     * @return     */    @Bean    public Binding configQueueDirectExchange(Queue prefetchQueue, DirectExchange directExchange) {        return BindingBuilder.bind(prefetchQueue).to(directExchange).with(PREFETCH_ROUTING_KEY);    }}

RabbitMQACK配置文件

代码语言:javascript复制
/** * @Description 消息发送确认 * <p> * ConfirmCallback  只确认消息是否正确到达 Exchange 中 * ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行 * <p> * 1. 如果消息没有到exchange,则confirm回调,ack=false * 2. 如果消息到达exchange,则confirm回调,ack=true * 3. exchange到queue成功,则不回调return * 4. exchange到queue失败,则回调return * */@Slf4j@Configurationpublic class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired    private RabbitTemplate rabbitTemplate;
    @PostConstruct    public void init() {        rabbitTemplate.setConfirmCallback(this);            // 指定 ConfirmCallback        rabbitTemplate.setReturnCallback(this);             // 指定 ReturnCallback    }

    /**     * 如果消息到达 exchange, 则 confirm 回调, ack = true     * 如果消息不到达 exchange, 则 confirm 回调, ack = false     * 需要设置spring.rabbitmq.publisher-confirms=true     *     * @param correlationData     * @param ack     * @param cause     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        log.info("消息是否到达Exchange:{}", ack == true ? "消息成功到达Exchange" : "消息到达Exchange失败");        if (!ack) {            log.info("消息到达Exchange失败原因:{}", cause);
            // 根据业务逻辑实现消息补偿机制
        }    }
    /**     * exchange 到达 queue, 则 returnedMessage 不回调     * exchange 到达 queue 失败, 则 returnedMessage 回调     * 需要设置spring.rabbitmq.publisher-returns=true     *     * @param message     * @param replyCode     * @param replyText     * @param exchange     * @param routingKey     */    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        log.info("消息报文:{}", new String(message.getBody()));        log.info("消息编号:{}", replyCode);        log.info("描述:{}", replyText);        log.info("交换机名称:{}", exchange);        log.info("路由名称:{}", routingKey);
        // 根据业务逻辑实现消息补偿机制
    }}

生产者发送消息

代码语言:javascript复制
@Slf4j@RestControllerpublic class ProducerController {

    @Autowired    private RabbitTemplate rabbitTemplate;
    @GetMapping("/send")    public void send() {        for (int i = 0; i < 9; i  ) {            String msg = "发送第"   i   "消息";            rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, RabbbitMqConfig.PREFETCH_ROUTING_KEY, msg);            log.info("发送下标消息{}", i);        }    }}

生产者测试发送消息

打开浏览器,访问指定网址

代码语言:javascript复制
http://localhost:8081/send

登陆Mangerment界面,可以看到我们在配置文件中配置的交换机名称

SpringBoot自动在RabbitMQ里面,已经帮我们创建好了,且交换机的类型为direct类型。

我们还可以点击交换机的名称,然后看到交换机绑定队列的关系图等。

然后可以看到,我绑定交换机的队列,积存着9条消息

消费者

消费者项目结构

pom文件

代码语言:javascript复制
<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-data-redis</artifactId></dependency>

yml文件

代码语言:javascript复制
server:  port: 8080
spring:  rabbitmq:    ####连接地址    host: 192.168.137.5    ####端口号    port: 5672    ####账号    username: sunny    ####密码    password: sunny    ### 交换机    virtual-host: /sunny_vm    #这个配置是针对消息消费端的配置    listener:      simple:        acknowledge-mode: manual # 开启 ack 手动确认        prefetch: 3 #消费端限流/一次就接收三条,需等待3条消费完毕,才继续接收消息

新建消费者,监听的队列

代码语言:javascript复制
@Slf4j@Componentpublic class ConsumerController {
    @RabbitHandler    @RabbitListener(queues = {"routing_prefetch_queue"})    public void consumer(Channel channel, Message message) throws IOException, InterruptedException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("时间:{},成功接收消息:{}", System.currentTimeMillis(), deliveryTag);
        if (deliveryTag % 3 == 0) {
            //第二个参数--- true:批量接收数据,false:逐条接收数据            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            //模拟业务限流,接收消息等于3条消息,则阻塞3秒种,再次进行接收消息            Thread.sleep(3000);        }    }}

启动消费者项目,项目启动后会自动消费消息

队列中积压的消息被成功消费

到此SpringBoot整合RabbitMQ实现消费端限流Demo就结束拉

总结

1.为了防止消费端某时刻同时接收大量的消息导致不可预测情况发生,我们可以在消费端添加限流处理,每次限制接收多少条消息。

0 人点赞