Rabbitmq 通过死信队列实现延迟消息发送
文章目录
- 设置消息的过期时间(TTL)
- 两种方法设置 TTL
- Java 代码实现
- 给队列设置 TTL
- 给每一个消息单独设置 TTL
- 死信队列
- 延迟队列
- 实现方法
- Java 代码
- 缺点
设置消息的过期时间(TTL)
TTL, Time to Live 的简称, 即过期时间.
两种方法设置 TTL
- 通过队列属性设置. 即队列中所有的消息都有相同的过期时间. 在 channel.queueDeclare 方法中加入 x-message-ttl 参数实现, 单位是毫秒
- 对消息本身进行单独设置. 即每条消息的 TTL 可以不同. 在 channel.basicPublish 方法中加入 expiration 属性, 单位是毫秒 总结: 如果两种方法一起使用, 则以较小的那个 TTL 为准. 消息过期后, 消费者无法再接收该消息, 就会变成死信(Dead Message). 这个特性可以实现延迟队列功能
Java 代码实现
给队列设置 TTL
代码语言:javascript复制@Configuration
public class RabbitConfig implements ApplicationContextAware {
private ApplicationContext applicationContext;
@PostConstruct
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
rabbitAdmin.declareExchange(new TopicExchange("exchange.expiration"));
Map<String, Object> argMap = new HashMap<>();
argMap.put("x-message-ttl", 2000);// 2s 过期
Queue queue = new Queue("queue.expiration", true, false, false, argMap);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(
BindingBuilder.bind(queue).to(new TopicExchange("exchange.expiration")).with("routingkey.expiration"));
return rabbitAdmin;
}
}
@Service
public class SendMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String msg) {
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
}
给每一个消息单独设置 TTL
代码语言:javascript复制@Configuration
public class RabbitConfig implements ApplicationContextAware {
private ApplicationContext applicationContext;
@PostConstruct
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
rabbitAdmin.declareExchange(new TopicExchange("exchange.expiration"));
rabbitAdmin.declareQueue(new Queue("queue.expiration"));
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.expiration"))
.to(new TopicExchange("exchange.expiration")).with("routingkey.expiration"));
return rabbitAdmin;
}
}
@Service
public class SendMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String msg, Integer expirationTime) {
rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
message.getMessageProperties().setExpiration(expirationTime);
return message;
});
}
}
死信队列
DLX(Dead-Letter-Exchange), 可以称之为死信交换器, 也成死信信箱. 当消息在一个队列中变成死信(dead message) 后, 会被重新发送到另外一个交换器中, 这个交换器就是 DLX. 绑定了 DLX 的队列就是死信队列. 说白了就是, 有两个队列, 一个队列上的消息设置了过期时间, 但没有消费者. 另一个队列是普通队列, 有消费者. 后者被称为死信队列. 当前一个队列消息过期后, Rabbitmq 会自动将过期消息转发到死信队列里. 然后被死信队列的消费者消费掉. 实现消息的延迟发送功能
延迟队列
延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行
实现方法
通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数为这个队列添加 DLX
Java 代码
代码语言:javascript复制@Configuration
public class RabbitConfig implements ApplicationContextAware {
private ApplicationContext applicationContext;
@PostConstruct
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
// 死信队列
rabbitAdmin.declareExchange(new DirectExchange("exchange.dlx"));
rabbitAdmin.declareQueue(new Queue("queue.dlx"));
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue("queue.dlx")).to(new DirectExchange("exchange.dlx")).with("routingkey.dlx"));
// 延迟队列
Map<String, Object> argMap = new HashMap<>(3);
argMap.put("x-message-ttl", 2000);// 2s 过期
argMap.put("x-dead-letter-exchange", "exchange.dlx");
argMap.put("x-dead-letter-routing-key", "routingkey.dlx");
rabbitAdmin.declareQueue(new Queue("queue.normal", true, false, false, argMap));
rabbitAdmin.declareExchange(new TopicExchange("exchange.normal"));
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.normal", true, false, false, argMap))
.to(new TopicExchange("exchange.normal")).with("queue.normal"));
}
}
缺点
使用死信队列来实现消息的延迟发送. 如果是采用第一种方式, 即每个队列设置相同的过期时间, 可以很好的实现消息的延迟发送功能. 如果采用第二种方式, 给每个消息设置不同的过期时间, 由于队列先入先出的特性, 如果队列头的消息过期时间很长, 后面的消息过期时间很短, 会导致后面的消息过期后不能及时被消费掉
简单的做法时, 使用 rabbitmq 的延迟插件: Rabbitmq 通过延迟插件实现延迟队列