前提
本文来源于官方文档Consumer Priorities。
消费者优先级
消费者优先级的机制:
- 高优先级的消费者处于活跃状态的情况下优先接收和处理消息。
- 消息会流入到低优先级的活跃消费者仅当高优先级的消费者处于阻塞状态。
正常情况下,所有订阅同一个队列的活跃消费者以循环的(round-robin)方式从队列中接收消息。当使用了消费者优先级,如果多个活跃消费者使用了相同的高优先级属性,那么消息投递也是以循环的方式进行(其实使用了相同的优先级类似于没有启用优先级)。
活跃消费者的定义
活跃的消费者就是可以在不用等待的情况下接收和处理消息的消费者,也就是消费者如果无法接收消息,那么它就是出于非活跃状态(或者说阻塞状态),阻塞的常见原因有:
- 使用了
basic.qos
之后,消费者在信道中未确认的预读取消息达到了上限。 - 网络阻塞。
因此,对于每个存在的队列,必定至少出现下面三种情况的其中一种:
- 队列没有活跃的消费者。
- 队列是空的。
- 队列正在忙于向消费者投递消息。
消费者可能在一秒内多次在活跃和阻塞状态之间切换,只要消费处理速度足够快。RabbitMQ不会通过Web管理插件或者**rabbitmqctl
**命令公开消费者当前是活跃还是阻塞状态,换言之,只能通过客户端感知。
启用消费者优先级的时候,RabbitMQ会优先投递消息到优先级属性比较高的消费者,但是如果所有优先级高的消费者都处于阻塞状态,RabbitMQ会把消息投递到活跃的优先级稍低的消费者,而不是一直等待优先级高的消费者解除阻塞,造成优先级低的消费者一直处于饥饿状态。
使用消费者优先级特性
在使用**basic.consume
**方法可以设置参数**x-priority
**的值为整数,数字越大则优先级越高,未设置则使用默认值0。
public class ConsumerPriorityMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
Map<String, Object> consumerArgs = new HashMap<>(8);
consumerArgs.put("x-priority", 10);
channel.basicConsume("throwable.queue.direct", true, consumerArgs, new DefaultConsumer(channel) {
});
consumerArgs.put("x-priority", 100);
channel.basicConsume("throwable.queue.direct", true, consumerArgs, new DefaultConsumer(channel) {
});
});
}
}
上面的例子设置了两个消费者,后者的优先级为100,而前者的优先级为10。
本文是Throwable的原创文章,转载请提前告知作者并且标明出处。 博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议 本文永久链接是:https://cloud.tencent.com/developer/article/1650081