目录:
(1).pulsar版本&细节&架构
(2).问题&现象与使用陈述&脱敏代码
1.问题与现象
2.排查过程 (3).最终原因与解决
(1).pulsar版本&细节&架构
pulsar版本是2.8.0,部署在openjdk11上,具体版本号是:11.0.12。 在aws海外部署,使用机型是c5a.2xlarge(8c16g),一共是3台,每台部署一个broker、bookie、zk。启动命令的参数没有修改都是默认值。
部署详情与细节:
pulsar-7:aws上部署生产级别的5节点pulsar集群
(2).问题&现象与使用陈述&脱敏代码
1.问题与现象
40个分区的topic消息严重不均衡下个别partition无法被consumer消费。最近一次是有两个分区各自堆积到30万左右(backlog值)。
这个分区topic消息发送平均大小:
pulsar_rate_in:
pulsar_rate_out:spike的时间是我们发现不消费的时间,但有可能之前就已经有问题了,这么高是重启了consumer。
2.脱敏代码
代码语言:javascript复制package test;
import com.google.common.collect.Lists;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.pulsar.client.api.Producer;
@Service
public class PushService implements IPushService {
private final static Logger logger = LoggerFactory.getLogger(PushService.class);
@Resource
private IMqBackoffService mqBackoffService;
@Resource(name = "XXXStatusProducer")
private ProducerYYYStatusUUUProducer;
@Override
public void pushZZZPPPRRRStatus(Long userId, ListRRRList) {
String content = GsonUtil.beanToJsonString(RRRList);
try {
//使用的是异步,且批量的发送方式,按照key做sharding发送到topic不同的partition.
YYYStatusUUUProducer.newMessage().key(String.valueOf(userId))
.value(content.getBytes(StandardCharsets.UTF_8)).sendAsync().exceptionally((e -> {
logger.error("send sync ZZZ PPP RRR status error,content:{}", content, e);
// 如果发送失败,会将异步发送失败的消息存到aws的aurora数据库,使用数据库的本地事务(shardingjdbc4.1.1),事务使用的注解方式.
mqBackoffService.saveYYYStatus(RRRList);
return null;
}));
} catch (Exception e) {
logger.error("send ZZZ PPP RRR status error,content:{}", content, e);
mqBackoffService.saveYYYStatus(RRRList);
}
}
}
代码语言:javascript复制consumer使用的是key-sharding方式消费,脱敏代码:
代码语言:javascript复制package test;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.List;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
@Component
public class XXXResultMessageListener implements MessageListener{
private static final Logger log = LoggerFactory.getLogger(XXXResultMessageListener.class);
private final IoooService oooService;
public XXXResultMessageListener(IoooService oooService) {
this.oooService = oooService;
}
@SneakyThrows
@LogTid
@Override
public void received(Consumerconsumer, Messagemsg) {
String body = new String(msg.getValue());
try {
if (StrUtil.isBlank(body)) {
log.warn("YYY result received msg value is null messageId:{}", msg.getMessageId());
consumer.acknowledge(msg.getMessageId());
return;
}
log.info("YYY result,topicName:{},message:{}", msg.getTopicName(), body);
ListTTTUUUStatusRespList = JSONUtil.toList(body, TTTUUUStatusResp.class);
if (CollUtil.isNotEmpty(TTTUUUStatusRespList)) {
TTTUUUStatusResp TTTUUUStatusResp = TTTUUUStatusRespList.get(0);
if (UserTypeEnum.PPP.getCode().equals(TTTUUUStatusResp.getUserType())) {
// 只是设置标志,是否打印日志.
ThreadFilter.setPrintFlag(false);
}
// 同步处理,等处理完成后再获取消息,涉及到的aurora数据库的本地事务,使用的是shardingjdbc4.1.1,事务都是用的注解方式.
oooService.vvvBatchReceived(TTTUUUStatusRespList);
}
consumer.acknowledge(msg.getMessageId());
} catch (Exception e) {
if (e instanceof BaseRuntimeException) {
// BaseRuntimeException是业务异常,业务规定不用处理,直接返回ack.
consumer.acknowledge(msg.getMessageId());
} else {
log.error("YYY result Failed to process message error,data is {},e:{}", body, e.getStackTrace(), e);
// 如果业务处理失败,告知pulsar重新消费.
consumer.negativeAcknowledge(msg);
}
throw e;
} finally {
ThreadFilter.removePrintFlag();
}
}
}
3.问题排查
producer使用的是批量发送(平均8条左右是一批,最多40条是一批),并且是异步发送,使用key-sharding的方式发送到这个topic的不同分区。
这个40个分区的topic,现在是有2个分区啊严重堆积,各堆积了30万不消费。但是重启broker后会消费,但是当消费万1~2万消息后又不消费了,再次重启又消费一点然后停止,反复如此。而且我们一直给的压力都很大,压极限,之前也经常出现处理能力不足造成的几十万消息堆积,但是处理能力上来后都可以平滑的消费万,但是11.12号突然出现消费不了的情况。
在问题分区上查看了pulsar-broker的所有线程,没有死锁。
经过不断查、重启,现在连个分区还各余不到10万堆积。
arthas看堆积的broker节点,也没有啥异常:
我查阅了2.8.1release的fix list,注意到了2.8.1的这个修复: [broker] Fix issue where Key_Shared consumers could get stuck #10920 #10920
然后我同样看了下问题分区的stats,和上述issue中的stats做对比,不是同一个现象,但确实有些问题: a1.consumers竟然是空的,也就是没有消费者,那肯定没有办法消费了吧?在issue#10920中consumers是有消费者的。 a2.subscriptions下的msgBacklog和msgBacklogNoDelayed有值且相等且正确,但是backlogSize确实0,这个貌似不对。
顺着思路我查看了这个topic其他正常的分区,正常分区的stats是: 1.consumers有消费者实例,正确。 2.subscriptions下的msgBacklog和msgBacklogNoDelayed,backlogSize有值且相等且正确,都是0。
也就是说,在这种场景下,somehow的未知原因会导致严重堆积/不均衡的分区会丢失自己的consumer么。
接着查,发现是因为消费者的jvm假死了,消费者是两台机器(8c16g),都假死了,但是奇怪的是第一台机器的所关联的分区还在(topics stats可以看到),第二台机器所关联的分区不在了(topics stats看到的consuemrs为空)。
假死原因是因为垃圾回收用的是G1,之前用ZGC的时候,即使积压了几十万,从来没死过,过会就缓过来了。
现在我把consumer重启(垃圾回收还是G1),问题分区的topic stats:然后跑一跑又停了(消费者hang住了),但是topic stats中的consuemrs还在。
果然如所猜测的一致,当消费者由于GC假死后,分区的topic stats中的consumers为空,pulsar-broker应该是通过心跳判断consuemr已经断开了。
目前从重现&各种现象下的分析结果是:消费者如果使用的是G1(已经做调优),并且在海量消息段时间涌入时,会出现假死,然后从pulsar-broker上断开。导致无法消费。
再切ZGC试试,发现ZGC也不行,经查是因为每批发送40个消息,之前用ZGC的时候是每批发送10个消息。
如下图,是consumer刚恢复时pulsar给consumer吐出的速率,但是一个消费者的TPS是600多(业务重),肯定是无法消费过来的,但还接受海量的消息,会不会把内存中的queue撑爆了?
我的问题是:为啥我的消费能力只有600多,但是pulsar却给我这么海量的消息,我并没有取这么多消息啊。
目前阶段定位是: pulsar-consumer默认使用的是push方式,大量积压后,消费者重启时,pulsar-broker会推送海量消息到consumer,直接把consuemr内存打爆。
(3).最终原因与解决
key_shard模式下,pulsar-client会用一个receiverQueue不断接收pulsar-broker推送过来的消息,receiverQueueSize只能限制这个queue的大小,但这个限制没用,只能让服务慢点死。因为每个key_shard都在client分配了一个singleThreadPool来处理,而这个singleThreadPool是一个无界队列,receiverQueue不断接收到消息后转发到key_shard的threadPool中的无界队列后,receiverQueue继续从pulsar-broker接收推送来的消息,直到把所有的key_shard的threadpool的无界队列打到最大极限,把服务吃死。
这个问题只有两种解决方式: 1.增加consumer的消费节点,和每个消费节点的消费能力。但是我们的业务场景是不可能的,因为处理速度是恒定的,我不可能无限加节点去解决这个问题。如果还要用这个方式,当出现这个问题后,只能临时加节点结合不断重启来解决,这样做也很奇葩。 2.改用同步pull的方式去消费,即当本地线程池处理完消息后再到receiverQueue中拿消息,这样就不会把本地线程池的queue打满。肯定没有问题。对于我们的业务来说,这个是正确选择。本质上还是对pulsar的使用不当造成的,用其他的消息队列用push的话也会这样大量堆积。
最后再次简述原因:
pulsar-client会用一个receiverQueue不断接收pulsar-broker推送过来的消息,结果本地处理这些消息的线程池/本地队列也是异步的去从receiverQueue中拿消息,且本地队列没有限制队列长度,然后直接打满本地队列。
我提交的相关issue地址:
https://github.com/apache/pulsar/issues/12800