RabbitMQ扩展之直接回复(Direct reply-to)

2020-06-23 16:13:58 浏览数 (1)

前提

本文内容参考RabbitMQ官方文档Direct reply-to。

直接回复

直接回复(Direct reply-to)是一种可以避免声明回复队列并且实现类似于RPC功能的一种特性。RabbitMQ中允许使用客户端和RabbitMQ消息代理中间件实现RPC模式,典型的做法是:RPC客户端发送请求(消息)到一个持久化的已知服务端队列,RPC服务端消费该服务端队列的消息,然后使用消息属性中的reply-to属性对应的值作为客户端回复队列发送回复消息到RPC客户端。

客户端回复队列需要考虑创建问题。客户端可以为每个请求-响应声明一个一次性的队列,但是这样的做法是十分低效的,因为即使是非持久状态下的非镜像队列,其删除的代价是昂贵的,特别是在集群模式之下。另一个可选的做法是:客户端为回复创建一个持久化的长期存在的队列,这种情况下队列的管理可能变得复杂,因为客户端本身可能不是长期存在的。

实际上,RabbitMQ提供了一个功能,允许RPC客户端直接从其RPC服务端接收回复,并且无需创建回复队列,依赖于RabbitMQ的消息中间件的功能,具体做法是:

对于RPC客户端:

  • RPC客户端创建消费者的时候队列指定为伪队列amq.rabbitmq.reply-to,使用非手动ack模式(autoAck=true)进行消费,伪队列amq.rabbitmq.reply-to不需要显式声明,当然如果需要的话也可以显式声明。
  • 发布消息的时候,消息属性中的reply-to属性需要指定为amq.rabbitmq.reply-to

对于RPC服务端:

  • RPC服务端接收消息后感知消息属性中的reply-to属性存在,它应该通过默认的交换器(名称为"")和reply-to属性作为路由键发送回复消息,那么该回复消息就会直接投递到RPC客户端的消费者中。
  • 如果RPC服务端需要进行一些长时间的计算逻辑,可能需要探测RPC服务端是否存活,可以使用一个一次性使用的信道对reply-to属性做一次队列声明,如果声明成功,队列amq.rabbitmq.reply-to并不会创建,如果声明失败,那么说明客户端已经失去连接。

注意事项:

  • RPC客户端在创建伪队列amq.rabbitmq.reply-to消费者的时候必须使用非手动ack模式(autoAck=true)。
  • 使用此机制发送的回复消息通常不具有容错能力,如果发布原始请求的客户端随后断开连接,它们将被丢弃。
  • 伪队列amq.rabbitmq.reply-to可以在basic.consumebasic.publish和消息属性reply-to中使用,实际上,它并不是一个真实存在的队列,RabbitMQ的Web管理器或者rabbitmqctl list_queues命令都无法展示该伪队列的相关属性或者信息。

说实话,个人认为这种方式有个比较多的局限性:

  • 同一个应用里面,只能使用唯一一个伪队列amq.rabbitmq.reply-to消费回复消息,并且RabbitMQ的Web管理器或者rabbitmqctl list_queues命令都无法展示该伪队列的相关属性或者信息,也就是无法对它进行监控或者管理。
  • 对于多应用同时接进去同一个RabbitMQ消息中间件代理,这些应用之间无法同时使用amq.rabbitmq.reply-to这个特性,因为有可能A客户端发送的消息被远程服务回调到另一个不同的B客户端。

直接回复特性使用

使用伪队列amq.rabbitmq.reply-to的一个例子:

代码语言:javascript复制
public class ReplyToRawMain extends BaseChannelFactory {

	private static final String FAKE_QUEUE = "amq.rabbitmq.reply-to";
	private static final String RPC_QUEUE = "rpc.queue";
	private static final String DEFAULT_EXCHANGE = "";

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			// 服务端队列
			channel.queueDeclare(RPC_QUEUE, true, false, false, null);
			client(channel);
			server(channel);
			Thread.sleep(5000);
		});
	}

	private static void client(Channel channel) throws Exception {
		// 客户端消费 - no-ack,也就是autoAck = true
		channel.basicConsume(FAKE_QUEUE, true, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag,
									   Envelope envelope,
									   AMQP.BasicProperties properties,
									   byte[] body) throws IOException {
				System.out.println(String.format("[X-Client]ndeliveryTag:%snexchange:%snroutingKey:%sncorrelationId:%snreplyTo:%sncontent:%sn",
						envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(),
						properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
			}
		});
		// 客户端发送
		AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
				.correlationId("message-99999")
				.replyTo(FAKE_QUEUE)
				.build();
		channel.basicPublish(DEFAULT_EXCHANGE, RPC_QUEUE, basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8));
	}

	private static void server(Channel channel) throws Exception {
		// 服务端消费
		channel.basicConsume(RPC_QUEUE, true, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag,
									   Envelope envelope,
									   AMQP.BasicProperties properties,
									   byte[] body) throws IOException {
				System.out.println(String.format("[X-Server]ndeliveryTag:%snexchange:%snroutingKey:%sncorrelationId:%snreplyTo:%sncontent:%sn",
						envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(),
						properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
				// 服务端应答->客户端
				AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
						.correlationId(properties.getCorrelationId())
						.build();
				channel.basicPublish(DEFAULT_EXCHANGE, properties.getReplyTo(), basicProperties, body);

			}
		});
	}
}

当然,可以直接创建一个真实的独占队列(生命周期跟客户端的连接绑定)作为回复队列,举个例子:

代码语言:javascript复制
public class ReplyToMain extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			// 服务端队列
			channel.queueDeclare("rpc.queue", true, false, false, null);

			// 客户端接收应答队列 - 排他队列,生命周期和连接绑定
			AMQP.Queue.DeclareOk callback = channel.queueDeclare("", false, true, false, null);

			System.out.println("建立排他应答队列:"   callback.getQueue());

			// 客户端消费
			channel.basicConsume(callback.getQueue(), false, new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag,
										   Envelope envelope,
										   AMQP.BasicProperties properties,
										   byte[] body) throws IOException {
					System.out.println(String.format("[X-Client]ndeliveryTag:%snroutingKey:%sncorrelationId:%snreplyTo:%sncontent:%sn",
							envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(),
							properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
				}
			});

			// 服务端消费
			channel.basicConsume("rpc.queue", true, new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag,
										   Envelope envelope,
										   AMQP.BasicProperties properties,
										   byte[] body) throws IOException {
					System.out.println(String.format("[X-Server]ndeliveryTag:%snroutingKey:%sncorrelationId:%snreplyTo:%sncontent:%sn",
							envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(),
							properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
					// 服务端应答
					AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
							.correlationId(properties.getCorrelationId())
							.build();
					channel.basicPublish("", properties.getReplyTo(), basicProperties, body);
				}
			});

            // 客户端发送
			AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
					.correlationId("message-99999")
					.replyTo(callback.getQueue())
					.build();
			channel.basicPublish("", "rpc.queue", basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8));

			Thread.sleep(5000);
		});
	}
}

个人想法

在实际项目中,我们经常被RabbitMQ消息发送是否成功这个问题困扰,一般情况下,我们认为调用basic.publish只要不抛出异常就是发送消息成功,例如一个代码模板如下:

代码语言:javascript复制
public boolean sendMessage(){
    boolean success = false;
	try {
		channel.basicPublish();
		// 发送成功
		success = true;
	}catch (Exception e){
		// 发送失败
		log.error();
	}
	return success;
}

这个代码模板在极大多数情况下是合适的,但是有些时候我们确实需要消息的接收方告知发送方已经收到消息,这个时候就需要用到消息的回复功能,个人认为可选的方案有:

  • 消息发布方基于伪队列amq.rabbitmq.reply进行消费,消息接收方回复到伪队列amq.rabbitmq.reply上。
  • 消息发布方自定义独占队列进行消费,消息接收方回复到此独占队列。
  • 消息发布方自定义持久化队列进行消费,消息接收方回复到此持久化队列。

其实,在**AMQP.BasicProperties**的replyTo属性中指定需要回复的队列名只是RabbitMQ提出的一种规约或者建议,并不是强制实行的方案,实际上可以自行选择回复队列或者忽略replyTo属性

本文是Throwable的原创文章,转载请提前告知作者并且标明出处。 博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议 本文永久链接是:https://cloud.tencent.com/developer/article/1650026

0 人点赞