rabbitmq死信队列详解与使用

2022-04-27 15:38:47 浏览数 (1)

什么是死信队列

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

以上是个人的通俗解释,专业术语解释的比较正规点大家可以参考,主要想搞清楚这个概念,不同的消息中间件大概都有自身对于死信或者死信队列的处理方式,下面重点要说说rabbitmq的死信队列

RabbitMQ的死信队列

对rabbitmq来说,产生死信的来源大致有如下几种:

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

死信的处理方式

  • 死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种, 1丢弃,如果不是很重要,可以选择丢弃 2记录死信入库,然后做后续的业务分析或处理 3通过死信队列,由负责监听死信的应用程序进行处理
  • 综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理,关于这一点,也是本篇要重点讲述的,下面将用代码演示一下死信的产生及路由,即上面提到的三种方式,网上可供参考的资料比较多,但大多不全面,下面提供比较完整的demo,将各种场景的产生和过程进行列举,

方式1:消息超时进入死信队列

这是一种在实际生产中应用场景比较多的一种方式,比如我们熟知的订单业务场景,当用户购买商品产生了一个订单的时候,可以设置过期时间,如果在这段时间内,消息还没有被消费,将会被路由到死信队列,专业术语来讲,即消息的TTL,TTL过期了消息将进入死信队列,下面是一段演示代码,这里包括两部分,生产者和消费者。

rabbitmq的死信队列设置主要在参数argument中做配置,这里需要设置的有 x-dead-letter-exchange 和 x-message-ttl

producer代码, 此处模拟生产者产生订单,推送到队列中,消息有效时间是10S,过了10S如果没有被消费将会被路由到死信队列

代码语言:javascript复制
public static void main(String[] args) throws Exception{

        final Channel channel = RabbitUtil.getChannel();

        String orderExchangeName = "order_exchange";
        String orderQueueName = "order_queue";
        String orderRoutingKey = "order.#";
        Map<String, Object> arguments = new HashMap<String, Object>(16);

        //死信队列配置  ----------------
        String dlxExchangeName = "dlx.exchange";
        String dlxQueueName = "dlx.queue";
        String dlxRoutingKey = "#";

        // 为队列设置队列交换器
        arguments.put("x-dead-letter-exchange",dlxExchangeName);
        // 设置队列中的消息 10s 钟后过期
        arguments.put("x-message-ttl", 10000);

        //正常的队列绑定
        channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);
        channel.queueDeclare(orderQueueName, true, false, false, arguments);
        channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);

        String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())   " 创建订单.";

        // 创建死信交换器和队列
        channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
        channel.queueDeclare(dlxQueueName, true, false, false, null);
        channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);

        channel.basicPublish(orderExchangeName, "order.save", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

        System.err.println("消息发送完成......");
    }

consumer代码,消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了

代码语言:javascript复制
public class Consumer {

    //消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了
    private static final String QUEUE_NAME = "dlx.queue";

    public static void main(String[] args) throws Exception{
        // 创建信道
        final Channel channel = RabbitUtil.getChannel();

        System.out.println("消费者启动 ..........");

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("死信队列接收到消息:"   new String(body));
                System.err.println("deliveryTag:"   envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME, consumer);
        TimeUnit.SECONDS.sleep(10000000L);
    }

}

然后我们分别运行两端的代码,这里提示一下,我们并没有提前在控制台去创建queue 和 exchange,这个在producer启动或者consumer启动的时候,如果没有创建过会自动创建以及建立queue和exchange的绑定关系,

启动producer,消息发送成功,同时可以通过控制台看到,exhange和相关的队列也帮我们创建了,要注意的是在dlx.queue中,有一个消息就绪,很明显,消息过了10S中没有任何消费者消费,就被路由到了死信队列dlx.queue中,

启动consumer,通过控制台打印结果,可以看到,由于消费端监听的是死信队列,已经从dlx.queue中成功获取到了这条信息,

2、消息被拒绝,且requeue=false

没有细致研究过这个问题的可能会有点儿懵,其实就是在consumer端,当消费者要过滤某些消息的时候,那部分被过滤掉的消息如果不设置退回,即上一篇所讲的消息重回队列的话,这些消息就变成了死信,即在下面的代码中第三个参数设置成false即可,下面来看具体的代码,

channel.basicNack(envelope.getDeliveryTag(),false,false);

有这样一个场景,一批消息中,当消费端从header中收到了num=0的消息将会被过滤掉,并且设置如上requeue=false,下面看具体的代码,

peoducer端代码,

代码语言:javascript复制
/**
 * 生产者
 * 死信队列使用
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        Channel channel = RabbitUtil.getChannel();
        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.save";

        //通过在properties设置来标识消息的相关属性
        for(int i=0;i<5;i  ){
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num",i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)                    // 传送方式 2:持久化投递
                    .contentEncoding("UTF-8")           // 编码方式
                    //.expiration("10000")              // 过期时间
                    .headers(headers)                  //自定义属性
                    .build();
            String message = "hello this is ack message ....."    i;
            System.out.println(message);
            channel.basicPublish(exchangeName,routingKey,true,properties,message.getBytes());
        }

    }


}

consumer端代码,

代码语言:javascript复制
public class Consumer {

    public static void main(String[] args) throws Exception{

        final Channel channel = RabbitUtil.getChannel();
        String exchangeName = "test_ack_exchange";
        String exchangeType="topic";
        final String queueName = "test_ack_queue";
        String routingKey = "ack.#";

        //死信队列配置  ----------------
        String deadExchangeName = "dead_exchange";
        String deadQueueName = "dead_queue";
        String deadRoutingKey = "#";
        //死信队列配置  ----------------

        //如果需要将死信消息路由
        Map<String,Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange",deadExchangeName);

        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        channel.queueDeclare(queueName,false,false,false,arguments);
        channel.queueBind(queueName,exchangeName,routingKey);

        //死信队列绑定配置  ----------------
        channel.exchangeDeclare(deadExchangeName,exchangeType,true,false,false,null);
        channel.queueDeclare(deadQueueName,true,false,false,null);
        channel.queueBind(deadQueueName,deadExchangeName,deadRoutingKey);
        //死信队列配置  ----------------

        System.out.println("consumer启动 .....");

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try{
                    Thread.sleep(2000);
                }catch (Exception e){

                }
                Integer num = (Integer)properties.getHeaders().get("num");
                if(num==0){
                    //未被ack的消息,并且requeue=false。即nack的 消息不再被退回队列而成为死信队列
                    channel.basicNack(envelope.getDeliveryTag(),false,false);
                    String message = new String(body, "UTF-8");
                    System.out.println("consumer端的Nack消息是: "   message);
                }else {
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    String message = new String(body, "UTF-8");
                    System.out.println("consumer端的ack消息是: "   message);
                }
            }
        };
        //消息要能重回队列,需要设置autoAck的属性为false,即在回调函数中进行手动签收
        channel.basicConsume(queueName,false,consumer);
    }
}

要关注的即下面的这处代码和第三个参数,

启动生产者和消费者

启动生产者,生产者成功发送5条消息

再看消费端的控制台,这里num=0的这条消息由于设置了死信队列而不会重回原来的队列,在上一篇中,当参数设置成了true的时候,看到控制台一直会打印一条消息,

同时,通过控制台也可以发现,在dead_queue中,有一条消息为就绪状态了,即死信消息,但这里并没有对这条消息做处理,目前一直存在队列里面,可以根据实际应用做后续的处理,

3、队列达到最大长度

这个很好理解,比如我们设置某个队列的最大可承载消息的数量是100个,超出第100个的消息将会被路由到死信队列中,设置消息队列的最大数量也是实际生产中作为队列限流的一种常规手段,具有实际的业务意义,下面是代码演示,基本设置和上述的TTL类似,只是在参数中将TTL更换为如下配置,

arguments.put("x-max-length",3);

生产者代码,这里我们设定order_queue这个队列的容量是5个,但是我们在程序中设置的x-max-length=3,那么按照这个猜想,将会有两个消息被路由到死信队列

代码语言:javascript复制
public class Producer {

    public static void main(String[] args) throws Exception{

        final Channel channel = RabbitUtil.getChannel();
        String orderExchangeName = "order_exchange";
        String orderQueueName = "order_queue";
        String orderRoutingKey = "order.#";
        Map<String, Object> arguments = new HashMap<String, Object>(16);

        //死信队列配置  ----------------
        String dlxExchangeName = "dlx.exchange";
        String dlxQueueName = "dlx.queue";
        String dlxRoutingKey = "#";

        // 为队列设置队列交换器
        arguments.put("x-dead-letter-exchange",dlxExchangeName);
        // 设置队列中的消息 10s 钟后过期
        //arguments.put("x-message-ttl", 10000);
        arguments.put("x-max-length",3);

        //正常的队列绑定
        channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);
        channel.queueDeclare(orderQueueName, true, false, false, arguments);
        channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);

        String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())   " 创建订单.";

        // 创建死信交换器和队列
        channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
        channel.queueDeclare(dlxQueueName, true, false, false, null);
        channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);

        for(int i=0;i<5;i  ){
            message = message   "========> "   i ;
            System.out.println("发送的消息是:"   message);
            channel.basicPublish(orderExchangeName, "order.save",null, message.getBytes());
        }

        System.err.println("消息发送完成......");
    }

}

消费者代码

代码语言:javascript复制
public class Consumer {

    private static final String QUEUE_NAME = "order_queue";

    public static void main(String[] args) throws Exception{
        // 创建信道
        final Channel channel = RabbitUtil.getChannel();
        // 消费端消息限流。
        // 设置客户端最多接收未被ack的消息个数, 只有消息 手动签收  此参数才会生效。
        //channel.basicQos(1);

        System.out.println("消费者启动 ..........");

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("死信队列接收到消息:"   new String(body));
                System.err.println("deliveryTag:"   envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false, consumer);
        //TimeUnit.SECONDS.sleep(10000000L);
    }

}

启动生产者,5条消息发送完毕

再启动消费端,通过控制台可以看到,消费端只从order_queue中消费了3条消息,还剩2条消息去哪里了呢?

我们再回到控制台观察一下,发现在dlx.queue这个死信队列中有两条就绪的消息,即剩下的2条消息被路由到了死信队列了

以上便是关于死信队列常见的3种方式的处理程序和逻辑

0 人点赞