在RabbitMQ生产者Confirm消息中介绍了RabbitMQ生产者端的消息确认的机制,也就是在生产者端把消息发送成功后进行消息的应答机制,但是如果生产者端发送的消息根本没有发送成功了?那么针对这种情况也是需要一种对应的解决方案来进行处理。针对这种特殊的情况RabbitMQ提供了Return消息保障的机制。
一、什么是Return消息保障
当消息生产者端把消息发送到Exchange和RoutingKey的时候,然后Exchange与Queue会形成一定的映射机制来消息发送的消息,这是一个正常的MQ生产消费的机制。但是在某些特殊的情况下,生产者发送的消息到Exchnage,但是Exchange不存在,还有一种情况是RoutingKey路由不到,导致生产者发送的消息无法让消费者来进行消费,针对这种情况生产者需要监听不可达的消息机制,也就是ReturnListener。在RabbitMQ消息队列服务器中,专门有一个配置来解决这种情况,具体配置的参数就是mandatory,它是一个boolean,如果配置的是true,那么意味着监听器会接收到路由不可达的消息,然后进行后续逻辑上的处理,但是如果配置参数是false,那么MQ的Broker就会自动删除该MQ的message。
二、Return消息机制流程
通过如下交互图来呈现Return消息保障的机制,具体如下:
三、Return代码实现
3.1、生产者代码
代码语言:javascript复制package com.example.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ProducerReturn
{
private static final String exchangeName="";
private static final String routyKey="return.save";
public static void main(String[] args) throws Exception
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.**.***.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "producer message return listener";
channel.basicPublish(exchangeName, routyKey, null, msg.getBytes());
/*
* 添加发送消息前进行确认的机制,ReturnListener里面会提供一个接口的机制来进行保障
* */
channel.addReturnListener(new ReturnListener()
{
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("-----------------handle return---------------------");
System.out.println("replyCode:" replyCode);
System.out.println("replyText:" replyText);
System.out.println("exchange:" exchange);
System.out.println("routingKey:" routingKey);
System.out.println("properties:" properties);
System.out.println("body:" new String(body));
}
});
//使用mandtory的方式来发送消息,该参数设置为true,如果接收失败,那么就会输出接口监听到的信息
channel.basicPublish(exchangeName,routyKey,true,null,msg.getBytes());
}
}
备注:在如上监听的接口的方法中,replyCode值的是返回的状态码,replyText值的是返回的文本信息。这地方故意写的是Exchange为空。同时在basicPublish中,参数mandatory参数设置的是true。
3.2、消费者代码
代码语言:javascript复制package com.example.rabbitmq;
import com.rabbitmq.client.*;
public class ConsumerReturn
{
private static final String EXCHANGE = "test_return_exchange";
private static final String queueName="test_return_queue";
private static final String routingKey="return.#";
public static void main(String[] args) throws Exception
{
try{
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("101.**.***.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
Connection connection=connectionFactory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE,"topic",true,false,null);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,EXCHANGE,routingKey);
// 监听队列,从队列中获取数据
channel.basicConsume(queueName,new MyConsumer(channel=channel));
}catch (Exception e){
e.printStackTrace();
}
}
}
备注:如上是消费者的代码,基本没多少区别的,只是把消费者接收消息的部分单独的分离了出来,分离出来的代码具体如下所示:
代码语言:javascript复制package com.example.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer
{
private Channel channel;
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
public MyConsumer(Channel channel)
{
super(channel);
this.channel=channel;
}
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.err.println("consumerTag:" consumerTag);
System.err.println("envelope:" envelope);
System.err.println("properties:" properties);
System.err.println("the message received:" new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
3.3、执行结果
如上代码执行后,生产者端就会返回消息,也就是说在生产者发送消息的时候,找不到Exchange,这个时候消息肯定是无法发送的,我们开启了生产者端的监听,监听返回的信息如下:
代码语言:javascript复制-----------------handle return---------------------
replyCode:312
replyText:NO_ROUTE
exchange:
routingKey:return.save
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:producer message return listener
如上,我们就可以看到发送失败后,监听到了对应的消息,这个时候就是另外一层逻辑的判断,比如我们可以写一个判断,如果判断返回的状态码是312,然后重新创建Exchange,然后再次发送,也就是“重试”,这个在稳定性体系中是非常重要的一个环节。