1. 概述
rabbitmq 是目前使用最为普及的消息队列组件,基于 AMQP 的 rabbitmq 在各方面设计都比较完善,同时,它具有非常丰富的功能与特性,可以支持各种实际的适用场景。 但是 rabbitmq 本身并不直接支持延时队列的功能,本文我们就来介绍一下,如何通过 rabbitmq 的特性实现一个延时队列。
2. 延时队列的简易实现
延时队列就是只有当消息在队列中存放达到指定的时间后,才可以被消费,他的应用场景通常并不多,但在此前我们介绍的秒杀系统中非常常用。 当用户提交订单指定时间后没有支付,那么用户的订单应该被取消以便其他用户可以继续抢购,这样的情况下,延时队列就非常有必要了。
之前的文章中,我们使用 redis 集群来实现了这个功能,redis 中存储了下单时间,以分钟为粒度扫描相应的 key,即可扫出所有下单时间超过指定时间间隔的数据。 rabbitmq 也可以依照上述理论,定时取出所有消息,时间间隔不足的则放回队列。 这样的方法优势在于实现简单,但是显然性能较低,虽然 rabbitmq 不支持延时队列的功能,但是我们依然可以借用 rabbitmq 的消息过期机制与失效消息转发机制来实现我们需要的延时队列功能。
3. rabbitmq 与消息过期时间 — TTL
3.1. 为队列设置消息过期时间
rabbitmq 支持在创建队列时对队列设置消息过期时间:
代码语言:javascript复制Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);
也可以通过 rabbitmqctl 命令对符合条件的队列设置消息过期时间规则:
代码语言:javascript复制rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
<code>
<h3>为消息设置消息过期时间
同时,rabbitmq 也支持设置单条消息的过期时间:
<code mode="java">
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
如果同时设置了队列和消息的过期时间,那么消息实际的过期时间将会是两个设置值的较小值。
4. 失效消息转发队列 — DLX
一旦上述消息过期时间设置生效,某条消息达到消息过期时间,那么他将会成为一条“dead-lettered”,此外,被拒绝的消息如果 requeue 属性为 false,或者消息所在队列已达到最大长度,那么他也将成为“dead-lettered”。 如果我们设置了 DLX 规则,即失效消息转发规则,那么失效的消息就会被转发到相应的 exchange 和 queue。
4.1. 通过代码设置失效消息转发队列
我们可以通过下列代码进行设置:
代码语言:javascript复制channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
这样,一旦消息失效,则消息会被自动转发到你设置的 x-dead-letter-exchange 上的同名队列。 你也可以通过下面的代码指定具体转发的目标 routing-key:
代码语言:javascript复制args.put("x-dead-letter-routing-key", "some-routing-key");
4.2. 通过 rabbitmqctl 命令设置失效消息转发队列
同样你也可以通过 rabbitmqctl 命令设置失效消息转发队列:
代码语言:javascript复制rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
如果你需要指定转发的具体消息队列,你需要为消息指定 x-dead-letter-routing-key 属性。
5. 综述
进行了上述设置以后,消息就会在你指定的延时时间后自动被转发到相应的消息队列中,你需要做的就是去转发后的目标队列中实时取出消息,一个延时队列就这样应运而生了。
6. 参考文献
RabbitMQ TTL — http://www.rabbitmq.com/ttl.html。 RabbitMQ DLX — http://www.rabbitmq.com/dlx.html。