9 total views, 3 views today
RabbitMQ 实现延迟队列,方法有两种。
第一种是安装延迟队列插件;
第二种就是利用死信队列的方式;
这里采用第二种方式。
rabbitmq 自身的一些概念,可以去网上或者书上获得。rabbitmq 延迟队列的实现原理,网上资料很多,简单盗图一张。
简单说明一下原理。
将消息发送到一个队列中去,消息自身有一个 TTL,即失效时间,如果到期还是为消费该消息,那么该消息就成为死信,将死信移到专门的死性队列,然后消费者只需要消费死信队列中的消息,变相的实现了延迟消息的功能。
代码语言:javascript复制基于 Springboot 的具体代码实现:
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件:
代码语言:javascript复制spring:
rabbitmq:
host: 111.22.1.2
port: 5672
username: user
password: passw
template:
mandatory: true
#支持发布确认与返回
publisher-confirms: true
publisher-returns: true
listener:
simple:
#是否自动开始监听消息队列
auto-startup: false
#手动应答
acknowledge-mode: manual
#监听容器数及最大数
concurrency: 1
max-concurrency: 1
#是否支持重试
retry:
enabled: true
配置文件 TopicRabbitConfig.Java
代码语言:javascript复制package com.mine.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TopicRabbitConfig {
//延迟队列的名字
public static final String delayQueueName = "delay_machine_check_queue";
//延迟队列的exchange名字
public static final String delayExchangeName = "delay_machine_check_exchange";
//死信exchange的名称
public static final String deadLetterProcessQueueName = "dead_letter_process_queue";
//死信exchange的名称
public static final String deadLetterProcessExchangeName = "dead_letter_machine_check_exchange";
public static final String routingKey = "machine_check";
//延迟队列的exchange
@Bean
public TopicExchange delayExchange() {
return new TopicExchange(delayExchangeName, true, false);
}
//死信队列的exchange
@Bean
public TopicExchange deadLetterProcessExchange() {
return new TopicExchange(deadLetterProcessExchangeName, true, false);
}
//延迟队列
@Bean
Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
//args.put("x-message-ttl", 20000);
args.put("x-dead-letter-exchange", deadLetterProcessExchangeName); //DLX,dead letter发送到的exchange
args.put("x-dead-letter-routing-key", routingKey);
return new Queue(delayQueueName, true, false, false, args);
}
//死信队列
@Bean
public Queue deadLetterProcessQueue() {
return new Queue(deadLetterProcessQueueName, true, false, false);
}
//绑定延迟队列,延迟Exchange,routing关系
@Bean
Binding bindingDelayExchange(Queue delayQueue, TopicExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(routingKey);
}
//绑定死信队列,死信Exchange,routing关系
//参数根据Spring命名约定的方式,会将上面的Queue实例和exchange实例注入进来,形成绑定关系
@Bean
Binding bindingDeadLetterProcessExchange(Queue deadLetterProcessQueue, TopicExchange deadLetterProcessExchange) {
return BindingBuilder.bind(deadLetterProcessQueue).to(deadLetterProcessExchange).with(routingKey);
}
}
监听代码:
代码语言:javascript复制//只监听听死信队列即可
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = TopicRabbitConfig.deadLetterProcessQueueName, durable = "true", autoDelete = "false"),
exchange = @Exchange(name = TopicRabbitConfig.deadLetterProcessExchangeName, durable = "true", type = "topic", autoDelete = "false"),
key = TopicRabbitConfig.routingKey), autoStartup = "true", id = "myconsumer")
public void receiveRabbitMQ(Message message, Channel channel) throws Exception {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
LogHelper.log4j2.info("收到消息:" new String(message.getBody()));
} catch (Exception ex) {
LogHelper.log4j2.error("receive", ex);
}
}
上面 2 段代码,实现的效果就是如下:
发送消息的代码:RabbitMQTopicSender.java
代码语言:javascript复制package com.mine.utils;
import com.mine.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
public class RabbitMQTopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String message, Long delaySecond) {
/***
* 方法参数说明
* https://docs.spring.io/spring-amqp/docs/latest_ga/api/org/springframework/amqp/rabbit/core/RabbitTemplate.html
convertAndSend(String exchange, String routingKey, Object object)
Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
***/
message = message "【" LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) "】";
LogHelper.log4j2.info("message:" message);
MessageProperties props = new MessageProperties();
props.setExpiration(Long.toString(delaySecond * 1000));//消息的延迟时间
Message ttlMessage = new Message(message.getBytes(), props);
rabbitTemplate.convertAndSend(TopicRabbitConfig.delayExchangeName, TopicRabbitConfig.routingKey, ttlMessage);
LogHelper.log4j2.info("消息发送成功");
}
}
至此调用 send 方法,即可发送延迟队列。
以上代码亲测有效。
原创文章,转载请注明出处!https://cloud.tencent.com/developer/article/1618510