RabbitMQ死信队列在SpringBoot中的使用

2020-06-28 10:44:30 浏览数 (1)

死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃。# 概念:

  • 消息会变成死信消息的场景:
    1. 消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝签收,并且重新入队为false。 1.1 有一种场景需要注意下:消费者设置了自动ACK,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了nack/reject
    2. 消息过期,过了TTL存活时间。
    3. 队列设置了x-max-length最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。
  • 代码编写流程是:
    1. 有一个(n个)正常业务的Exchange,比如为user-exchange
    2. 有一个(n个)正常业务的Queue,比如为user-queue。(因为该队列需要绑定死信交换机,所以需要加俩参数:死信交换机:x-dead-letter-exchange,死信消息路由键:x-dead-letter-routing-key
    3. 进行正常业务的交换机和队列绑定。
    4. 定义一个死信交换机,比如为common-dead-letter-exchange
    5. 将正常业务的队列绑定到死信交换机(队列设置了x-dead-letter-exchange即会自动绑定)。
    6. 定义死信队列user-dead-letter-queue用于接收死信消息,绑定死信交换机。
  • 业务流程是:
    1. 正常业务消息被投递到正常业务的Exchange,该Exchange根据路由键将消息路由到绑定的正常队列。
    2. 正常业务队列中的消息变成了死信消息之后,会被自动投递到该队列绑定的死信交换机上(并带上配置的路由键,如果没有指定死信消息的路由键,则默认继承该消息在正常业务时设定的路由键)。
    3. 死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列。
    4. 消息到达死信队列后,可监听该死信队列,处理死信消息。
  • 死信交换机死信队列也是普通的交换机和队列,只不过是我们人为的将某个交换机和队列来处理死信消息。
  • 流程图
图片.png图片.png

# 代码实现

  1. 配置spring: application: name: learn-rabbitmq rabbitmq: host: localhost port: 5672 username: futao password: 123456789 virtual-host: deadletter-vh connection-timeout: 15000 # 发送确认 publisher-confirms: true # 路由失败回调 publisher-returns: true template: # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃 mandatory: true listener: simple: # 每次从RabbitMQ获取的消息数量 prefetch: 1 default-requeue-rejected: false # 每个队列启动的消费者数量 concurrency: 1 # 每个队列最大的消费者数量 max-concurrency: 1 # 签收模式为手动签收-那么需要在代码中手动ACK acknowledge-mode: manual app: rabbitmq: # 队列定义 queue: # 正常业务队列 user: user-queue # 死信队列 user-dead-letter: user-dead-letter-queue # 交换机定义 exchange: # 正常业务交换机 user: user-exchange # 死信交换机 common-dead-letter: common-dead-letter-exchange
  2. 队列、交换机定义与绑定。/** * 队列与交换机定义与绑定 * * @author futao * @date 2020/4/7. */ @Configuration public class Declare { /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") .build(); } /** * 用户交换机 * * @param userExchangeName 用户交换机名 * @return */ @Bean public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) { return ExchangeBuilder .topicExchange(userExchangeName) .durable(true) .build(); } /** * 用户队列与交换机绑定 * * @param userQueue 用户队列名 * @param userExchange 用户交换机名 * @return */ @Bean public Binding userBinding(Queue userQueue, Exchange userExchange) { return BindingBuilder .bind(userQueue) .to(userExchange) .with("user.*") .noargs(); } /** * 死信交换机 * * @param commonDeadLetterExchange 通用死信交换机名 * @return */ @Bean public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return ExchangeBuilder .topicExchange(commonDeadLetterExchange) .durable(true) .build(); } /** * 用户队列的死信消息 路由的队列 * 用户队列user-queue的死信投递到死信交换机`common-dead-letter-exchange`后再投递到该队列 * 用这个队列来接收user-queue的死信消息 * * @return */ @Bean public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) { return QueueBuilder .durable(userDeadLetterQueue) .build(); } /** * 死信队列绑定死信交换机 * * @param userDeadLetterQueue user-queue对应的死信队列 * @param commonDeadLetterExchange 通用死信交换机 * @return */ @Bean public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) { return BindingBuilder .bind(userDeadLetterQueue) .to(commonDeadLetterExchange) .with("user-dead-letter-routing-key") .noargs(); } }
  • 定义好之后启动程序,springboot会读取Spring容器中类型为Queue和Exchange的bean进行队列和交换机的初始化与绑定。当然也可以自己在RabbitMQ的管理后台进行手动创建与绑定。
  • 查看管理后台 交换机交换机 队列队列 队列与死信交换机队列与死信交换机

# 测试

  • 消息生产者/** * @author futao * @date 2020/4/7. */ @Component public class DeadLetterSender { @Autowired private RabbitTemplate rabbitTemplate; @Value("${app.rabbitmq.exchange.user}") private String userExchange; public void send() { User user = User.builder() .userName("天文") .address("浙江杭州") .birthday(LocalDate.now(ZoneOffset.ofHours(8))) .build(); rabbitTemplate.convertAndSend(userExchange, "user.abc", user); } }1. 场景1.1消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝或者nack,并且重新入队为false。

nack()与reject()的区别是:reject()不支持批量拒绝,而nack()可以.

  • 消费者代码/** * @author futao * @date 2020/4/9. */ @Slf4j @Component public class Consumer { /** * 正常用户队列消息监听消费者 * * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user, Message message, Channel channel) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); try { //参数为:消息的DeliveryTag,是否批量拒绝,是否重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.info("拒绝签收...消息的路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("消息拒绝签收失败", e); } } /** * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user, Message message, Channel channel) { log.info("接收到死信消息:[{}]", JSON.toJSONString(user)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("死信队列签收消息....消息路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("死信队列消息签收失败", e); } } }
  • 可以看到,正常消息被NACK之后最后到了死信队列,且路由键发生了变化。 死信消息死信消息 1. 场景1.2消费者设置了自动签收,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了nack/reject
  • application.yml中需要更改一些配置spring: application: name: learn-rabbitmq rabbitmq: listener: simple: # 每次从RabbitMQ获取的消息数量 prefetch: 1 default-requeue-rejected: false # 每个队列启动的消费者数量 concurrency: 1 # 每个队列最大的消费者数量 max-concurrency: 1 # 自动签收 acknowledge-mode: auto retry: enabled: true # 第一次尝试时间间隔 initial-interval: 10S # 两次尝试之间的最长持续时间。 max-interval: 10S # 最大重试次数(=第一次正常投递1 重试次数4) max-attempts: 5 # 上一次重试时间的乘数 multiplier: 1.0
  • 消费者代码/** * @author futao * @date 2020/4/9. */ @Slf4j @Configuration public class AutoAckConsumer { /** * 正常用户队列消息监听消费者 * * @param user */ @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); throw new RuntimeException("模拟发生异常"); } /** * @param user */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user) { log.info("接收到死信消息并自动签收:[{}]", JSON.toJSONString(user)); } }
  • 测试结果: image.pngimage.png image.pngimage.png
  • 从测试结果可以看出,消息如果未被正常消费,则进行重试,如果最终还未被正常消费,则会被投递到死信队列。

initial-interval,max-interval这两个参数啥作用不知道,现在测试的结果是一直都会取最短的那个时间作为下次投递时间...

2. 测试场景 2

消息过期,过了TTL存活时间。需要修改队列定义,设置队列消息的过期时间x-message-ttl. /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") //该队列的消息的过期时间-超过这个时间还未被消费则路由到死信队列 .withArgument("x-message-ttl", 5000) .build(); }把user-queue的消费者注释,使消息无法被消费,直到消息在队列中的时间达到设定的存活时间。 ttlttl 根据日志可以看到,消息在5S后会被投递到死信队列。 image.pngimage.png

  • 注意:可以给队列设置消息过期时间,那么所有投递到这个队列的消息都自动具有这个属性。还可以在消息投递之前,给每条消息设定指定的过期时间。(当两者都设置了,则默认取较短的值)

下面测试给每条消息设置指定的过期时间:

  • 修改消息生产者:/** * @author futao * @date 2020/4/7. */ @Slf4j @Component public class DeadLetterSender { @Autowired private RabbitTemplate rabbitTemplate; @Value("${app.rabbitmq.exchange.user}") private String userExchange; public void send(String exp) { User user = User.builder() .userName("天文") .address("浙江杭州") .birthday(LocalDate.now(ZoneOffset.ofHours(8))) .build(); log.info("消息投递...指定的存活时长为:[{}]ms", exp); rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties = message.getMessageProperties(); //为每条消息设定过期时间 messageProperties.setExpiration(exp); return message; } }); } }image.pngimage.png
  • 从测试结果可以看出,每条消息都在指定的时间投递到了死信队列。

【坑】重点注意!!!:RabbitMQ对于消息过期的检测:只会检测最近将要被消费的那条消息是否到达了过期时间,不会检测非末端消息是否过期。造成的问题是:非末端消息已经过期了,但是因为末端消息还未过期,非末端消息处于阻塞状态,所以非末端消息不会被检测到已经过期。使业务产生与预期严重不一致的结果。

  • 对上面的问题进行测试:(第一条消息的过期时间设置成10S,第二条消息设置成5S) image.pngimage.png
  • 从测试结果可以看出,id为1的消息存活时长为10S,id为2的消息存活时间为5S。但是只有当第一条消息(id=1)过期之后,id=2的消息到达队列末端,才会被检测到已经过期。

3. 测试场景3

队列设置了x-max-length最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。

  • 修改队列定义 /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") //队列最大消息数量 .withArgument("x-max-length", 2) .build(); }image.pngimage.png
  • 向队列中投递消息 image.pngimage.png
  • 从结果可以看出,当投递第3条消息的时候,RabbitMQ会把在最靠经被消费那一端的消息移出队列,并投递到死信队列。 image.pngimage.png 队列中将始终保持最多两个消息。

# 其他:

  • Queue的可配置项可在RabbitMQ的管理后台查看: image.pngimage.png
  • 源码:https://github.com/FutaoSmile/springboot-learn-integration/tree/master/springboot-learn-rabbitmq

# 相关:

[SpringBoot RabbitMQ实现消息可靠投递

](https://www.jianshu.com/p/432167bbe95f)

# TODO:

  • 消费端限流保护
  • 延迟队列

0 人点赞