Rabbitmq 通过死信队列实现延迟消息发送

2022-05-05 14:55:24 浏览数 (1)

Rabbitmq 通过死信队列实现延迟消息发送

文章目录

  • 设置消息的过期时间(TTL)
    • 两种方法设置 TTL
    • Java 代码实现
      • 给队列设置 TTL
      • 给每一个消息单独设置 TTL
  • 死信队列
    • 延迟队列
    • 实现方法
    • Java 代码
  • 缺点

设置消息的过期时间(TTL)

TTL, Time to Live 的简称, 即过期时间.

两种方法设置 TTL

  1. 通过队列属性设置. 即队列中所有的消息都有相同的过期时间. 在 channel.queueDeclare 方法中加入 x-message-ttl 参数实现, 单位是毫秒
  2. 对消息本身进行单独设置. 即每条消息的 TTL 可以不同. 在 channel.basicPublish 方法中加入 expiration 属性, 单位是毫秒 总结: 如果两种方法一起使用, 则以较小的那个 TTL 为准. 消息过期后, 消费者无法再接收该消息, 就会变成死信(Dead Message). 这个特性可以实现延迟队列功能

Java 代码实现

给队列设置 TTL

代码语言:javascript复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;    
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        rabbitAdmin.declareExchange(new TopicExchange("exchange.expiration"));
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-message-ttl", 2000);// 2s 过期
        Queue queue = new Queue("queue.expiration", true, false, false, argMap);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(
            BindingBuilder.bind(queue).to(new TopicExchange("exchange.expiration")).with("routingkey.expiration"));
        return rabbitAdmin;
    }
}

@Service
public class SendMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String exchange, String routingKey, String msg) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}

给每一个消息单独设置 TTL

代码语言:javascript复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;  
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        rabbitAdmin.declareExchange(new TopicExchange("exchange.expiration"));
        rabbitAdmin.declareQueue(new Queue("queue.expiration"));
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.expiration"))
            .to(new TopicExchange("exchange.expiration")).with("routingkey.expiration"));
        return rabbitAdmin;
    }
}

@Service
public class SendMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String exchange, String routingKey, String msg, Integer expirationTime) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
            message.getMessageProperties().setExpiration(expirationTime);
            return message;
        });
    }
}

死信队列

DLX(Dead-Letter-Exchange), 可以称之为死信交换器, 也成死信信箱. 当消息在一个队列中变成死信(dead message) 后, 会被重新发送到另外一个交换器中, 这个交换器就是 DLX. 绑定了 DLX 的队列就是死信队列. 说白了就是, 有两个队列, 一个队列上的消息设置了过期时间, 但没有消费者. 另一个队列是普通队列, 有消费者. 后者被称为死信队列. 当前一个队列消息过期后, Rabbitmq 会自动将过期消息转发到死信队列里. 然后被死信队列的消费者消费掉. 实现消息的延迟发送功能

延迟队列

延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行

实现方法

通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数为这个队列添加 DLX

Java 代码

代码语言:javascript复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;    
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        // 死信队列
        rabbitAdmin.declareExchange(new DirectExchange("exchange.dlx"));
        rabbitAdmin.declareQueue(new Queue("queue.dlx"));
        rabbitAdmin.declareBinding(
            BindingBuilder.bind(new Queue("queue.dlx")).to(new DirectExchange("exchange.dlx")).with("routingkey.dlx"));

        // 延迟队列
        Map<String, Object> argMap = new HashMap<>(3);
        argMap.put("x-message-ttl", 2000);// 2s 过期
        argMap.put("x-dead-letter-exchange", "exchange.dlx");
        argMap.put("x-dead-letter-routing-key", "routingkey.dlx");
        rabbitAdmin.declareQueue(new Queue("queue.normal", true, false, false, argMap));
        rabbitAdmin.declareExchange(new TopicExchange("exchange.normal"));
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.normal", true, false, false, argMap))
            .to(new TopicExchange("exchange.normal")).with("queue.normal"));
    }
}

缺点

使用死信队列来实现消息的延迟发送. 如果是采用第一种方式, 即每个队列设置相同的过期时间, 可以很好的实现消息的延迟发送功能. 如果采用第二种方式, 给每个消息设置不同的过期时间, 由于队列先入先出的特性, 如果队列头的消息过期时间很长, 后面的消息过期时间很短, 会导致后面的消息过期后不能及时被消费掉

简单的做法时, 使用 rabbitmq 的延迟插件: Rabbitmq 通过延迟插件实现延迟队列

0 人点赞