pulsar-8:40个分区的topic消息严重不均衡下个别partition无法被consumer消费

2022-04-06 11:20:18 浏览数 (2)

目录:

(1).pulsar版本&细节&架构

(2).问题&现象与使用陈述&脱敏代码

1.问题与现象

2.排查过程 (3).最终原因与解决

(1).pulsar版本&细节&架构

pulsar版本是2.8.0,部署在openjdk11上,具体版本号是:11.0.12。 在aws海外部署,使用机型是c5a.2xlarge(8c16g),一共是3台,每台部署一个broker、bookie、zk。启动命令的参数没有修改都是默认值。

部署详情与细节:

pulsar-7:aws上部署生产级别的5节点pulsar集群

(2).问题&现象与使用陈述&脱敏代码

1.问题与现象

40个分区的topic消息严重不均衡下个别partition无法被consumer消费。最近一次是有两个分区各自堆积到30万左右(backlog值)。

这个分区topic消息发送平均大小:

pulsar_rate_in:

pulsar_rate_out:spike的时间是我们发现不消费的时间,但有可能之前就已经有问题了,这么高是重启了consumer。

2.脱敏代码

代码语言:javascript复制
package test;


import com.google.common.collect.Lists;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;


import org.apache.pulsar.client.api.Producer;


@Service
public class PushService implements IPushService {


        private final static Logger logger = LoggerFactory.getLogger(PushService.class);


        @Resource
        private IMqBackoffService mqBackoffService;


        @Resource(name = "XXXStatusProducer")
        private ProducerYYYStatusUUUProducer;


        @Override
        public void pushZZZPPPRRRStatus(Long userId, ListRRRList) {
                String content = GsonUtil.beanToJsonString(RRRList);
                try {
                        //使用的是异步,且批量的发送方式,按照key做sharding发送到topic不同的partition.
                        YYYStatusUUUProducer.newMessage().key(String.valueOf(userId))
                                        .value(content.getBytes(StandardCharsets.UTF_8)).sendAsync().exceptionally((e -> {
                                                logger.error("send sync ZZZ PPP RRR status error,content:{}", content, e);
                                                // 如果发送失败,会将异步发送失败的消息存到aws的aurora数据库,使用数据库的本地事务(shardingjdbc4.1.1),事务使用的注解方式.
                                                mqBackoffService.saveYYYStatus(RRRList);
                                                return null;
                                        }));


                } catch (Exception e) {
                        logger.error("send ZZZ PPP RRR status error,content:{}", content, e);
                        mqBackoffService.saveYYYStatus(RRRList);
                }
        }


}
代码语言:javascript复制
consumer使用的是key-sharding方式消费,脱敏代码:
代码语言:javascript复制
package test;


import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.List;


import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;


@Component
public class XXXResultMessageListener implements MessageListener{


        private static final Logger log = LoggerFactory.getLogger(XXXResultMessageListener.class);


        private final IoooService oooService;


        public XXXResultMessageListener(IoooService oooService) {
                this.oooService = oooService;
        }


        @SneakyThrows
        @LogTid
        @Override
        public void received(Consumerconsumer, Messagemsg) {
                String body = new String(msg.getValue());
                try {
                        if (StrUtil.isBlank(body)) {
                                log.warn("YYY result received msg value is null messageId:{}", msg.getMessageId());
                                consumer.acknowledge(msg.getMessageId());
                                return;
                        }
                        log.info("YYY result,topicName:{},message:{}", msg.getTopicName(), body);
                        ListTTTUUUStatusRespList = JSONUtil.toList(body, TTTUUUStatusResp.class);
                        if (CollUtil.isNotEmpty(TTTUUUStatusRespList)) {
                                TTTUUUStatusResp TTTUUUStatusResp = TTTUUUStatusRespList.get(0);
                                if (UserTypeEnum.PPP.getCode().equals(TTTUUUStatusResp.getUserType())) {
                                        // 只是设置标志,是否打印日志.
                                        ThreadFilter.setPrintFlag(false);
                                }
                                // 同步处理,等处理完成后再获取消息,涉及到的aurora数据库的本地事务,使用的是shardingjdbc4.1.1,事务都是用的注解方式.
                                oooService.vvvBatchReceived(TTTUUUStatusRespList);
                        }
                        consumer.acknowledge(msg.getMessageId());
                } catch (Exception e) {
                        if (e instanceof BaseRuntimeException) {
                                // BaseRuntimeException是业务异常,业务规定不用处理,直接返回ack.
                                consumer.acknowledge(msg.getMessageId());
                        } else {
                                log.error("YYY result Failed to process message error,data is {},e:{}", body, e.getStackTrace(), e);
                                // 如果业务处理失败,告知pulsar重新消费.
                                consumer.negativeAcknowledge(msg);
                        }
                        throw e;
                } finally {
                        ThreadFilter.removePrintFlag();
                }
        }
}

3.问题排查

producer使用的是批量发送(平均8条左右是一批,最多40条是一批),并且是异步发送,使用key-sharding的方式发送到这个topic的不同分区。

这个40个分区的topic,现在是有2个分区啊严重堆积,各堆积了30万不消费。但是重启broker后会消费,但是当消费万1~2万消息后又不消费了,再次重启又消费一点然后停止,反复如此。而且我们一直给的压力都很大,压极限,之前也经常出现处理能力不足造成的几十万消息堆积,但是处理能力上来后都可以平滑的消费万,但是11.12号突然出现消费不了的情况。

在问题分区上查看了pulsar-broker的所有线程,没有死锁。

经过不断查、重启,现在连个分区还各余不到10万堆积。

arthas看堆积的broker节点,也没有啥异常:

我查阅了2.8.1release的fix list,注意到了2.8.1的这个修复: [broker] Fix issue where Key_Shared consumers could get stuck #10920 #10920

然后我同样看了下问题分区的stats,和上述issue中的stats做对比,不是同一个现象,但确实有些问题: a1.consumers竟然是空的,也就是没有消费者,那肯定没有办法消费了吧?在issue#10920中consumers是有消费者的。 a2.subscriptions下的msgBacklog和msgBacklogNoDelayed有值且相等且正确,但是backlogSize确实0,这个貌似不对。

顺着思路我查看了这个topic其他正常的分区,正常分区的stats是: 1.consumers有消费者实例,正确。 2.subscriptions下的msgBacklog和msgBacklogNoDelayed,backlogSize有值且相等且正确,都是0。

也就是说,在这种场景下,somehow的未知原因会导致严重堆积/不均衡的分区会丢失自己的consumer么。

接着查,发现是因为消费者的jvm假死了,消费者是两台机器(8c16g),都假死了,但是奇怪的是第一台机器的所关联的分区还在(topics stats可以看到),第二台机器所关联的分区不在了(topics stats看到的consuemrs为空)。

假死原因是因为垃圾回收用的是G1,之前用ZGC的时候,即使积压了几十万,从来没死过,过会就缓过来了。

现在我把consumer重启(垃圾回收还是G1),问题分区的topic stats:然后跑一跑又停了(消费者hang住了),但是topic stats中的consuemrs还在。

果然如所猜测的一致,当消费者由于GC假死后,分区的topic stats中的consumers为空,pulsar-broker应该是通过心跳判断consuemr已经断开了。

目前从重现&各种现象下的分析结果是:消费者如果使用的是G1(已经做调优),并且在海量消息段时间涌入时,会出现假死,然后从pulsar-broker上断开。导致无法消费。

再切ZGC试试,发现ZGC也不行,经查是因为每批发送40个消息,之前用ZGC的时候是每批发送10个消息。

如下图,是consumer刚恢复时pulsar给consumer吐出的速率,但是一个消费者的TPS是600多(业务重),肯定是无法消费过来的,但还接受海量的消息,会不会把内存中的queue撑爆了?

我的问题是:为啥我的消费能力只有600多,但是pulsar却给我这么海量的消息,我并没有取这么多消息啊。

目前阶段定位是: pulsar-consumer默认使用的是push方式,大量积压后,消费者重启时,pulsar-broker会推送海量消息到consumer,直接把consuemr内存打爆。

(3).最终原因与解决

key_shard模式下,pulsar-client会用一个receiverQueue不断接收pulsar-broker推送过来的消息,receiverQueueSize只能限制这个queue的大小,但这个限制没用,只能让服务慢点死。因为每个key_shard都在client分配了一个singleThreadPool来处理,而这个singleThreadPool是一个无界队列,receiverQueue不断接收到消息后转发到key_shard的threadPool中的无界队列后,receiverQueue继续从pulsar-broker接收推送来的消息,直到把所有的key_shard的threadpool的无界队列打到最大极限,把服务吃死。

这个问题只有两种解决方式: 1.增加consumer的消费节点,和每个消费节点的消费能力。但是我们的业务场景是不可能的,因为处理速度是恒定的,我不可能无限加节点去解决这个问题。如果还要用这个方式,当出现这个问题后,只能临时加节点结合不断重启来解决,这样做也很奇葩。 2.改用同步pull的方式去消费,即当本地线程池处理完消息后再到receiverQueue中拿消息,这样就不会把本地线程池的queue打满。肯定没有问题。对于我们的业务来说,这个是正确选择。本质上还是对pulsar的使用不当造成的,用其他的消息队列用push的话也会这样大量堆积。

最后再次简述原因:

pulsar-client会用一个receiverQueue不断接收pulsar-broker推送过来的消息,结果本地处理这些消息的线程池/本地队列也是异步的去从receiverQueue中拿消息,且本地队列没有限制队列长度,然后直接打满本地队列。

我提交的相关issue地址:

https://github.com/apache/pulsar/issues/12800

0 人点赞