kafka中 DescribeLogDirs请求参数引起的一个问题

2023-02-28 15:11:49 浏览数 (2)

某天,测试人员找到我,反馈说CI的kafka用例失败了,麻烦定位一下。

"麻烦先找下我们的小马甲——公共服务",这句话还没发出去,对方已经先把环境信息给发了过来。

想想应该是个小问题,索性直接先看了。

然后习惯性的登录到环境,先看下进程在不在、端口有没有监听、能不能生产消费后,发现一切都正常后,服务本身好像没什么毛病。这才问了下测试的兄弟,是什么用例失败,具体表现是怎样的?

测试:“喏,所有请求都正常,唯独这个请求一直超时”

代码语言:javascript复制
DescribeLogDirsRequest ret = admin.describeLogDirs(brokerIds);
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();

我:"那这个请求是针对具体哪个topic会超时吗?"

测试:"不,所有的topic,都会超时!"

再次查看了服务端的日志,发现完全没有任何错误信息,连个告警的信息都没有。

我:“你能再运行下这个用例吗?“

测试:“跑过了,还是超时”

我:“你确定你们执行用例的节点和kafka的网络是通的吗,不会是网络不通吧?“

测试:"不可能,所有用例都是在一个节点上执行的,topic的其他操作也都没问题,就这个超时!"

再次排除了可能有影响的因素后,发现问题仍旧存在,好像不得不分析下源码了,可简单看了下源码后,客户端就是发送一个请求,而服务端又完全没有任何错误信息。

而恰好在搜索请求DescribeLogDirs关键字时发现,有对应的(命令)封装类(LogDirsCommand),可以通过kafka-run-class.sh来调用。

于是直接调用脚本进行了测试:

代码语言:javascript复制
[root@kafka-0 bin]# sh kafka-run-class.sh kafka.admin.LogDirsCommand --bootstrap-server 10.38.221.239:9092 --broker-list 0 --describe --topic-list table_test1
Querying brokers for log directories information
Received log directory information from brokers 0
{"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"/opt/kafka","error":null,"partitions":[{"partition":"table_test1-1","size":80936040,"offsetLag":0,"isFuture":false}]}]}]}

唉,神奇了,调用没有报错,也正确返回了结果,为啥CI用例就失败了呢?

于是,进一步分析了下相关的参数:

代码语言:javascript复制
--bootstrap-server: 指定kafka broker的地址(必需的参数)
--describe: 描述指定brokers的指定(topic分区)目录信息(必需的参数)
--broker-list:用于指定请求的kafka broker的ID列表(非必需的参数)
--topic-list: 指定的topic列表(必需的参数)

其中,需要注意的是"--broker-list"这个参数,如果不带该参数,则以元数据请求中的kafka集群信息为准,否则以指定的"--broker-list"为准。

那么,测试CI的那个问题难道是参数指定了不存在(或者已停止)的kafka节点?

带着疑问,再次敲了命令,这次在"--broker-list"中指定了一个实际不存在的ID。

代码语言:javascript复制
[root@kafka-0 bin]# sh kafka-run-class.sh kafka.admin.LogDirsCommand --bootstrap-server 10.38.221.239:9092 --broker-list 0,9 --describe --topic-list table_test1
Querying brokers for log directories information
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
        at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:51)
        at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:37)
        at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

噢,这次的情况变成了超时,和CI失败的用例的现象是一致的。

至于为什么会超时,分析了下"KafkaAdminClient"的源码,主要逻辑为:对于请求中的每个BrokerID,都需要从元数据请求中找到对应的broker信息,然后分别向这些broker建立连接,并真正发送请求。否则一直在pending队列中,直到元数据请求信息能匹配到对应的信息或请求超时。

感觉问题基本清楚的同时,心里也有了一定的底气,再次询问了下测试兄弟,请求参数的值是什么?是不是填错了?经过测试兄弟的确认后,发现入参"broker-list"的值与实际部署的kafka节点数不一致,也就是说"broker-list"中有不存在的broker ID,最终导致了请求超时的问题。经过修改参数后,CI用例都成功通过了。

小结一下,本问题其实是一个很简单的问题,关键在于使用时需要清楚地知道对应参数的含义,否则就可能引起问题。

0 人点赞