前提
本文内容参考RabbitMQ官方文档Direct reply-to。
直接回复
直接回复(Direct reply-to)是一种可以避免声明回复队列并且实现类似于RPC功能的一种特性。RabbitMQ中允许使用客户端和RabbitMQ消息代理中间件实现RPC模式,典型的做法是:RPC客户端发送请求(消息)到一个持久化的已知服务端队列,RPC服务端消费该服务端队列的消息,然后使用消息属性中的reply-to
属性对应的值作为客户端回复队列发送回复消息到RPC客户端。
客户端回复队列需要考虑创建问题。客户端可以为每个请求-响应声明一个一次性的队列,但是这样的做法是十分低效的,因为即使是非持久状态下的非镜像队列,其删除的代价是昂贵的,特别是在集群模式之下。另一个可选的做法是:客户端为回复创建一个持久化的长期存在的队列,这种情况下队列的管理可能变得复杂,因为客户端本身可能不是长期存在的。
实际上,RabbitMQ提供了一个功能,允许RPC客户端直接从其RPC服务端接收回复,并且无需创建回复队列,依赖于RabbitMQ的消息中间件的功能,具体做法是:
对于RPC客户端:
- RPC客户端创建消费者的时候队列指定为伪队列
amq.rabbitmq.reply-to
,使用非手动ack模式(autoAck=true)进行消费,伪队列amq.rabbitmq.reply-to
不需要显式声明,当然如果需要的话也可以显式声明。 - 发布消息的时候,消息属性中的
reply-to
属性需要指定为amq.rabbitmq.reply-to
。
对于RPC服务端:
- RPC服务端接收消息后感知消息属性中的
reply-to
属性存在,它应该通过默认的交换器(名称为"")和reply-to
属性作为路由键发送回复消息,那么该回复消息就会直接投递到RPC客户端的消费者中。 - 如果RPC服务端需要进行一些长时间的计算逻辑,可能需要探测RPC服务端是否存活,可以使用一个一次性使用的信道对
reply-to
属性做一次队列声明,如果声明成功,队列amq.rabbitmq.reply-to
并不会创建,如果声明失败,那么说明客户端已经失去连接。
注意事项:
- RPC客户端在创建伪队列
amq.rabbitmq.reply-to
消费者的时候必须使用非手动ack模式(autoAck=true)。 - 使用此机制发送的回复消息通常不具有容错能力,如果发布原始请求的客户端随后断开连接,它们将被丢弃。
- 伪队列
amq.rabbitmq.reply-to
可以在basic.consume
、basic.publish
和消息属性reply-to
中使用,实际上,它并不是一个真实存在的队列,RabbitMQ的Web管理器或者rabbitmqctl list_queues
命令都无法展示该伪队列的相关属性或者信息。
说实话,个人认为这种方式有个比较多的局限性:
- 同一个应用里面,只能使用唯一一个伪队列
amq.rabbitmq.reply-to
消费回复消息,并且RabbitMQ的Web管理器或者rabbitmqctl list_queues
命令都无法展示该伪队列的相关属性或者信息,也就是无法对它进行监控或者管理。 - 对于多应用同时接进去同一个RabbitMQ消息中间件代理,这些应用之间无法同时使用
amq.rabbitmq.reply-to
这个特性,因为有可能A客户端发送的消息被远程服务回调到另一个不同的B客户端。
直接回复特性使用
使用伪队列amq.rabbitmq.reply-to
的一个例子:
public class ReplyToRawMain extends BaseChannelFactory {
private static final String FAKE_QUEUE = "amq.rabbitmq.reply-to";
private static final String RPC_QUEUE = "rpc.queue";
private static final String DEFAULT_EXCHANGE = "";
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
// 服务端队列
channel.queueDeclare(RPC_QUEUE, true, false, false, null);
client(channel);
server(channel);
Thread.sleep(5000);
});
}
private static void client(Channel channel) throws Exception {
// 客户端消费 - no-ack,也就是autoAck = true
channel.basicConsume(FAKE_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Client]ndeliveryTag:%snexchange:%snroutingKey:%sncorrelationId:%snreplyTo:%sncontent:%sn",
envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
}
});
// 客户端发送
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId("message-99999")
.replyTo(FAKE_QUEUE)
.build();
channel.basicPublish(DEFAULT_EXCHANGE, RPC_QUEUE, basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8));
}
private static void server(Channel channel) throws Exception {
// 服务端消费
channel.basicConsume(RPC_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Server]ndeliveryTag:%snexchange:%snroutingKey:%sncorrelationId:%snreplyTo:%sncontent:%sn",
envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
// 服务端应答->客户端
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish(DEFAULT_EXCHANGE, properties.getReplyTo(), basicProperties, body);
}
});
}
}
当然,可以直接创建一个真实的独占队列(生命周期跟客户端的连接绑定)作为回复队列,举个例子:
代码语言:javascript复制public class ReplyToMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
// 服务端队列
channel.queueDeclare("rpc.queue", true, false, false, null);
// 客户端接收应答队列 - 排他队列,生命周期和连接绑定
AMQP.Queue.DeclareOk callback = channel.queueDeclare("", false, true, false, null);
System.out.println("建立排他应答队列:" callback.getQueue());
// 客户端消费
channel.basicConsume(callback.getQueue(), false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Client]ndeliveryTag:%snroutingKey:%sncorrelationId:%snreplyTo:%sncontent:%sn",
envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
}
});
// 服务端消费
channel.basicConsume("rpc.queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Server]ndeliveryTag:%snroutingKey:%sncorrelationId:%snreplyTo:%sncontent:%sn",
envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
// 服务端应答
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", properties.getReplyTo(), basicProperties, body);
}
});
// 客户端发送
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId("message-99999")
.replyTo(callback.getQueue())
.build();
channel.basicPublish("", "rpc.queue", basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8));
Thread.sleep(5000);
});
}
}
个人想法
在实际项目中,我们经常被RabbitMQ消息发送是否成功这个问题困扰,一般情况下,我们认为调用basic.publish
只要不抛出异常就是发送消息成功,例如一个代码模板如下:
public boolean sendMessage(){
boolean success = false;
try {
channel.basicPublish();
// 发送成功
success = true;
}catch (Exception e){
// 发送失败
log.error();
}
return success;
}
这个代码模板在极大多数情况下是合适的,但是有些时候我们确实需要消息的接收方告知发送方已经收到消息,这个时候就需要用到消息的回复功能,个人认为可选的方案有:
- 消息发布方基于伪队列
amq.rabbitmq.reply
进行消费,消息接收方回复到伪队列amq.rabbitmq.reply
上。 - 消息发布方自定义独占队列进行消费,消息接收方回复到此独占队列。
- 消息发布方自定义持久化队列进行消费,消息接收方回复到此持久化队列。
其实,在**AMQP.BasicProperties
**的replyTo属性中指定需要回复的队列名只是RabbitMQ提出的一种规约或者建议,并不是强制实行的方案,实际上可以自行选择回复队列或者忽略replyTo属性。
本文是Throwable的原创文章,转载请提前告知作者并且标明出处。 博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议 本文永久链接是:https://cloud.tencent.com/developer/article/1650026