【技术种草】CKafka调优笔记 消费堆积 服务CPU未跑满应该如何解决?

2021-11-19 11:40:14 浏览数 (2)

1. 背景

Proxy服务负责消费CKafka消息并解析,并分发消息至不同的CKafka topic。近期发现Proxy服务消费CKafka有消息堆积,且服务所在CVM CPU与内存资源大概只占用50%左右。

如图所示可以看到,在数据量峰值的的时候,生产流量可以达到2000MB/小时,但是消费流量达不到这么多,说明该服务有消息堆积。

其他说明:CKafka partition数量与服务实例数量正好一比一关系,CKafka 消费Client Concurrence设置为1。Proxy服务维护一个线程池,用于解析与分发消费的每一条消息。每当有消息进入服务时,每条消息会用一个线程进行解析消息并发送数据。

代码语言:txt复制
    @KafkaListener(topics = "topic")
    public void consumerKafkaMsg(List<ConsumerRecord<?, String>> records) throws Exception {
        for (ConsumerRecord<?, String> record : records) {
            log.debug("kafka topic = {}, value:n{}", record.topic(), record.value());
            service.handleMsg(record);
        }
    }
代码语言:txt复制
    @PostConstruct
    public void init() {
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<Runnable>(consumerCount);
        RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build();
        threadPool = new ThreadPoolExecutor(consumerCount, consumerCount, 0L, TimeUnit.MILLISECONDS,
                workingQueue, namedThreadFactory, rejectedExecutionHandler);
    }

    public void handleMsg(ConsumerRecord<?, String> record) {
        threadPool.execute(new ThreadPoolTask(recorde));
    }

2. 问题分析

使用Arthas工具分析一下堆栈,如下图,可以看到每个线程都在TIMED_WAITING的等待状态,CPU消耗也很低,初步判断消费堆积并不是因为线程数量不够,而是卡在IO。

在这里插入图片描述在这里插入图片描述
在这里插入图片描述在这里插入图片描述

查看线程状态也可以看到线程池中每个线程都在等待,卡在方法dosent上面,有可能是CKafka集群限流。

3 尝试解决

3.1 增大消息解析分发线程池队列长度

上面代码2中可以看到线程池队列长度是和线程数保持一致,因为线程池的策略是线程数达到队列最大时就由主线程去执行线程作业,从而导致主线程没有继续拉取数据,其他线程执行完成之后没有数据就如上图所示等待主线程完成作业后再去拉取数据。

增大了线程池队列长度之后,发现线程状态还是变化不大,也还是一直在等待主线程。

3.2 线程任务调优

方法一效果并不是很明显,我们可以换一个思路。在代码1中每条消息都会有一个线程去执行任务,因为消息较多,每个消息一个线程会有点效率低下,可以尝试将一批数据放入一个线程,提高线程的CPU利用率,从而解决问题。

代码语言:txt复制
    public void handleMsg(List<ConsumerRecord<?, String>> records) {
        threadPool.execute(new ThreadPoolTask(records));
    }

修改完成后发现线程CPU利用率上升明显。

3.3 CKafka Producer 参数修改

同时重新查看Arthas里面每个线程的状态,线程卡在kafkaTemplate里面的dosent方法,再往上是awiat

在这里插入图片描述在这里插入图片描述

查看await方法源码,发现其实是在等待batchSize。因为压测时batchSize设置得比较大,在正式环境中数据量没达到压测大数据量,但是因为这个方法一直在等待batchSize填充完成,所以才导致线程一直在等待没有发送Kafka消息,卡在dosent上面。

在这里插入图片描述在这里插入图片描述

PS: 还有一个LingerMs参数控制发送,batchSIze与lingerMs时间哪一个先达到则就发送。LingerMs的默认时间是1分钟。结果与总结

腾讯云监控还是起了很大作用,在调优过程有很大参考意义,Ckafka或者组件都需要进行适当的参数调整才能发挥最大作用

效果还是比较明显可以看到机器CPU负载提高显著,未消费的Kafka消息也慢慢降低,达到预期。

0 人点赞