前提
本文来源于官方文档Consumer Cancel Notification。
消费者取消通知
当一个信道上建立的消费者订阅了一个队列,有可能出现各种原因导致消费停止。一个很明显的原因就是客户端在同一个信道上发出basic.cancel
命令,消息中间件代理响应basic.cancel-ok
,将会导致消费者被取消。还有其他的事件如队列的删除或者集群方案所在队列的集群节点失败也有可能导致消费者被取消,消费者被取消这个事件并不会通知客户端对应的信道,这样子会造成客户端无法感知消费者被取消。
为了避免上面这些情况出现,RabbitMQ引入了扩展特性:由于消息中间件代理出现的异常或者正常情况导致消费者取消,会向对应的消费者(信道)发送basic.cancel
,但是由客户端信道主动向消息中间件代理发送basic.cancel
以取消消费者的情况下不会受到消息中间件代理的basic.cancel
回复。
有些情况下,客户端感知到异常(例如队列删除等)主动向消息中间件代理发送basic.cancel
,这个时候,消息中间件代理也有可能因为队列删除主动向对应的消费者(信道)发送basic.cancel
,也就是存在竞争,RabbitMQ代理收到前者的basic.cancel
时不会出现异常,基于后者还是正常回复basic.cancel-ok
。
举个例子,情况一:例如我们主动取消信道上的消费者:
代码语言:javascript复制public class InitiativeBasicCancel extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
// 此方法返回的是消费者的标签
String consumerTag = channel.basicConsume("throwable.queue.direct", new DefaultConsumer(channel) {
});
channel.basicCancel(consumerTag);
});
}
}
情况二:假如我们想监听消息中间件代理异步回调的basic.cancel
和basic.cancel-ok
,应该这样做:
public class AsyncBasicCancel extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.basicConsume("throwable.queue.direct", new DefaultConsumer(channel) {
@Override
public void handleCancelOk(String consumerTag) {
System.out.println("收到来自消息中间件代理的basic.cancel-ok回复,consumerTag=" consumerTag);
}
@Override
public void handleCancel(String consumerTag) throws IOException {
System.out.println("收到来自消息中间件代理的basic.cancel回复,consumerTag=" consumerTag);
}
});
});
}
}
一般情况下,我们应该同时考虑情况一和情况二有可能同时发生(也就是前面说到的竞争),并且做好相应的处理即可。