如果架构中有用到mq,那就不可避免会遇到消息堆积的问题,因为我们没办法保证自己生产和消费永远都是正确的。像我们系统就遇到过很多次消息堆积情况,最严重的一次直接导致mq内存溢出,服务宕机,导致所有的mq消费全部出现异常,下面我就这个问题和童靴们唠叨唠叨。
消息推送校验模式:
遇到这个问题,第一个想法就是在推送消息的地方做改动,比如要推送mq的时候,先检查一下mq对应的队列是否达到上限,如果达到就不推送。
但是如果消息具有时效性,也就是最新推送的消息和mq中已经推送的消息,是不一样的,这个时候就不能这样处理,而且如果推送的时候正好mq不稳定,导致获取队列消息失败,就可能导致错误的操作,所以这种方案可用性太低。
监听器消费模式:
后面甚至还想通过监听器来消费掉这些堆积的消息(该监听器只用来ack掉消息,不做任何业务处理),但是这样不仅影响服务器的性能还影响网络带宽,所以这种方式也是不可取的。
脚本后台清理模式:
最终确定下来的方案是通过脚本来删除,因为RabbitMq支持命令查询、修改、清空队列,基于这种方式,我们可以写一个脚本,定期获取需要监控的队列数据情况,如果达到上限,就通过命令直接删除,这种方式不仅可靠,而且对应用服务器没有任何侵入性,可以很方便的实现。
脚本:
代码语言:javascript复制#!/bin/bash
##################################################
# vim /etc/crontab
# */30 * * * * root sh /mnt/rabbitmqMonitor/rabbitmq_monitor.cron
##################################################
#rabbitmq的环境变量
export RABBITMQPATH=/usr/lib/rabbitmq/bin
# 定义需要请求的队列名称数组
array_queue_name[0]="amz_RealTimeOrder:input"
array_queue_name[1]="amz_advertisement:request"
array_queue_name[2]="amz_advertisement:report"
array_queue_name[3]="mws:report:request_input"
array_queue_name[4]="mws:report:download_input"
array_queue_name[5]="amz_advertisement:info"
# 定义需要队列所对应的最大值
declare -A queueMsgMaxMap
queueMsgMaxMap["amz_RealTimeOrder:input"]=1
queueMsgMaxMap["amz_advertisement:request"]=10000000000
queueMsgMaxMap["amz_advertisement:report"]=10000000000
queueMsgMaxMap["mws:report:request_input"]=100000000000
queueMsgMaxMap["mws:report:download_input"]=100000000000
queueMsgMaxMap["amz_advertisement:info"]=1000000000000
#获取所有队列的名字和每个队列中的消息数量,存入'queueNum'数组中
queueIndex=0
for QUEUE in $(rabbitmqctl list_queues |grep -v 'Listing queues ...' | awk -F' ' '{print $1}');
do
for queue_name in ${array_queue_name[*]}
do
if [[ $QUEUE = $queue_name ]]; then
#统计每个消息队列的数量
nums=$(rabbitmqctl list_queues |grep -w $QUEUE | awk -F' ' '{print $2}')
echo -e "startPush------------$nums-----------$QUEUE---------set>>>>${queueMsgMaxMap[$QUEUE]}--------"
# -ge
if [[ $nums -ge ${queueMsgMaxMap[$QUEUE]} ]]; then
#存key
queueName[$queueIndex]=$QUEUE
queueIndex=`expr $queueIndex 1`
echo -e "maxPush------------$num-----------$QUEUE----------------"
fi
fi
done
done
#如果有异常,发送邮件
exceptionNum=${#queueName[@]}
if [[ $exceptionNum -gt 0 ]]; then
#有队列阻塞,exceptionName存放的为堵塞队列的名称,清空队列
for name in ${queueName[*]}
do
$(rabbitmqctl -p / purge_queue $name)
echo -e "purge queue name>>>>>>>>>>>>>>$name"
done
echo "###################count at $(date '%d-%m-%Y %H:%M:%S') ######################"
fi
注意事项:
消息堆积的时候除了要及时清理堆积消息,还要进行必要报警,像我们系统就是通过企业微信报警群来报警的,一旦消息堆积,开发人员就可以马上收到相关报警信息,并及时的进行处理。 还要非常重要的一点是,消息必须是无状态的才可以清空,不然一旦删除将会导致数据丢失。我们在设计mq的时候,也要秉持着这种原则,因为消息并不一定100%可靠,要做好消息丢失的措施。