RabbitMQ 如何实现延迟队列?答案可不止一种

2024-08-20 20:33:24 浏览数 (3)

回答重点

RabbitMQ 本身不支持延迟消息,但是可以通过它提供的两个特性 TTL(Time-To-Live and Expiration ,消息存活时间)、DLX(Dead Letter Exchanges,死信交换器) 来实现。还可以利用 RabbitMQ 插件来实现。

使用TTL 死信队列:

在 RabbitMQ 中,通过设置消息的 TTL 和死信交换器可以实现延迟队列。

不给原队列(正常队列)设置消费者,当消息在原队列中达到 TTL 后,由于还未被消费,则会被转发到绑定的死信交换器,消费者从死信队列中消费消息,从而实现消息的延迟处理。

使用 RabbitMQ 插件:延迟消息插件(rabbitmq-delayed-message-exchange):

通过安装 RabbitMQ 的延迟消息插件,可以直接创建延迟交换器(Delayed Exchange)。

在发送消息时,指定消息的延迟时间,RabbitMQ 会在消息达到延迟时间后将其转发到对应的队列进行消费。

扩展知识

TTL 和 DLX 简要说明

  • TTL(Time-To-Live):指消息在队列中的存活时间。你可以为队列中的所有消息统一设置TTL,也可以为每条消息单独设置TTL。当消息超过TTL时,消息会被标记为过期。
  • 死信队列(DLX):当消息在原队列中过期、被拒绝(nack/reject)或队列已满时,消息会被转发到绑定的死信交换器(DLX)。DLX 可以将消息重新路由到死信队列(即这里的延迟队列)。

TTL DLX 时序问题

因为队列的特点就是先进先出,如果发送的消息延迟的时间不同,例如第一个延迟 10s、第二个延迟 5s、第三个延迟 1s。

那么后面的消息,需要等 10s 的消息消费完才能消费。当 10s 消息未被消费,则后续的消息都会被阻塞,即使消息设置了更短的延迟。

这就是时序问题。如果一个队列中的消息延迟时间都一致,就可以避免这个问题,因此可以针对不同的延迟时间对应多创建几个队列。

或者可以利用延迟消息插件,插件不会有时序问题。

延迟消息插件原理

插件提供了一种新的交换器类型 x-delayed-message。这种交换器可以像普通的交换器一样,接收消息并根据路由键将消息路由到相应的队列。只不过 x-delayed-message 类型的交换机接收消息投递后,不会直接路由到队列中,而是存储到 Mnesia(Mnesia 是 Erlang 运行时中自带的一个数据库管理系统)中。

等到消息达到可投递时间,消息才会被投递到目标队列中。

更多关于插件的内容,可以查看 github:rabbitmq-delayed-message-exchange。

TTL DLX 实现

代码语言:javascript复制
public class DelayedQueueExample {

    private static final String EXCHANGE_NAME = "normal_exchange";
    private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    private static final String QUEUE_NAME = "normal_queue";
    private static final String DELAYED_QUEUE_NAME = "delayed_queue";
    private static final String ROUTING_KEY = "routing_key";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            // 声明死信交换器
            channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT);

            // 声明延迟队列(用于接收过期消息)
            channel.queueDeclare(DELAYED_QUEUE_NAME, true, false, false, null);
            channel.queueBind(DELAYED_QUEUE_NAME, DEAD_LETTER_EXCHANGE, ROUTING_KEY);

            // 声明正常交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // 配置带有死信交换器的正常队列
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
            argsMap.put("x-dead-letter-routing-key", ROUTING_KEY);
            argsMap.put("x-message-ttl", 5000); // 消息的TTL为5秒

            channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            // 向正常交换器发送消息
            String message = "This is a delayed message!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Sent message: "   message);
        }
    }
}

延迟插件的实现

安装插件

首先,确保安装并启用了 rabbitmq_delayed_message_exchange 插件:

代码语言:javascript复制
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用插件配置延迟队列

在安装插件后,可以通过设置 x-delayed-type 参数来创建支持延迟消息的交换机:

代码语言:javascript复制
public class RabbitMQDelayedExchange {
    private static final String DELAYED_EXCHANGE = "delayed_exchange";
    private static final String DELAYED_QUEUE = "delayed_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明延迟交换机
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("x-delayed-type", BuiltinExchangeType.DIRECT.getType());

            channel.exchangeDeclare(DELAYED_EXCHANGE, "x-delayed-message", true, false, argsMap);
            channel.queueDeclare(DELAYED_QUEUE, true, false, false, null);
            channel.queueBind(DELAYED_QUEUE, DELAYED_EXCHANGE, "");

            System.out.println("Delayed exchange and queue declared.");

            // 发送带有延迟时间的消息
            String message = "This is a delayed message";
            Map<String, Object> headers = new HashMap<>();
            headers.put("x-delay", 10000);  // 延迟 10 秒

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .headers(headers)
                    .build();

            channel.basicPublish(DELAYED_EXCHANGE, "", properties, message.getBytes("UTF-8"));
            System.out.println("Message sent with delay: "   message);
        }
    }
}

0 人点赞