kafka0.10.1.0客户端导致0.9Server僵死问题排查

2022-02-28 20:18:55 浏览数 (2)

一、异常日志

1、client日志

连接到僵死的broker的kafka客户端,会有如下报错:

image.pngimage.png

此时客户端发往服务端的请求是处于异常阶段,如果不能快速恢复的话,存在数据丢失的风险。

2、broker日志

首先我们批量检索一下服务端的日志,发现有很多broker与7号broker通信异常

image.pngimage.png

推测7号broker出现间断性僵死的问题,查阅相关server.log发现:

image.pngimage.png

注意,这次数据越界的id值是1

代码语言:txt复制
java.lang.ArrayIndexOutOfBoundsException: 1

二、问题分析

1、紧急分析与紧急恢复

从上文可以得知,broker在处理协议时出现数组越界的问题,问题类似笔者在《kafka高版本Client连接0.9Server引发的血案》文章中提及的高版本客户端访问0.9kafka集群导致broker僵死

报错的日志内容有差异,但基本可以确认是客户端异常链接导致。为了紧急恢复,我们选择了最快的方法(目前kafka集群最多的连接来自Xone平台的Storm作业):

  • 提取了最早的报错时间
  • 联系Xone平台侧找到该时间段发布的Storm任务
  • 联系任务发布者核实客户端情况

最终确认是任务发布者为了控制一次拉取的条数,更新了kafka客户端依赖(从0.9.0.1升级到0.10.1.0),任务发布者停止任务,集群恢复正常!

2、问题复现

集群恢复后,我们就要复现问题,追究到底。

当我在本地环境使用0.10.1.0客户端连接0.9.0.1的集群时,发现我们的任务直接抛异常退出了。

image.pngimage.png

不难看出,因为协议不兼容,0.10.1.0客户端无法获取topic_metadata字段,所以直接退出,服务端没有相关数组越界的报错。

笔者注: kafka 0.x 的版本客户端和服务端兼容问题确实是做的不好,动不动就是数组越界、broker僵死、客户端异常。

和任务开发同学确认逻辑后发现,他在整段代码做了一层异常捕获,且没有在异常捕获里对SchemaException做处理。按照相关逻辑处理后,成功复现问题。

image.pngimage.png

笔者注:如何正确处理Java中的异常一直是Java开发中的难点,当出现SchemaException时,客户端已经是无法和服务端正常通信,此时直接退出是比较合理的处理方法。(不过将心比心,谁又能提前知道这个坑呢?)

3、源码分析

代码语言:txt复制
private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
    if (apiKey < 0 || apiKey > schemas.length)
        throw new IllegalArgumentException("Invalid api key: "   apiKey);
    Schema[] versions = schemas[apiKey];
    if (version < 0 || version > versions.length)
        throw new IllegalArgumentException("Invalid version for API key "   apiKey   ": "   version);
    // 这里出现的数组越界
    if (versions[version] == null)
        throw new IllegalArgumentException("Unsupported version for API key "   apiKey   ": "   version);
    return versions[version];
}

public static Schema requestSchema(int apiKey, int version) {
    return schemaFor(Protocol.REQUESTS, apiKey, version);
}

笔者在本地环境修改了kafka 0.9.0.1server的源码打印了相关日志:

0.10.1.0客户端消费连接的日志

代码语言:txt复制
[debug apiKey]11
[debug versions][{group_id:STRING,session_timeout:INT32,member_id:STRING,protocol_type:STRING,group_protocols:ARRAY({protocol_name:STRING,protocol_metadata:BYTES})}]
[debug version]1
[2021-04-28 13:46:54,408] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 1
	at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:40)
	at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:52)
	at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:68)
	at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:144)
	at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:55)
	at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
	at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
	at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at kafka.network.Processor.run(SocketServer.scala:421)
	at java.lang.Thread.run(Thread.java:748)
	

0.9.0.1客户端消费连接的日志

代码语言:txt复制
[debug apiKey]11
[debug versions][{group_id:STRING,session_timeout:INT32,member_id:STRING,protocol_type:STRING,group_protocols:ARRAY({protocol_name:STRING,protocol_metadata:BYTES})}]
{group_id:STRING,session_timeout:INT32,member_id:STRING,protocol_type:STRING,group_protocols:ARRAY({protocol_name:STRING,protocol_metadata:BYTES})}
[debug version]0
[2021-04-28 13:49:13,046] INFO [GroupCoordinator 0]: Preparing to restabilize group g_test with old generation 0 (kafka.coordinator.GroupCoordinator)
[2021-04-28 13:49:13,052] INFO [GroupCoordinator 0]: Stabilized group g_test generation 1 (kafka.coordinator.GroupCoordinator)

小结

发现0.10.1.0客户端发起apiKey为11的请求时(对应api请求是JOIN_GROUP),0.9.0.1服务端的versions数组的长度是1,从而执行versions[1]时导致数组越界。

原因: kafka 0.10的 JoinGroup API,增加了rebalance_timeout_ms参数,所以version升级到1 image.pngimage.png

http://kafka.apache.org/0101/protocol.html

我们接着查看一下 0.9Server Schema[][] schemas的数据填充过程:

image.pngimage.png

image.pngimage.png

kafka的作者以为versions[version] == null这样可以规避版本不存在的问题,却导致了经典的二维数组越界问题。幸运的是高版本的kafka已经完全移除这部分的实现。

三、总结

  • kafka0.10.1.0是一个过渡版本,用户并不是很多,所以相关报错日志在网络上几乎检索不到。
  • kafka0.9.0.1这个古老的版本bug实在是太多了,今年会裁撤所有该版本的集群。
  • kafka协议的设计算是比较不错,将复杂的内容梳理得很清晰,值得学习。

更多内容可以关注我的公众号~

0 人点赞