回答重点
RabbitMQ 本身不支持延迟消息,但是可以通过它提供的两个特性 TTL(Time-To-Live and Expiration ,消息存活时间)、DLX(Dead Letter Exchanges,死信交换器) 来实现。还可以利用 RabbitMQ 插件来实现。
使用TTL 死信队列:
在 RabbitMQ 中,通过设置消息的 TTL 和死信交换器可以实现延迟队列。
不给原队列(正常队列)设置消费者,当消息在原队列中达到 TTL 后,由于还未被消费,则会被转发到绑定的死信交换器,消费者从死信队列中消费消息,从而实现消息的延迟处理。
使用 RabbitMQ 插件:延迟消息插件(rabbitmq-delayed-message-exchange):
通过安装 RabbitMQ 的延迟消息插件,可以直接创建延迟交换器(Delayed Exchange)。
在发送消息时,指定消息的延迟时间,RabbitMQ 会在消息达到延迟时间后将其转发到对应的队列进行消费。
扩展知识
TTL 和 DLX 简要说明
- TTL(Time-To-Live):指消息在队列中的存活时间。你可以为队列中的所有消息统一设置TTL,也可以为每条消息单独设置TTL。当消息超过TTL时,消息会被标记为过期。
- 死信队列(DLX):当消息在原队列中过期、被拒绝(nack/reject)或队列已满时,消息会被转发到绑定的死信交换器(DLX)。DLX 可以将消息重新路由到死信队列(即这里的延迟队列)。
TTL DLX 时序问题
因为队列的特点就是先进先出,如果发送的消息延迟的时间不同,例如第一个延迟 10s、第二个延迟 5s、第三个延迟 1s。
那么后面的消息,需要等 10s 的消息消费完才能消费。当 10s 消息未被消费,则后续的消息都会被阻塞,即使消息设置了更短的延迟。
这就是时序问题。如果一个队列中的消息延迟时间都一致,就可以避免这个问题,因此可以针对不同的延迟时间对应多创建几个队列。
或者可以利用延迟消息插件,插件不会有时序问题。
延迟消息插件原理
插件提供了一种新的交换器类型 x-delayed-message
。这种交换器可以像普通的交换器一样,接收消息并根据路由键将消息路由到相应的队列。只不过 x-delayed-message
类型的交换机接收消息投递后,不会直接路由到队列中,而是存储到 Mnesia(Mnesia 是 Erlang 运行时中自带的一个数据库管理系统)中。
等到消息达到可投递时间,消息才会被投递到目标队列中。
更多关于插件的内容,可以查看 github:rabbitmq-delayed-message-exchange。
TTL DLX 实现
代码语言:javascript复制public class DelayedQueueExample {
private static final String EXCHANGE_NAME = "normal_exchange";
private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
private static final String QUEUE_NAME = "normal_queue";
private static final String DELAYED_QUEUE_NAME = "delayed_queue";
private static final String ROUTING_KEY = "routing_key";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
// 声明死信交换器
channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明延迟队列(用于接收过期消息)
channel.queueDeclare(DELAYED_QUEUE_NAME, true, false, false, null);
channel.queueBind(DELAYED_QUEUE_NAME, DEAD_LETTER_EXCHANGE, ROUTING_KEY);
// 声明正常交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 配置带有死信交换器的正常队列
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
argsMap.put("x-dead-letter-routing-key", ROUTING_KEY);
argsMap.put("x-message-ttl", 5000); // 消息的TTL为5秒
channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 向正常交换器发送消息
String message = "This is a delayed message!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("Sent message: " message);
}
}
}
延迟插件的实现
安装插件
首先,确保安装并启用了 rabbitmq_delayed_message_exchange
插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用插件配置延迟队列
在安装插件后,可以通过设置 x-delayed-type
参数来创建支持延迟消息的交换机:
public class RabbitMQDelayedExchange {
private static final String DELAYED_EXCHANGE = "delayed_exchange";
private static final String DELAYED_QUEUE = "delayed_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明延迟交换机
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-delayed-type", BuiltinExchangeType.DIRECT.getType());
channel.exchangeDeclare(DELAYED_EXCHANGE, "x-delayed-message", true, false, argsMap);
channel.queueDeclare(DELAYED_QUEUE, true, false, false, null);
channel.queueBind(DELAYED_QUEUE, DELAYED_EXCHANGE, "");
System.out.println("Delayed exchange and queue declared.");
// 发送带有延迟时间的消息
String message = "This is a delayed message";
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 10000); // 延迟 10 秒
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
channel.basicPublish(DELAYED_EXCHANGE, "", properties, message.getBytes("UTF-8"));
System.out.println("Message sent with delay: " message);
}
}
}