RabbitMQ死信队列机制(七)

2022-03-29 16:05:37 浏览数 (1)

RabbitMQ的重回队列解决了RabbitMQ由于异常情况导致消息收不到的原因,但是一般在企业不怎么实用重回队列,更多使用的是死信队列的机制,这样来保障消费端能够接收到具体的消息,其实本质上都是为了消息消费者这层的可靠性的保障机制。

一、什么是死信队列

死信队列全名称是Dead Letter Exchange,所以私信队列简称是DLX,当生产者发送一个消息后,消费端未接收到,那么这个消息就会到死信队列中来保障消息的消费。在RabbitMQ的消息消费的处理机制中,当队列中存在死信时,RabbitMQ就会自动的切换到重新发布的Exchange中,从而到新的队列机制来保障消费者这边消费数据。

二、死信队列使用场景

一般而言写代码都是需要私信队列的情况,那么到底是什么情况是需要考虑死信队列的情况了?主要是如下几个场景中需要考虑死信队列的情况,具体汇总如下:

  • 消费端需要接收的消息被拒绝接收(这种拒绝是非资源的一种行为)
  • 消息的生存时间在过期的情况下
  • 队列达到最大的长度的情况下

三、死信队列使用方式

在使用死信队列的时候,需要做的就是设置死信队列的Exchange和Queue,其实死信队列可以更加简单的一种方式就是本应该正常的情况下发送消息到A中,当然这过程中A的Exchange和Queue是没有任何的问题的,只不过在消息发送的过程中,可能由于A的消息队列达到最大,或者是TTL过期,以及消费端被拒绝接收消息,那么在这种情况下,就会把消费端需要接收的消息切换到B中,那么这个B就是死信队列,当然这中间死信队列的Exchange和Queue它的映射关系是没有任何的问题。

四、死信队列案例实战

4.1、生产者案例代码

代码语言:javascript复制
package com.example.rabbitmq.dlx;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ProducerDlx
{
    private  static  final  String exchangeName="test_dlx_exchange";
    private  static  final  String routyKey="dlx.save";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String msg = "Hello RabbitMQ  Message";

        for(int i=0;i<3;i  )
        {

            AMQP.BasicProperties properties= new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("10000")
                    .build();

            channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
        }

    }
}

4.2、消费者案例代码

代码语言:javascript复制
package com.example.rabbitmq.dlx;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ConsumerDlx
{
    private static final String EXCHANGE = "test_dlx_exchange";
    private  static  final String queueName="test_dlx_queue";
    private  static  final  String routingKey="dlx.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();
            channel.exchangeDeclare(EXCHANGE,"topic",true,false,null);

            Map<String,Object> arguments=new HashMap<String,Object>();

            //要进行死信队列的申明
            arguments.put("x-dead-letter-exchange","dlx.exchange");

            //这个arguments属性,要设置到申明队列上
            channel.queueDeclare(queueName,true,false,false,arguments);
            channel.queueBind(queueName,EXCHANGE,routingKey);
            channel.exchangeDeclare("dlx.exchange","topic",true,false,null);
            channel.queueDeclare("dlx.queue",true,false,false,null);
            channel.queueBind("dlx.queue","dlx.exchange","#"); 
            channel.basicConsume(queueName,true,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

在使用死信队列的机制中,我们一定得申明死信队列的机制,也就是代码:

代码语言:javascript复制
arguments.put("x-dead-letter-exchange","dlx.exchange");

那么正常的情况下,本应该是把消息发送到test_dlx_exchange的Exchange中,然后发送到test_dlx_queue的队列中,但是这仅仅是正常的情况,在异常的情况下,就会把消息发送到死信队列了,也就是发送到到新的Exchange,该Exchange就是申明的dlx.exchange,此时消息也会发送到dlx.queue的队列中。

4.3、MyConsumer类代码

代码语言:javascript复制
package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------n");
        System.err.println("consumerTag:" consumerTag);
        System.err.println("envelope:" envelope);
        System.err.println("properties:" properties);
        System.err.println("the message received:" new String(body));

    }
}

4.4、死信队列演示

在如上的截图信息中,本来正常的情况下是会把消息给队列test_dlx_queue,我们也可以看到它是死信队列,但由于出现了异常的情况,最后我们可以看到消息从队列test_dlx_queue切换到dlx.queue,这样就能够即使在异常的情况下消息依然能够接收到。

0 人点赞