根据RabbitMQ官方文档描述,可以通过“预取数量”来限制未被确认的消息个数,本质上这也是一种对消费者进行流控的方法。 详见:https://www.rabbitmq.com/consumer-prefetch.html#independent-consumers 。
由RabbitMQ的机制可知,当多个消费者订阅同一个Queue时,这时Queue中的消息会被平均分摊给多个消费者进行处理,因此一定要对该参数设置合理的值。 需要针对具体的应用场景,适当增大或减小该参数值(默认值为0表示不限制),以提高消费者吞吐量和充分利用资源,参考策略如下:
1.针对订单类消息,因为处理耗时很短,可以适当增大该参数值,这样Broker在一次网络通信中会尽可能多地推送一些数据给消费者,以提高消费吞吐量; 2.对于依赖CPU计算型的耗时任务,该参数值则不能设置过大,否则会出现消息被分配后因为耗时等待一直无法确认而产生堆积,此时即使有别的消费者已经空闲也无法再被分配这些已经堆积的消息,导致资源浪费。
RabbitMQ客户端提供了相应设置方法:
代码语言:javascript复制// 设置预取消息数量,默认值为0,不限流
channel.basicQos(10);
在Spring Boot框架中可以直接通过如下配置参数进行设定:
代码语言:javascript复制// listener类型为direct,设置预取消息数量为10,默认值为250(在AbstractMessageListenerContainer中定义的常量:DEFAULT_PREFETCH_COUNT)
spring.rabbitmq.listener.direct.prefetch=10
落实到本项目中,线上曾出现过这样的现象:K8S管理的Docker集群中,当RabbitMQ中出现消息堆积时,却只有1个Docker实例的负载持续很高,而其他Docker实例都非常闲。这显然不符合预期,应该大家都很忙才对。 经排查分析后得知:本项目的特点是每一个任务消息都是CPU耗时型,如果消费者每次都获取到多个任务消息到本地,那么就会出现即使其他消费者已经空闲了也无法为自己分担任务的情形。
解决办法:限制每次给每个消费者只分派一个任务消息(prefetch=1),这样如果某个消费者在处理任务时被“卡住”了,则不再分配新的任务给它,而是把剩下的任务消息分配给那些已经空闲的消费者执行。