SpringBoot+RabbitMQ 实现延迟队列

2020-04-22 11:46:12 浏览数 (1)

9 total views, 3 views today

RabbitMQ 实现延迟队列,方法有两种。

第一种是安装延迟队列插件;

第二种就是利用死信队列的方式;

这里采用第二种方式。

rabbitmq 自身的一些概念,可以去网上或者书上获得。rabbitmq 延迟队列的实现原理,网上资料很多,简单盗图一张。

简单说明一下原理。

将消息发送到一个队列中去,消息自身有一个 TTL,即失效时间,如果到期还是为消费该消息,那么该消息就成为死信,将死信移到专门的死性队列,然后消费者只需要消费死信队列中的消息,变相的实现了延迟消息的功能。

基于 Springboot 的具体代码实现:

代码语言:javascript复制
<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件:

代码语言:javascript复制
spring:
  rabbitmq:
    host: 111.22.1.2 
    port: 5672
    username: user
    password: passw
    template:
      mandatory: true
    #支持发布确认与返回
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        #是否自动开始监听消息队列
        auto-startup: false
        #手动应答
        acknowledge-mode: manual
        #监听容器数及最大数
        concurrency: 1
        max-concurrency: 1
        #是否支持重试
        retry:
          enabled: true

配置文件 TopicRabbitConfig.Java

代码语言:javascript复制
package com.mine.config;


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TopicRabbitConfig {


    //延迟队列的名字
    public static final String delayQueueName = "delay_machine_check_queue";
    //延迟队列的exchange名字
    public static final String delayExchangeName = "delay_machine_check_exchange";

    //死信exchange的名称
    public static final String deadLetterProcessQueueName = "dead_letter_process_queue";
    //死信exchange的名称
    public static final String deadLetterProcessExchangeName = "dead_letter_machine_check_exchange";

    public static final String routingKey = "machine_check";


    //延迟队列的exchange
    @Bean
    public TopicExchange delayExchange() {
        return new TopicExchange(delayExchangeName, true, false);
    }

    //死信队列的exchange
    @Bean
    public TopicExchange deadLetterProcessExchange() {
        return new TopicExchange(deadLetterProcessExchangeName, true, false);
    }

    //延迟队列
    @Bean
    Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        //args.put("x-message-ttl", 20000);
        args.put("x-dead-letter-exchange", deadLetterProcessExchangeName); //DLX,dead letter发送到的exchange
        args.put("x-dead-letter-routing-key", routingKey);
        return new Queue(delayQueueName, true, false, false, args);
    }

    //死信队列
    @Bean
    public Queue deadLetterProcessQueue() {
        return new Queue(deadLetterProcessQueueName, true, false, false);
    }

    //绑定延迟队列,延迟Exchange,routing关系
    @Bean
    Binding bindingDelayExchange(Queue delayQueue, TopicExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(routingKey);
    }

    //绑定死信队列,死信Exchange,routing关系
    //参数根据Spring命名约定的方式,会将上面的Queue实例和exchange实例注入进来,形成绑定关系
    @Bean
    Binding bindingDeadLetterProcessExchange(Queue deadLetterProcessQueue, TopicExchange deadLetterProcessExchange) {
        return BindingBuilder.bind(deadLetterProcessQueue).to(deadLetterProcessExchange).with(routingKey);
    }
}

监听代码:

代码语言:javascript复制
//只监听听死信队列即可
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = TopicRabbitConfig.deadLetterProcessQueueName, durable = "true", autoDelete = "false"),
        exchange = @Exchange(name = TopicRabbitConfig.deadLetterProcessExchangeName, durable = "true", type = "topic", autoDelete = "false"),
        key = TopicRabbitConfig.routingKey), autoStartup = "true", id = "myconsumer")
public void receiveRabbitMQ(Message message, Channel channel) throws Exception {
    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        LogHelper.log4j2.info("收到消息:"   new String(message.getBody()));

    } catch (Exception ex) {
        LogHelper.log4j2.error("receive", ex);
    }
}

上面 2 段代码,实现的效果就是如下:

发送消息的代码:RabbitMQTopicSender.java

代码语言:javascript复制
package com.mine.utils;

import com.mine.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@Component
public class RabbitMQTopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String message, Long delaySecond) {

        /***
         * 方法参数说明
         * https://docs.spring.io/spring-amqp/docs/latest_ga/api/org/springframework/amqp/rabbit/core/RabbitTemplate.html
         convertAndSend(String exchange, String routingKey, Object object)
         Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
         ***/

        message = message   "【"   LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))   "】";
        LogHelper.log4j2.info("message:"   message);
        MessageProperties props = new MessageProperties();
        props.setExpiration(Long.toString(delaySecond * 1000));//消息的延迟时间
        Message ttlMessage = new Message(message.getBytes(), props);

        rabbitTemplate.convertAndSend(TopicRabbitConfig.delayExchangeName, TopicRabbitConfig.routingKey, ttlMessage);
        LogHelper.log4j2.info("消息发送成功");
    }
}

至此调用 send 方法,即可发送延迟队列。

以上代码亲测有效。

原创文章,转载请注明出处!https://cloud.tencent.com/developer/article/1618510

0 人点赞