RabbitMQ生产者Confirm消息保障(四)

2022-03-29 16:02:59 浏览数 (1)

在RabbitMQ生产者Confirm消息中介绍了RabbitMQ生产者端的消息确认的机制,也就是在生产者端把消息发送成功后进行消息的应答机制,但是如果生产者端发送的消息根本没有发送成功了?那么针对这种情况也是需要一种对应的解决方案来进行处理。针对这种特殊的情况RabbitMQ提供了Return消息保障的机制。

一、什么是Return消息保障

当消息生产者端把消息发送到Exchange和RoutingKey的时候,然后Exchange与Queue会形成一定的映射机制来消息发送的消息,这是一个正常的MQ生产消费的机制。但是在某些特殊的情况下,生产者发送的消息到Exchnage,但是Exchange不存在,还有一种情况是RoutingKey路由不到,导致生产者发送的消息无法让消费者来进行消费,针对这种情况生产者需要监听不可达的消息机制,也就是ReturnListener。在RabbitMQ消息队列服务器中,专门有一个配置来解决这种情况,具体配置的参数就是mandatory,它是一个boolean,如果配置的是true,那么意味着监听器会接收到路由不可达的消息,然后进行后续逻辑上的处理,但是如果配置参数是false,那么MQ的Broker就会自动删除该MQ的message。

二、Return消息机制流程

通过如下交互图来呈现Return消息保障的机制,具体如下:

三、Return代码实现

3.1、生产者代码

代码语言:javascript复制
package com.example.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ProducerReturn
{
    private  static  final  String exchangeName="";
    private  static  final  String routyKey="return.save";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();


        String msg = "producer message return listener";
        channel.basicPublish(exchangeName, routyKey, null, msg.getBytes());

        /*
        * 添加发送消息前进行确认的机制,ReturnListener里面会提供一个接口的机制来进行保障
        * */
        channel.addReturnListener(new ReturnListener()
        {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println("-----------------handle return---------------------");
                System.out.println("replyCode:" replyCode);
                System.out.println("replyText:" replyText);
                System.out.println("exchange:" exchange);
                System.out.println("routingKey:" routingKey);
                System.out.println("properties:" properties);
                System.out.println("body:" new String(body));
            }
        });

        //使用mandtory的方式来发送消息,该参数设置为true,如果接收失败,那么就会输出接口监听到的信息
        channel.basicPublish(exchangeName,routyKey,true,null,msg.getBytes());
    }
}

备注:在如上监听的接口的方法中,replyCode值的是返回的状态码,replyText值的是返回的文本信息。这地方故意写的是Exchange为空。同时在basicPublish中,参数mandatory参数设置的是true。

3.2、消费者代码

代码语言:javascript复制
package com.example.rabbitmq;

import com.rabbitmq.client.*;

public class ConsumerReturn
{
    private static final String EXCHANGE = "test_return_exchange";
    private  static  final String queueName="test_return_queue";
    private  static  final  String routingKey="return.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();


            channel.exchangeDeclare(EXCHANGE,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName,EXCHANGE,routingKey);


            // 监听队列,从队列中获取数据
            channel.basicConsume(queueName,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

备注:如上是消费者的代码,基本没多少区别的,只是把消费者接收消息的部分单独的分离了出来,分离出来的代码具体如下所示:

代码语言:javascript复制
package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("consumerTag:" consumerTag);
        System.err.println("envelope:" envelope);
        System.err.println("properties:" properties);
        System.err.println("the message received:" new String(body));

        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

3.3、执行结果

如上代码执行后,生产者端就会返回消息,也就是说在生产者发送消息的时候,找不到Exchange,这个时候消息肯定是无法发送的,我们开启了生产者端的监听,监听返回的信息如下:

代码语言:javascript复制
-----------------handle return---------------------
replyCode:312
replyText:NO_ROUTE
exchange:
routingKey:return.save
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:producer message return listener

如上,我们就可以看到发送失败后,监听到了对应的消息,这个时候就是另外一层逻辑的判断,比如我们可以写一个判断,如果判断返回的状态码是312,然后重新创建Exchange,然后再次发送,也就是“重试”,这个在稳定性体系中是非常重要的一个环节。

0 人点赞