点击上方蓝字关注我们 文末有惊喜
学过rabbitmq的同学应该都知道,rabbitmq是没有延时队列功能的,为什么面试官还会问这个奇葩的问题呢?
我在这里可以肯定地说:rabbitmq是没有实现延时队列的功能,但是我们可以曲线救国,使用死信队列 TTL同样可以实现延时队列的功能。
还有一种实现方式是通过延迟队列插件实现,我后面也会介绍。
延时队列使用场景
用的最多的地方就是订单支付超时取消订单
在说如何实现之前,我们先来介绍下什么是死信队列和TTL:
关键点讲解
死信队列
在rabbitmq中,死信队列其实应该称为死信交换机,那么这个死信到底是什么意思呢?死信队列和普通队列没有区别的,但是用户不会主动把消息发送到这个队列或交换机中的,只有当以下几种情况发生了,消息才会从原来的队列中转发到死信队列:
- 原队列的消息长度超过预定的限制
- 消费者拒收消息,basicNack/basicReject,并且没有把消息重新放回队列中
- 原队列消息设置了过期时间,如果在过期之前,还没有被消费者消费,那么也会被转到死信队列中;
死信队列相关的设置参数是绑定在队列中设置的:「x-dead-letter-exchange」,「x-dead-letter-routing-key」
TTL
TTL全称是Time To Live,翻译为过期时间,当消息已经存活到ttl设置的时间后还没有被消费,则会清除该消息,rabbit可以对队列设置过期消息,也可以对具体的消息设置过期消息,这里提一个小小的面试题:
问:rabbit是如何处理设置了过期时间的消息的?
答:rabbit实现的是一个懒策略去清理过期时间,目的是为了保证消息队列的高吞吐量;这个懒策略是通过在消息到达了队列的顶部之后,broker会检查队列是否设置了过期时间,如果设置了则检查过期时间是否已经到了,如果到了则剔除消息,不推送此消息,切记不要回答,broker会遍历每个消息,检查过期时间,切记切记!!!
❝前面已经介绍了两个重要的技术点,现在该进入本文的主题了,rabbitmq到底是如何实现延时队列的呢? ❞
使用TTL DLX
实现思路
想必大家在经过我上面对TTL和死信队列的讲解后,大家有可能心里已经知道该如何实现了,不过就算你知道如何实现了,我还是要讲的,哈哈
因为TTL是可以对消息设置过期时间,而进入死信队列的条件中有这么一条:原队列消息设置了过期时间,如果在过期之前,还没有被消费者消费,那么也会被转到死信队列中,那么我们可以结合这两者这么去做,处理正常业务的监听器去监听这个死信队列,然后给正常队列设置下这个死信队列的参数,那么消息流转会变成这样:
- 我发送了一个设置过期时间为10000毫秒的消息到broker中
- broker把消息放到了队列中
- 过了10000毫秒后,消息还没有被消费掉
- broker就会把消息转发到死信交换机中,再由死信交换机把消息推送到死信队列中
- 我刚开始已经设置了一个监听器去监听了死信队列,那么我收到这个消息的时候肯定是在10000毫秒以后了;
代码编写
- 生产者队列与交换机绑定和队列声明
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_TEST_DLX = "queue_test_dlx";
public static final String QUEUE_TEST_NORMAL = "queue_test_normal";
public static final String EXCHANGE_TEST_DLX = "exchange_test_dlx";
// 声明一个默认不进行消费的队列 绑定死信队列交换机和死信队列的key
@Bean("queueTestNormal")
public Queue queueTestNormal() {
return QueueBuilder.durable(QUEUE_TEST_NORMAL).deadLetterExchange(EXCHANGE_TEST_DLX).deadLetterRoutingKey("testdlx").build();
}
// 声明死信队列
@Bean("queueTestDLX")
public Queue queueTestDLX() {
return QueueBuilder.durable(QUEUE_TEST_DLX).build();
}
// 声明死信交换机
@Bean("exchangeTestDLX")
public Exchange exchangeTestDLX() {
return ExchangeBuilder.directExchange(EXCHANGE_TEST_DLX).durable(true).build();
}
// 死信队列与死信交换机绑定
@Bean
public Binding itemQueueExchange7(@Qualifier("queueTestDLX") Queue queue,
@Qualifier("exchangeTestDLX") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testdlx").noargs();
}
}
- 生产者使用简单发送消息给普通队列,并设置过期时间为10s
@Test
public void testDLX() {
rabbitTemplate.convertAndSend(null, "queue_test_normal","我是10秒之后才到的", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration(10000 "");
return message;
}
});
System.out.println("我发送消息的时间为:" (System.currentTimeMillis()));
System.out.println("开始倒计时:10");
int i = 10;
while (true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(i>0){
System.out.println("倒计时:" (--i));
}
}
}
- 消费者监听器编写
@RabbitListener(queues = "queue_test_dlx")
public void onMessage5(Message message, Channel channel) throws Exception {
System.out.println("我收到消息的时间为:" (System.currentTimeMillis()));
System.out.println("收到消息来自队列queue_test_dlx:" new String(message.getBody()));
}
总结
到目前为止。延时队列已经实现完成,我们现在来总结下这种方式实现延时队列的唯一缺点:
「不及时」:因为只有消息到达了队列顶部,broker才会去检查消息是否过期,进行推送,加入设置了过期时间的消息前面有一个设置了更长时间过期时间的消息,这样会导致过期时间小的消息一直没有被处理掉,一直在队列中等待;
因为这个原因,rabbitmq引入了一个延时队列插件,这个插件的实现思路和前面的实现方式不同,当给一个消息设置了延迟时间后,它并不会立即把消息推送到队列,而是等消息过了设置的延时时间后才放到队列中,我们现在介绍下延时队列插件是如何实现的:
使用延时队列插件
安装延时队列插件
代码语言:javascript复制#下载插件 https://www.cnblogs.com/geekdc/p/13549613.html
docker cp /Users/yangle/docker/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/plugins
#进入容器
docker exec -it rabbitmq /bin/bash
#启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#查看
rabbitmq-plugins list
#重新启动容器
docker restart rabbitmq
代码编写
- 交换机与队列绑定配置文件
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_TEST_DELAY_PLUGIN = "queue_test_delay_plugin";
public static final String EXCHANGE_TEST_DELAY_PLUGIN = "exchange_test_delay_plugin";
// 声明一个队列
@Bean("queueDelayPlugin")
public Queue queueDelayPlugin() {
return QueueBuilder.durable(QUEUE_TEST_DELAY_PLUGIN).build();
}
@Bean
CustomExchange delayExchange(){
Map<String, Object> args = new HashMap<>();
// 设置为路由模式
args.put("x-delayed-type", "direct");
// type必须设置为x-delayed-message
return new CustomExchange(EXCHANGE_TEST_DELAY_PLUGIN, "x-delayed-message", true,false, args);
}
// 插件交换机与队列绑定
@Bean
public Binding itemQueueExchange8(@Qualifier("queueDelayPlugin") Queue queue,
@Qualifier("delayExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testDelayPlugin").noargs();
}}
- 发送消息
@Test
public void testDelayPlugin() {
rabbitTemplate.convertAndSend("exchange_test_delay_plugin", "testDelayPlugin", "测试延时插件发送消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);
return message;
}
});
}
- 监听器
@RabbitListener(queues = "queue_test_delay_plugin")
public void onMessage6(Message message, Channel channel) throws Exception {
System.out.println("我收到消息的时间为:" (System.currentTimeMillis()));
System.out.println("收到消息来自队列queue_test_delay_plugin:" new String(message.getBody()));
}
总结
虽然说插件实现延迟队列的方式简单,但是它也有他的局限性:
- 会降低性能,所以如果没有该需求,则不要使用。
- 该插件不适合大数据量的延时消息,比如百万或一亿。
- 延时时长:0<=n<=(2^32)-1 ,单位毫秒。