小编说:mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ提供的备份交换器(Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。 对于初学者来说,特别容易将mandatory和immediate这两个参数混淆,而对于备份交换器更是一筹莫展,本文对此一一展开探讨。 本文选自《RabbitMQ实战指南》
- mandatory参数
当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,则消息直接被丢弃。
那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。
使用mandatory参数的关键代码如代码清单1所示。
代码语言:javascript复制channel.basicPublish(EXCHANGE_NAME, "", true,MessageProperties.PERSISTENT_TEXT_PLAIN,"mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText,String exchange, String routingKey,AMQP.BasicProperties basicProperties,byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return返回的结果是:" message);
}
});
上面代码中生产者没有成功地将消息路由到队列,此时RabbitMQ会通过Basic.Return返回“mandatory test”这条消息,之后生产者客户端通过ReturnListener监听到了这个事件,上面代码的最后输出应该是“Basic.Return返回的结果是:mandatory test”。
从AMQP协议层面来说,其对应的流转过程如图1所示。
图1 mandatory参数
- immediate参数
当immediate参数设为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。
概括来说,mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ 3.0版本开始去掉了对immediate参数的支持,对此RabbitMQ官方解释是:immediate参数会影响镜像队列的性能,增加了代码复杂性,建议采用TTL和DLX的方法替代。
发送带immediate参数(immediate参数设置为true)的Basic.Publish客户端会报如下异常:
代码语言:javascript复制[WARN] - [An unexpected connection driver error occured (Exception message: Connection reset)] - [com.rabbitmq.client.impl.ForgivingExceptionHandler:120]
RabbitMQ服务端会报如下异常(查看RabbitMQ的运行日志,默认日志路径为RABBITMQ_HOME/var/log/rabbitmq/rabbit@HOSTNAME.log):
代码语言:javascript复制=ERROR REPORT==== 25-May-2017::15:10:25 ===
Error on AMQP connection <0.25319.2> (192.168.0.2:55254->192.168.0.3:5672, vhost: '/', user: 'root', state: running), channel 1:{amqp_error,not_implemented,"
immediate=true",'basic.publish'}
- 备份交换器
备份交换器,英文名称为Alternate Exchange,简称AE,或者更直白地称之为“备胎交换器”。生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。
可以通过在声明交换器(调用channel.exchangeDeclare方法)的时候添加alternate-exchange参数来实现,也可以通过策略(Policy)的方式实现。如果两者同时使用,则前者的优先级更高,会覆盖掉Policy的设置。
使用参数设置的关键代码如代码清单2所示。
代码清单2
代码语言:javascript复制Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "myAe");
channel.exchangeDeclare("normalExchange", "direct", true, false, args);
channel.exchangeDeclare("myAe", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "normalKey");
channel.queueDeclare("unroutedQueue", true, false, false, null);
channel.queueBind("unroutedQueue", "myAe", "");
上面的代码中声明了两个交换器normalExchange和myAe,分别绑定了normalQueue和unroutedQueue这两个队列,同时将myAe设置为normalExchange的备份交换器。注意myAe的交换器类型为fanout。
参考图2,如果此时发送一条消息到normalExchange上,当路由键等于“normalKey”的时候,消息能正确路由到normalQueue这个队列中。如果路由键设为其他值,比如“errorKey”,即消息不能被正确地路由到与normalExchange绑定的任何队列上,此时就会发送给myAe,进而发送到unroutedQueue这个队列。
图2 备份交换器
同样,如果采用Policy的方式来设置备份交换器,可以参考如下:
代码语言:javascript复制rabbitmqctl set_policy AE "^normalExchange$" ‘{"alternate-exchange": "myAE"}’
备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型,如若读者想设置为direct或者topic的类型也没有什么不妥。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。
考虑这样一种情况,如果备份交换器的类型是direct,并且有一个与其绑定的队列,假设绑定的路由键是key1,当某条携带路由键为key2的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为key1,则可以存储到队列中。
对于备份交换器,总结了以下几种特殊情况:
- 如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
- 如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
- 如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
- 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。