Rabbitmq 通过延迟插件实现延迟队列
文章目录
- DLX TTL 存在时序问题
- 安装延迟插件
- 下载地址
- 安装
- Java 代码实现
DLX TTL 存在时序问题
由于队列先入先出的特性. 通过死信队列(DLX)和给每条消息设置过期时间(TTL)来实现延迟队列, 会存在时序问题. 即排在队列头的消息过期使时间如果设置的比较长, 会导致队列后面过期时间比较短的消息, 过期了迟迟不被消费掉. 可以通过给 Rabbitmq 安装延迟插件来实现延迟队列功能
安装延迟插件
下载地址
rabbitmq-delayed-message-exchange 插件可到这里下载: RabbitMQ 延迟插件
也可以到github上下载 : RabbitMQ Delayed Message Plugin
(注意插件版本, 这个插件适应的版本时 3.5.8 及其以后的版本)
安装
登录 Linux 服务器, 将插件复制到这个路径下: /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins/
然后执行以下指令:
代码语言:javascript复制# 开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启 rabbitmq
/sbin/service rabbitmq-server restart
# 查看插件是否安装成功
sudo rabbitmq-plugins list
Java 代码实现
代码语言:javascript复制@Configuration
public class RabbitConfig implements ApplicationContextAware {
private ApplicationContext applicationContext;
@PostConstruct
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
TopicExchange exchange = new TopicExchange("exchange.delay");
// 交换器设置延迟属性
exchange.setDelayed(true);
rabbitAdmin.declareQueue(new Queue("queue.delay"));
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.delay")).to(exchange).with("rountingkey.delay");
return rabbitAdmin;
}
}
// 消息发送器
public class SendMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送消息
public void sendMessage(String exchange, String routingKey, String msg, Integer expirationTime) {
rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
// 给每条消息设置过期时间
message.getMessageProperties().setExpiration(expirationTime);
return message;
});
}
}
// 消息监听器, 交换器 delayed = "true"
@Component
@RabbitListener(containerFactory = "listenerContainerFactory",
bindings = @QueueBinding(value = @Queue(value = "queue.delay"),
exchange = @Exchange(value = "exchange.delay", type = ExchangeTypes.TOPIC, delayed = "true"),
key = "rountingkey.delay"))
@Slf4j
public class MsgListener {
@RabbitHandler
public void msgHandler(String msg) {
log.info("接收到的延迟消息 [{}]",msg)
}
}