RabbitMQ之死信队列解读

2023-10-12 07:46:46 浏览数 (1)

基本介绍

什么是死信交换机

在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。

什么是死信队列

死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已

RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。

要注意的是,DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。当这个队列存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

消息进入到死信队列的情况

消息过期

代码语言:javascript复制
MessageProperties messageProperties=new MessageProperties();
//设置此条消息的过期时间为10秒
messageProperties.setExpiration("10000");

队列过期

代码语言:javascript复制
 Map<String, Object> arguments =new HashMap<>();
 //指定死信交换机,通过x-dead-letter-exchange 来设置
 arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);
 //设置死信路由key,value 为死信交换机和死信队列绑定的key
 arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);
 //队列的过期时间
 arguments.put("x-message-ttl",10000);
return  new Queue(QUEUE_NORMAL,true,false,false,arguments);

TTL: Time to Live的简称,过期时间

队列达到最大长度(先入队的消息会被发送到DLX)

代码语言:javascript复制
Map<String, Object> arguments = new HashMap<String, Object>();
//设置队列的最大长度 ,对头的消息会被挤出变成死信
arguments.put("x-max-length", 5);

消费者拒绝消息不进行重新投递

从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。

application.yml 启动手动确认

代码语言:javascript复制
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
代码语言:javascript复制
 /**
     * 监听正常的那个队列的名字,不是监听那个死信队列
     * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
     *
     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
     */
    @RabbitListener(queues = {RabbitConfig.QUEUE})
    public void process(Message message, Channel channel) {
        System.out.println("接收到的消息:"   message);
        //对消息不确认, ack单词是 确认 的意思
       
        try {
            System.out.println("deliveryTag = "   message.getMessageProperties().getDeliveryTag());
            //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:消息的一个数字标签
  • multiple:翻译成中文是多个的意思,如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack
  • requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列

消费者拒绝消息

开启手动确认模式,并拒绝消息,不重新投递,则进入死信队列

代码语言:javascript复制
  /**
     * 监听正常的那个队列的名字,不是监听那个死信队列
     * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
     *
     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
     */
    @RabbitListener(queues = {RabbitConfig.QUEUE})
    public void process(Message message, Channel channel) {
        System.out.println("接收到的消息:"   message);
        try {
            System.out.println("deliveryTag = "   message.getMessageProperties().getDeliveryTag());
            //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

  • basicReject是否定的交付,一般在消费消息时出现异常等的时候执行。可以将该消息丢弃或重排序去重新处理消息
  • 参数1: 消费消息的index
  • 参数2: 对异常消息的处理,true表示重排序,false表示丢弃
  • Reject 在拒绝消息时,可以使用 requeue 标识,告诉 RabbitMQ 是否需要重新发送给别的消费者。如果是 false 则不重新发送,一般这个消息就会被RabbitMQ 丢弃。Reject 一次只能拒绝一条消息。如果是 true 则消息发生了重新投递。
  • Nack 跟 Reject 类似,只是它可以一次性拒绝多个消息。也可以使用 requeue 标识,这是 RabbitMQ 对 AMQP 规范的一个扩展。
  • 通过 RejectRequeuConsumer 可以看到无论是使用 Reject 方式还是 Nack 方式,当 requeue
  • 参数设置为 true 时,消息发生了重新投递。当 requeue 参数设置为 false 时,消息丢失了。

springboot代码实战

实战架构

​编辑

如上图,消息到达正常的交换机exchange.nomal.a,通过与正常的队列queue.noaml.a绑定,消息会到达正常队列,如果消息变为死消息以后则会转发到与正常队列绑定的死信交换机中,死信交换机会转发到与其绑定的死信队列queue.deal.a。

工程概述

工程采用springboot架构,主要用到的依赖为:

代码语言:javascript复制
<!--        rabbit的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

application.yml配置文件如下:

代码语言:javascript复制
server:
  port: 8080
spring:
  rabbitmq:
    host: 123.249.70.148
    port: 5673
    username: admin
    password: 123456
    virtual-host: /

RabbitConfigDeal 配置类:创建队列及交换机并进行绑定

代码语言:javascript复制
@Configuration
public class RabbitConfigDeal {


}

创建正常交换机

代码语言:javascript复制
   @Bean
    public DirectExchange normalExchange(){
        return ExchangeBuilder.directExchange("exchange.normal.a").build();
    }

创建死信交换机

代码语言:javascript复制
    @Bean
    public DirectExchange deadExchange(){
        return ExchangeBuilder.directExchange("exchange.dead.a").build();
    }

创建死信队列

代码语言:javascript复制
    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable("queue.dead.a").build();
    }

创建正常队列,设置他的绑定死信交换机,以及对应绑定的路由key为order

代码语言:javascript复制
    @Bean
    public Queue normalQueue(){
        Map<String, Object> arguments =new HashMap<>();
        arguments.put("x-message-ttl",20000);
        arguments.put("x-dead-letter-exchange","exchange.dead.a");
        arguments.put("x-dead-letter-routing-key","order");
        return QueueBuilder.durable("queue.normal.a")
                .withArguments(arguments).build();
    }

绑定正常交换机和正常队列

代码语言:javascript复制
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
    }

绑定死信交换机和死信队列

代码语言:javascript复制
    @Bean
    public Binding bindingDeal(DirectExchange deadExchange,Queue deadQueue){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with("order");
    }

MessageService业务类:发送消息及接收消息

代码语言:javascript复制
@Component
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

}

发送消息方法

代码语言:javascript复制
    public void sendMsg(){
        //添加消息属性
        Message message = MessageBuilder.withBody("hello word!".getBytes(StandardCharsets.UTF_8))
               .build();
        rabbitTemplate.convertAndSend("exchange.normal.a","order",message);
        log.info("发送消息时间:{}",new Date());
    }

这里用的路由key为info

MessageConvert

  • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
  • RabbitMQ 的序列化是指 Messagebody 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter默认)、Jackson2JsonMessageConverter

接受消息

代码语言:javascript复制
    @RabbitListener(queues = {"queue.dead.a"})
    public void receiveMsg(Message message){
        byte[] body = message.getBody();
        String queue = message.getMessageProperties().getConsumerQueue();
        String msg=new String(body);
        log.info("{}接收到消息时间:{},消息为{}",queue,new Date(),msg);
    }

Message 在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

  1. MessageProperties // 消息属性
  2. byte[] body // 消息内容

@RabbitListener 使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

  • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)
  • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
    • application/octet-stream:二进制字节数组存储,使用 byte[]
    • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
    • text/plain:文本数据类型存储,使用 String
    • application/json:JSON 格式,使用 Object、相应类型

主启动类RabbitMq01Application:实现ApplicationRunner接口

代码语言:javascript复制
/**
 * @author 风轻云淡
 */
@SpringBootApplication
public class RabbitMq01Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMq01Application.class, args);
    }

    @Resource
    private MessageService messageService;

    /**
     * 程序一启动就会调用该方法
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.sendMsg();

    }
}

在SpringBoot中,提供了一个接口:ApplicationRunner。 该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。 由于该方法是在容器启动完成之后,才执行的,所以,这里可以从spring容器中拿到其他已经注入的bean。

启动主启动类后查看控制台:

代码语言:javascript复制
2023-09-28 10:46:17.772  INFO 71700 --- [           main]
 c.e.rabbitmq01.service.MessageService    :
 发送消息时间:Thu Sep 28 10:46:17 CST 2023
2023-09-28 10:46:37.824  INFO 71700 --- [ntContainer#0-1] 
c.e.rabbitmq01.service.MessageService    : 
queue.dead.a接收到消息时间:Thu Sep 28 10:46:37 CST 2023,消息为hello word!

我们在这里可以看见17s的时候发送了消息,在经过了20s,即到37s的时候我们在死信队列queue.dead.a接受到了消息。

​​​​​我正在参与2023腾讯技术创作特训营第二期有奖征文,瓜分万元奖池和键盘手表

0 人点赞