消息队列的消息大量积压怎么办?

2022-11-30 15:32:24 浏览数 (2)

1 问题追溯

系统出现性能问题,来不及处理上游发的消息,导致消息积压。消息积压是正常现象,但积压太多就需要处理了。就像水库,日常蓄水是正常的,但下游泄洪能力太差,导致水库水位一直不停上涨,就不正常!

2 开发的梦魇

  • 日常开发使用MQ时,如何避免消息积压?
  • 若线上已出现积压了,如何应急?

3 性能优化

性能优化主要在生产者和消费者这俩业务逻辑。

MQ自身性能,作为API使用者,无需过于关注。因大多MQ业务,MQ本身处理能力>>业务系统。主流MQ的单个节点,消息收发性能可达几w ~ 几十w条消息/s,还可水平扩展Broker实例数倍增处理能力。 而一般业务系统需处理的业务逻辑远比MQ复杂,单节点每秒可处理几百~几千次请求,已算性能佳。

所以,MQ性能优化,更关注在消息收发两端,业务代码怎么和MQ协作达到最佳性能。

3.1 生产端

此端的业务代码处理性能,和MQ关系不大,都是先执行业务逻辑,最后再发消息。 若你的代码发送消息的性能上不去,优先检查是否为发消息前的业务逻辑耗时太多。

对于发消息的业务逻辑,只需注意设置合适的并发和同步大小,即可达到很好发送性能。 Pro发消息给Broker,Broker收到消息后返回确认响应,是一次完整交互。 假设一次交互平均时延1ms,把这1ms分解:

  1. 发送端准备数据、序列化消息、构造请求等逻辑时间,即发送端在发送网络请求前的耗时
  2. 发送消息和返回响应在网络传输中耗时
  3. Broker处理消息的时延

若单线程发送,每次只发1条,则每秒只发

代码语言:javascript复制
1000ms / 1ms * 1条/ms = 1000条消息

这并不能压榨MQ性能。

无论是增加每次发送消息的批量大小、增加并发,都能倍增发送性能。 那到底选择批量发送 or 增加并发?取决于发送端的业务性质。能满足你的性能要求,就能搞。

  • 若发送端是个微服务,主要接受RPC请求处理在线业务 微服务在处理每次请求时,就在当前线程直接发消息,因为所有RPC框架都是多线程支持并发,自然可并行发送消息。 且在线业务比较在意请求响应时延,批量发送势必影响RPC服务时延。 这时通过并发提升发送性能就更好。
  • 若是离线分析系统,并不关心时延,而注重整个系统的吞吐量 发送端数据都来自DB,更适合批量发送,可批量从DB读数据,然后批量发送消息,用少量并发即可获得高吞吐量。

批量消费中,若某条消息消费失败,则重试会将整批消息重发。 批量消费是一次取一批消息,等这一批消息都成功,再提交最后一条消息的位置,作为新的消费位置。若其中任一条失败,则认为整批都失败。

3.2 消费端

使用MQ,大部分性能问题都出在消费端。若消费速度跟不上发送端生产消息速度,就会造成消息积压。若这种性能倒挂的问题是暂时的,问题不大,只要消费端性能恢复后,超过发送端的性能,积压的消息是可逐渐被消化的。若消费速度一直比生产速度慢,系统就会异常:

  • MQ存储被填满无法提供服务
  • 消息丢失

所以设计系统,要保证消费端消费性能>生产端发送性能

消费端性能优化除优化消费业务逻辑,也可水平扩容,增加消费端并发数提升总体消费性能。 扩容Con实例数量时,必须同步扩容主题中的分区(也叫队列)数量,确保Con实例数和分区数量相等。 若Con实例数量>分区数量,这样的扩容实际上徒劳。因为对Con,在每个分区实际上只能支持单线程消费。

这一步需要业务consumer团队联系消息中间件团队一起运维配合。

很多消费程序这样解决消费慢:

它收消息处理的业务逻辑可能较慢,也很难再优化,为避免积压,在收消息的OnMessage方法,不处理任何业务,把这消息放到一个本地消息表就返回。然后可通过定时任务,启动很多业务线程,里面是真正处理消息的业务逻辑,这些线程从本地消息表取消息处理,就解决了单Consumer不能并行消费问题。

消息积压,如何处理

还有种消息积压,日常系统正常运转时,没有积压或只有少量积压很快就消费了。但某刻,突然开始积压消息且积压持续上涨。 这时,得在短时间内找到消息积压原因,迅速解决问题才不至于继续影响业务。排查消息积压的原因,有成熟方案。积压突然增加,最粗粒度的原因:

  • 发送变快
  • 消费变慢

大部分MQ内置监控功能,通过监控数据,很容易确定哪种原因。若单位时间发送消息增多,如秒杀,短时内不大可能优化消费端代码以提升性能,只能通过扩容消费端实例数提升总体消费能力。 若短时内无足够服务器资源扩容,那就将系统降级,关闭一些不重要业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些核心业务。

kafka不仅是扩容时候,只要是consumer和partition有一方数量变化,都会触发rebalance。

还有种不太常见的,通过监控发现,无论是发送消息速度还是消费消息速度和原来都没啥变化,这时需检查消费端,是不是消费失败,导致同一条消息反复消费,这也会拖慢整个系统的消费速度。

若监控到消费变慢,需检查消费实例,分析消费变慢原因。优先检查日志是否有大量消费错误,若无错误,可打印堆栈信息,看消费线程是不是卡在哪里不动,如触发死锁或卡在等待某些资源

消费端是否可通过同步消费提升消费性能呢?

消费端进行批量操作,感觉和上面的先将消息放在内存队列,然后再并发消费消息类似,若机器宕机,这些批量消息都会丢失,若在DB层面,批量操作在大事务,会导致锁竞争,也会导致主备不一致。若是一些不重要消息,如备份日志,即可使用批量操作,提高消费性能,因为这样一些日志消息丢失也能接受。

若Con消费异常,即使多次消费也无法成功处理(如消息格式异常),导致一直无法ack这条消息,咋办? 有的MQ提供“死信队列”功能,会自动把这种反复消费都失败的消息丢到死信队列,避免一条消息卡主队列。

总结

消息积压处理: 1、发送端优化,增加批量和线程并发两种方式处理 2、消费端优化,优化业务逻辑代码、水平扩容增加并发并同步扩容分区数量

查看消息积压的方法: 1、消息队列内置监控,查看发送端发送消息与消费端消费消息的速度变化 2、查看日志是否有大量的消费错误 3、打印堆栈信息,查看消费线程卡点信息

1.无法提升消费业务效率(仅受消费业务自身逻辑影响),但可提高MQ中堆积消息消费的整体吞吐量(批推比单推mq耗时较短)。 2.数据增量同步,监控信息采集。(非核心业务的稳定大数据流操作)。 3.批处理意味数据积累和大数据传输,这会让单次消费的最长时延变长。同时批量操作为了保证当前批量操作一致性,在个别失败的情况下会引发批量操作重试。

0 人点赞