RabbitMQ高级特性之延迟队列

2021-03-25 14:39:18 浏览数 (1)

前言

很多时候我们想定时去做某件事情的时候我们会首先想到定时任务,quartz是个不错的选择,但是也有缺点,假如配置在项目中,集群部署会有重复执行的问题,如果持久化在mysql中,解决了集群的问题,但是过于依赖mysql,耦合严重,当然还有日志量庞大、执行时间精度、过于耗费系统资源等等问题。所以这时候使用消息队列中间件的的延时队列就是一个很好得解决方案,我们设置要触发消费的时间和必要的参数入队mq,到时监听queue的消费者自然拿到消息然后去走业务流程,这里介绍的是基于rabbitmq中间件实现的TTL版的延时队列。

延时队列

什么是延时队列

延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延时队列的使用场景

那么什么时候需要用延时队列呢?考虑一下以下场景:

  1. 订单在十分钟之内未支付则自动取消。
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 账单在一周内未支付,则自动结算。
  4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

TTL高级特性

什么是TTL

TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”

TTL简单理解就是给单个消息或给某个队列设置消息过期时间,如果未在消息过期之前成功消费消息,那么消息都会自动销毁掉(没有设置死信队列的前提)。

那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:

代码语言:javascript复制
Mapmap = new HashMap<>(3);
// 队列中的消息未被消费则10秒后过期
map.put("x-message-ttl", 10000);
//然后进行绑定队列

上面这种方式针对整个队列的,意思是只要有消息发送到这个队列,那么该队列的消息都会存在消息过期时间

另外一种方式是针对每条消息设置TTL的,代码如下:

代码语言:javascript复制
//第一种方式处理消息TTL
//消息处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        //设置message的信息
        message.getMessageProperties().setExpiration("10000");//消息的过期时间
        return message;
    }
};
rabbitTemplate.convertAndSend("交换机名称", "路由key", "消息哦", messagePostProcessor);
//第二种方式是使用函数式编程设置消息TTL
rabbitTemplate.convertAndSend("交换机名称", "路由key", "消息哦", message -> {
    //设置message的信息
    message.getMessageProperties().setExpiration("10000");//消息的过期时间
    return message;
});

这样这条消息的过期时间也被设置成了10秒。

但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

死信队列

什么是死信队列

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  1. 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

关于如何配置死信队列,我下面demo会有介绍到

代码演示

本文是基于SpringBoot框架去集成的RabbitMQ,所以最好会SpringBoot基础,再跟着本文一起搭建主题队列Demo

创建一个简单的maven项目

导入依赖

首先在我的父工程 pom.xml 导入maven依赖

代码语言:javascript复制
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
</parent>
<dependencies>
 
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
    </dependency>
</dependencies>

生产者

生产者项目结构

pom文件

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

yml文件

代码语言:javascript复制
server:
  port: 8081
spring:
  rabbitmq:
    ####连接地址
    host: 192.168.137.5
    ####端口号
    port: 5672
    ####账号
    username: sunny
    ####密码
    password: sunny
    ### 交换机
    virtual-host: /sunny_vm
    # publisher-confirms和publisher-returns是对于消息生产端的配置
    publisher-confirms: true # 开启发送消息确认 对应RabbitTemplate.ConfirmCallback接口
    publisher-returns: true  # 开启发送消息失败返回 对应RabbitTemplate.ReturnCallback接口

生产者配置类

需要注意的是,我这里针对整个队列设置消息过期时间,然后队列消息过期后,会自动将消息转发到死信队列里面

代码语言:javascript复制
@Configuration
public class RabbbitMqConfig {
 
    //==========================================普通队列配置开始===========================================
    /**
     * 定义普通交换机名称
     */
    public static final String EXCHANGE_NAME = "sunny_delay_exchange";
    /**
     * 定义普通队列名称
     */
    public static final String QUEUE_NAME = "sunny_delay_queue";
    /**
     * 定义普通RoutingKey名称
     */
    public static final String ROUTING_KEY = "sunny.routing.key";
    /**
     * 声明一个Direct类型的交换机
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }
    /**
     * 声明TTL队列(消息超时未消费自动过期队列)
     *
     * @return
     */
    @Bean
    public Queue directQueue() {
        Map<String, Object> map = new HashMap<>(3);
        // 队列中的消息未被消费则10秒后过期
        map.put("x-message-ttl", 10000);
        // 绑定死信交换器
        map.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        // 绑定死信路由键
        map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        /**
         * 第一个参数:队列名称
         * 第二个参数:是否持久化
         * 第三个参数: null
         * 第四个参数:是否自动删除队列(true:队列中没消息会自动删除队列)
         * 第五个参数:声明队列参数
         */
        return new Queue(QUEUE_NAME, true, false, false, map);
    }
    /**
     * 将上面的TLL队列绑定到普通交换机
     *
     * @param directQueue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding queueDirectExchange(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with(ROUTING_KEY);
    }
    //==========================================普通队列配置结束===========================================
    //==========================================死信队列配置开始===========================================
    /**
     * 定义死信交换机名称
     */
    public static final String DLX_EXCHANGE_NAME = "sunny_dlx_delay_exchange";
    /**
     * 定义死信队列名称
     */
    public static final String DLX_QUEUE_NAME = "sunny_dlx_delay_queue";
    /**
     * 定义死信RoutingKey名称
     */
    public static final String DLX_ROUTING_KEY = "sunny.dlx.routing.key";
    /**
     * 定义一个死信交换器和普通的交换器 没有差别
     */
    @Bean
    public DirectExchange dlxDirectExchange() {
        return new DirectExchange(DLX_EXCHANGE_NAME);
    }
    /**
     * 定义一个监听死信交换器的死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE_NAME);
    }
    /**
     * 将死信队列绑定到死信路由器
     */
    @Bean
    public Binding bingDlxQueue(Queue dlxQueue, DirectExchange dlxDirectExchange) {
        return BindingBuilder.bind(dlxQueue).to(dlxDirectExchange).with(DLX_ROUTING_KEY);
    }
    //==========================================死信队列配置结束===========================================
}

RabbitMQACK配置文件

代码语言:javascript复制
/**
 * @Description 消息发送确认
 * <p>
 * ConfirmCallback  只确认消息是否正确到达 Exchange 中
 * ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行
 * <p>
 * 1. 如果消息没有到exchange,则confirm回调,ack=false
 * 2. 如果消息到达exchange,则confirm回调,ack=true
 * 3. exchange到queue成功,则不回调return
 * 4. exchange到queue失败,则回调return
 *
 */
@Slf4j
@Configuration
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);            // 指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             // 指定 ReturnCallback
    }
    /**
     * 如果消息到达 exchange, 则 confirm 回调, ack = true
     * 如果消息不到达 exchange, 则 confirm 回调, ack = false
     * 需要设置spring.rabbitmq.publisher-confirms=true
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息是否到达Exchange:{}", ack == true ? "消息成功到达Exchange" : "消息到达Exchange失败");
        if (!ack) {
            log.info("消息到达Exchange失败原因:{}", cause);
            // 根据业务逻辑实现消息补偿机制
        }
    }
    /**
     * exchange 到达 queue, 则 returnedMessage 不回调
     * exchange 到达 queue 失败, 则 returnedMessage 回调
     * 需要设置spring.rabbitmq.publisher-returns=true
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息报文:{}", new String(message.getBody()));
        log.info("消息编号:{}", replyCode);
        log.info("描述:{}", replyText);
        log.info("交换机名称:{}", exchange);
        log.info("路由名称:{}", routingKey);
        // 根据业务逻辑实现消息补偿机制
    }
}

生产者发送消息

以下是三种方式发送消息,第一种和第二种方式是以消息设置过期时间,第三种则是以队列设置消息过期时间

如果对整个队列设置了过期时间,又同时对单条消息设置过期时间,那么会是以哪个消息过期时间短,就执行哪个咯

我这里就使用以队列设置消息过期时间了

代码语言:javascript复制
@Slf4j
@RestController
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/send")
    public String send() {
//        //第一种方式处理消息TTL
//        //消息处理对象,设置一些消息的参数信息
//        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
//
//            @Override
//            public Message postProcessMessage(Message message) throws AmqpException {
//                //设置message的信息
//                message.getMessageProperties().setExpiration("5000");//消息的过期时间
//
//                return message;
//            }
//        };
//        rabbitTemplate.convertAndSend("交换机名称", "路由key", "消息哦", messagePostProcessor);
//
//
//        //第二种方式是使用函数式编程设置消息TTL
//        rabbitTemplate.convertAndSend("交换机名称", "路由key", "消息哦", message -> {
//            //设置message的信息
//            message.getMessageProperties().setExpiration("5000");//消息的过期时间
//            return message;
//        });
        rabbitTemplate.convertAndSend(RabbbitMqConfig.EXCHANGE_NAME, RabbbitMqConfig.ROUTING_KEY, "延迟队列消息");
        return "发生成功";
    }
}

生产者测试发送消息

打开浏览器,访问指定网址

代码语言:javascript复制
http://localhost:8081/send

登陆Mangerment界面,可以看到我们在配置文件中配置的交换机名称

SpringBoot自动在RabbitMQ里面,已经帮我们创建好了,且交换机的类型为direct类型。

我们还可以点击其中某一个交换机的名称,然后看到交换机绑定队列的关系图等。

然后可以看到,我绑定交换机的队列,积存着1条消息

但是当过了10秒钟之后,我们的普通队列的消息堆积数量为0了,死信队列多了一条消息,因为普通队列设置了10秒过期时间,所以消息一旦过期,会自动将消息转发到死信队列拉

消费者

消费者项目结构

pom文件

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

yml文件

代码语言:javascript复制
server:
  port: 8080

spring:
  rabbitmq:
    ####连接地址
    host: 192.168.137.5
    ####端口号
    port: 5672
    ####账号
    username: sunny
    ####密码
    password: sunny
    ### 交换机
    virtual-host: /sunny_vm
    #这个配置是针对消息消费端的配置
    listener:
      simple:
        acknowledge-mode: manual # 开启 ack 手动确认

新建普通消费者

代码语言:javascript复制
@Slf4j
@Component
public class ConsumerController {

    /**
     * 普通队列
     */
    @RabbitHandler
    @RabbitListener(queues = "sunny_delay_queue")
    public void consumer(Channel channel, Message message, String news) throws IOException {
        log.info("=================进入普通队列接收消息=================");
        log.info("消息接收时间:{},接收内容:{}", LocalDateTime.now(), news);

        try {
            //第二个参数--- true:批量接收数据,false:逐条接收数据
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());

            /**
             * basicRecover方法是进行补发操作,
             * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
             * 设置为false是只补发给当前的consumer
             */
            channel.basicRecover(false);
        }

        log.info("消息消费成功");
    }
}

新建死信队列消费者

代码语言:javascript复制
@Component
@Slf4j
public class DlxController {

    /**
     * 死信队列
     */
    @RabbitHandler
    @RabbitListener(queues = "sunny_dlx_delay_queue")
    public void delayConsumer(Channel channel, Message message, String news) throws IOException {
        log.info("=================进入死信队列接收消息=================");
        log.info("消息接收时间:{},接收内容:{}", LocalDateTime.now(), news);

        try {
            //第二个参数--- true:批量接收数据,false:逐条接收数据
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());

            /**
             * basicRecover方法是进行补发操作,
             * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
             * 设置为false是只补发给当前的consumer
             */
            channel.basicRecover(false);
        }

        log.info("消息消费成功");
    }
}

启动消费者项目,项目启动后会自动消费死信队列消息

死信队列中积压的消息被成功消费

到此SpringBoot整合RabbitMQ实现延时队列代码Demo就结束拉

总结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。

0 人点赞