MQ消息堆积终极解决方案【RabbitMQ】

2020-06-16 16:01:55 浏览数 (3)

如果架构中有用到mq,那就不可避免会遇到消息堆积的问题,因为我们没办法保证自己生产和消费永远都是正确的。像我们系统就遇到过很多次消息堆积情况,最严重的一次直接导致mq内存溢出,服务宕机,导致所有的mq消费全部出现异常,下面我就这个问题和童靴们唠叨唠叨。

消息推送校验模式:

遇到这个问题,第一个想法就是在推送消息的地方做改动,比如要推送mq的时候,先检查一下mq对应的队列是否达到上限,如果达到就不推送。

但是如果消息具有时效性,也就是最新推送的消息和mq中已经推送的消息,是不一样的,这个时候就不能这样处理,而且如果推送的时候正好mq不稳定,导致获取队列消息失败,就可能导致错误的操作,所以这种方案可用性太低。

监听器消费模式:

后面甚至还想通过监听器来消费掉这些堆积的消息(该监听器只用来ack掉消息,不做任何业务处理),但是这样不仅影响服务器的性能还影响网络带宽,所以这种方式也是不可取的。

脚本后台清理模式:

最终确定下来的方案是通过脚本来删除,因为RabbitMq支持命令查询、修改、清空队列,基于这种方式,我们可以写一个脚本,定期获取需要监控的队列数据情况,如果达到上限,就通过命令直接删除,这种方式不仅可靠,而且对应用服务器没有任何侵入性,可以很方便的实现。

脚本:

代码语言:javascript复制
#!/bin/bash
##################################################
# vim /etc/crontab
# */30 * * * * root sh /mnt/rabbitmqMonitor/rabbitmq_monitor.cron
##################################################
#rabbitmq的环境变量
export RABBITMQPATH=/usr/lib/rabbitmq/bin
# 定义需要请求的队列名称数组
array_queue_name[0]="amz_RealTimeOrder:input"
array_queue_name[1]="amz_advertisement:request"
array_queue_name[2]="amz_advertisement:report"
array_queue_name[3]="mws:report:request_input"
array_queue_name[4]="mws:report:download_input"
array_queue_name[5]="amz_advertisement:info"
# 定义需要队列所对应的最大值
declare -A queueMsgMaxMap
queueMsgMaxMap["amz_RealTimeOrder:input"]=1
queueMsgMaxMap["amz_advertisement:request"]=10000000000
queueMsgMaxMap["amz_advertisement:report"]=10000000000
queueMsgMaxMap["mws:report:request_input"]=100000000000
queueMsgMaxMap["mws:report:download_input"]=100000000000
queueMsgMaxMap["amz_advertisement:info"]=1000000000000
#获取所有队列的名字和每个队列中的消息数量,存入'queueNum'数组中
queueIndex=0
for QUEUE in $(rabbitmqctl list_queues |grep -v 'Listing queues ...' | awk -F' ' '{print $1}');
do
    for queue_name in ${array_queue_name[*]}
    do
    	if [[ $QUEUE = $queue_name ]]; then
           #统计每个消息队列的数量
    	   nums=$(rabbitmqctl list_queues |grep -w $QUEUE | awk -F' ' '{print $2}')
    	   echo -e "startPush------------$nums-----------$QUEUE---------set>>>>${queueMsgMaxMap[$QUEUE]}--------"
    	   # -ge  
    	   if [[ $nums -ge ${queueMsgMaxMap[$QUEUE]} ]]; then
              #存key
              queueName[$queueIndex]=$QUEUE
              queueIndex=`expr $queueIndex   1`
	      echo -e "maxPush------------$num-----------$QUEUE----------------"
    	   fi
    	fi
    done
done 
#如果有异常,发送邮件
exceptionNum=${#queueName[@]}
if [[ $exceptionNum -gt 0 ]]; then
    #有队列阻塞,exceptionName存放的为堵塞队列的名称,清空队列
    for name in ${queueName[*]}
    do
       $(rabbitmqctl -p /  purge_queue $name)
       echo -e "purge queue name>>>>>>>>>>>>>>$name"
    done
    echo "###################count at $(date  '%d-%m-%Y %H:%M:%S') ######################"
fi

注意事项:

消息堆积的时候除了要及时清理堆积消息,还要进行必要报警,像我们系统就是通过企业微信报警群来报警的,一旦消息堆积,开发人员就可以马上收到相关报警信息,并及时的进行处理。 还要非常重要的一点是,消息必须是无状态的才可以清空,不然一旦删除将会导致数据丢失。我们在设计mq的时候,也要秉持着这种原则,因为消息并不一定100%可靠,要做好消息丢失的措施。

0 人点赞