Rabbitmq 通过延迟插件实现延迟队列

2022-05-05 14:56:03 浏览数 (1)

Rabbitmq 通过延迟插件实现延迟队列

文章目录

  • DLX TTL 存在时序问题
  • 安装延迟插件
    • 下载地址
    • 安装
  • Java 代码实现

DLX TTL 存在时序问题

由于队列先入先出的特性. 通过死信队列(DLX)和给每条消息设置过期时间(TTL)来实现延迟队列, 会存在时序问题. 即排在队列头的消息过期使时间如果设置的比较长, 会导致队列后面过期时间比较短的消息, 过期了迟迟不被消费掉. 可以通过给 Rabbitmq 安装延迟插件来实现延迟队列功能

安装延迟插件

下载地址

rabbitmq-delayed-message-exchange 插件可到这里下载: RabbitMQ 延迟插件

也可以到github上下载 : RabbitMQ Delayed Message Plugin

(注意插件版本, 这个插件适应的版本时 3.5.8 及其以后的版本)

安装

登录 Linux 服务器, 将插件复制到这个路径下: /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins/

然后执行以下指令:

代码语言:javascript复制
# 开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 重启 rabbitmq
/sbin/service rabbitmq-server restart

# 查看插件是否安装成功
sudo rabbitmq-plugins list

Java 代码实现

代码语言:javascript复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;    
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        TopicExchange exchange = new TopicExchange("exchange.delay");
        // 交换器设置延迟属性
        exchange.setDelayed(true);				                  
        rabbitAdmin.declareQueue(new Queue("queue.delay"));
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.delay")).to(exchange).with("rountingkey.delay");
        return rabbitAdmin;
    }
}

 // 消息发送器                                  
public class SendMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // 发送消息
    public void sendMessage(String exchange, String routingKey, String msg, Integer expirationTime) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
            // 给每条消息设置过期时间
            message.getMessageProperties().setExpiration(expirationTime);
            return message;
        });
    }
}

// 消息监听器, 交换器 delayed = "true"
@Component
@RabbitListener(containerFactory = "listenerContainerFactory",
    bindings = @QueueBinding(value = @Queue(value = "queue.delay"),
        exchange = @Exchange(value = "exchange.delay", type = ExchangeTypes.TOPIC, delayed = "true"),
        key = "rountingkey.delay"))
@Slf4j
public class MsgListener {

    @RabbitHandler
    public void msgHandler(String msg) {
        log.info("接收到的延迟消息 [{}]",msg)
    }
}                         

0 人点赞