一、异常日志
1、client日志
连接到僵死的broker的kafka客户端,会有如下报错:
此时客户端发往服务端的请求是处于异常阶段,如果不能快速恢复的话,存在数据丢失的风险。
2、broker日志
首先我们批量检索一下服务端的日志,发现有很多broker与7号broker通信异常
推测7号broker出现间断性僵死的问题,查阅相关server.log
发现:
注意,这次数据越界的id值是1
代码语言:txt复制java.lang.ArrayIndexOutOfBoundsException: 1
二、问题分析
1、紧急分析与紧急恢复
从上文可以得知,broker在处理协议时出现数组越界的问题,问题类似笔者在《kafka高版本Client连接0.9Server引发的血案》文章中提及的高版本客户端访问0.9kafka集群导致broker僵死
报错的日志内容有差异,但基本可以确认是客户端异常链接导致。为了紧急恢复,我们选择了最快的方法(目前kafka集群最多的连接来自Xone平台的Storm作业):
- 提取了最早的报错时间
- 联系Xone平台侧找到该时间段发布的Storm任务
- 联系任务发布者核实客户端情况
最终确认是任务发布者为了控制一次拉取的条数,更新了kafka客户端依赖(从0.9.0.1
升级到0.10.1.0
),任务发布者停止任务,集群恢复正常!
2、问题复现
集群恢复后,我们就要复现问题,追究到底。
当我在本地环境使用0.10.1.0
客户端连接0.9.0.1
的集群时,发现我们的任务直接抛异常退出了。
不难看出,因为协议不兼容,0.10.1.0
客户端无法获取topic_metadata
字段,所以直接退出,服务端没有相关数组越界的报错。
笔者注: kafka 0.x 的版本客户端和服务端兼容问题确实是做的不好,动不动就是数组越界、broker僵死、客户端异常。
和任务开发同学确认逻辑后发现,他在整段代码做了一层异常捕获,且没有在异常捕获里对SchemaException
做处理。按照相关逻辑处理后,成功复现问题。
笔者注:如何正确处理Java中的异常一直是Java开发中的难点,当出现
SchemaException
时,客户端已经是无法和服务端正常通信,此时直接退出是比较合理的处理方法。(不过将心比心,谁又能提前知道这个坑呢?)
3、源码分析
代码语言:txt复制private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
if (apiKey < 0 || apiKey > schemas.length)
throw new IllegalArgumentException("Invalid api key: " apiKey);
Schema[] versions = schemas[apiKey];
if (version < 0 || version > versions.length)
throw new IllegalArgumentException("Invalid version for API key " apiKey ": " version);
// 这里出现的数组越界
if (versions[version] == null)
throw new IllegalArgumentException("Unsupported version for API key " apiKey ": " version);
return versions[version];
}
public static Schema requestSchema(int apiKey, int version) {
return schemaFor(Protocol.REQUESTS, apiKey, version);
}
笔者在本地环境修改了kafka 0.9.0.1
server的源码打印了相关日志:
0.10.1.0客户端消费连接的日志
代码语言:txt复制[debug apiKey]11
[debug versions][{group_id:STRING,session_timeout:INT32,member_id:STRING,protocol_type:STRING,group_protocols:ARRAY({protocol_name:STRING,protocol_metadata:BYTES})}]
[debug version]1
[2021-04-28 13:46:54,408] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:40)
at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:52)
at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:68)
at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:144)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:55)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.run(SocketServer.scala:421)
at java.lang.Thread.run(Thread.java:748)
0.9.0.1客户端消费连接的日志
代码语言:txt复制[debug apiKey]11
[debug versions][{group_id:STRING,session_timeout:INT32,member_id:STRING,protocol_type:STRING,group_protocols:ARRAY({protocol_name:STRING,protocol_metadata:BYTES})}]
{group_id:STRING,session_timeout:INT32,member_id:STRING,protocol_type:STRING,group_protocols:ARRAY({protocol_name:STRING,protocol_metadata:BYTES})}
[debug version]0
[2021-04-28 13:49:13,046] INFO [GroupCoordinator 0]: Preparing to restabilize group g_test with old generation 0 (kafka.coordinator.GroupCoordinator)
[2021-04-28 13:49:13,052] INFO [GroupCoordinator 0]: Stabilized group g_test generation 1 (kafka.coordinator.GroupCoordinator)
小结
发现0.10.1.0
客户端发起apiKey
为11的请求时(对应api请求是JOIN_GROUP
),0.9.0.1
服务端的versions
数组的长度是1,从而执行versions[1]
时导致数组越界。
原因: kafka 0.10的
JoinGroup API
,增加了rebalance_timeout_ms
参数,所以version升级到1 image.png
http://kafka.apache.org/0101/protocol.html
我们接着查看一下 0.9Server Schema[][] schemas
的数据填充过程:
kafka的作者以为versions[version] == null
这样可以规避版本不存在的问题,却导致了经典的二维数组越界问题。幸运的是高版本的kafka已经完全移除这部分的实现。
三、总结
- kafka
0.10.1.0
是一个过渡版本,用户并不是很多,所以相关报错日志在网络上几乎检索不到。 - kafka
0.9.0.1
这个古老的版本bug实在是太多了,今年会裁撤所有该版本的集群。 - kafka协议的设计算是比较不错,将复杂的内容梳理得很清晰,值得学习。
更多内容可以关注我的公众号~