Background
The project uses Spark Streaming with Kafka for real-time data analysis. From time to time, reading from Kafka fails with an offset out-of-range error (OffsetOutOfRangeException).
Analysis
Taken literally, the error says that an offset for a Kafka topic is out of range. The job consumes Kafka with a DirectStream and, after each batch is processed successfully, persists the corresponding offsets locally. Like an array index going out of bounds, an out-of-range offset can occur at either end, the head or the tail, as illustrated below.
(Figure: out-of-range offset illustration)
Head out of range: the locally saved offset falls before the offset of the oldest message still present in the topic (local_offset < earliest_offset).
Tail out of range: the locally saved offset falls after the offset of the newest message in the topic (local_offset > last_offset).
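Stated in code, the two conditions look like this; a minimal sketch with illustrative names, not part of the job itself:

def offset_out_of_range(local_offset, earliest_offset, last_offset):
    # Classify a saved offset against the topic's currently valid window.
    # Returns 'head' when the offset predates the oldest retained message,
    # 'tail' when it is past the newest one, and None when it is still valid.
    if local_offset < earliest_offset:
        return 'head'
    if local_offset > last_offset:
        return 'tail'
    return None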
What causes the head out-of-range case? Note that the Kafka broker configuration was changed to keep messages for only 24 hours:

log.retention.hours=24  (the minimum age of a log file to be eligible for deletion)

So data that had not yet been consumed was deleted by the broker, leaving the saved consumer offset to the left of the earliest offset still present in the topic: an offset that used to be valid has become invalid.
Experiment
1. Change the Kafka broker's retention time to 2 minutes.
2. Restart Kafka after the change.
3. Use the zk shell to read the zk_offset saved by the job (alternatively, the locally persisted offsets can be inspected with the script sketched after this list).
4. Stop the Spark Streaming Kafka DirectStream job.
5. Send data to the Kafka topic and wait for a while (more than two minutes).
6. Restart the streaming job; the exception is reproduced.
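Since this job, as the core code below shows, also persists its offsets to a local pickle file, the saved positions can be checked without zk. A minimal sketch, assuming the job's '<offset_store_location>/<groupid>_offset.txt' naming convention (the concrete path here is hypothetical):

import pickle

# Hypothetical path following the job's offset-file convention
OFFSET_FILE = '/data/offset_store/my_group_offset.txt'

with open(OFFSET_FILE, 'rb') as f:
    offset_ranges = pickle.load(f)

# Each entry is an OffsetRange carrying topic, partition, fromOffset, untilOffset
for r in offset_ranges:
    print("topic=%s partition=%s untilOffset=%s" % (r.topic, r.partition, r.untilOffset))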
The experiment confirms the cause: because of the log.retention.hours setting, the broker deleted part of the topic's data, and within the retention window the streaming job never consumed the messages that were about to be deleted. The offset saved in zk therefore ended up to the left of earliest_offset, triggering the exception.
Solution
The first solution that comes to mind is for the streaming job to consume the topic's data promptly, keeping the consumption lag below the log.retention configuration. A better approach, though, is to let the job keep running even when the problem occurs, which means correcting local_offset to a valid value whenever local_offset < earliest_offset is detected.
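In essence the correction clamps the saved offset back into the topic's valid window. A minimal sketch of the rule (the helper name is illustrative, not part of the job):

def fix_local_offset(local_offset, earliest_offset, last_offset):
    # Clamp an out-of-range offset to the nearest valid boundary:
    # head overflow -> earliest_offset, tail overflow -> last_offset.
    return max(earliest_offset, min(local_offset, last_offset))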
Core code for automatic offset correction
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
from pyspark.storagelevel import StorageLevel
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload
import os
import pickle

# Load the offsets persisted locally by the previous run
localOffsetRanges = []
if os.path.isfile('%s/%s_offset.txt' % (config.offset_store_location, groupid)):
    with open('%s/%s_offset.txt' % (config.offset_store_location, groupid), 'rb') as f:
        localOffsetRanges = pickle.load(f)
offsets = {}
cur_kafka_topic_offset_map = {}
for temp_topic in topics:
    # Fetch Kafka's current earliest and latest offsets; comparing them with the
    # locally saved offsets lets the job repair head/tail out-of-range offsets
    # automatically, without manual intervention.
    temp_offset_map = get_cur_kafka_topic_offset_map(brokers, temp_topic)
    if temp_offset_map:
        cur_kafka_topic_offset_map[temp_topic] = temp_offset_map
fix_offset_content = u""
total_fix_num = 0
alert_fix_max_num = 5  # cap on the number of fixes detailed in one alert
alert_fix_num = 0
for offsetRange in localOffsetRanges:
    temp_topic = offsetRange.topic
    partition_idx = offsetRange.partition
    if temp_topic in topics:
        topicPartition = TopicAndPartition(temp_topic, partition_idx)
        cur_consumer_offset = offsetRange.untilOffset
        temp_offset_map = cur_kafka_topic_offset_map.get(temp_topic)
        if temp_offset_map:
            cur_kafka_topic_offset_infos = temp_offset_map.get(partition_idx)
            if cur_kafka_topic_offset_infos:
                cur_kafka_topic_min_offset = cur_kafka_topic_offset_infos[0]
                cur_kafka_topic_max_offset = cur_kafka_topic_offset_infos[1]
                # Head out of range: saved offset is older than the earliest retained message
                if cur_kafka_topic_min_offset > 0 and cur_consumer_offset < cur_kafka_topic_min_offset:
                    total_fix_num += 1
                    alert_fix_num += 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content += "\nconsumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                    print("cur_consumer_offset(%s)<cur_kafka_topic_min_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                        cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_min_offset
                # Tail out of range: saved offset is newer than the latest message
                if cur_kafka_topic_max_offset > 0 and cur_consumer_offset > cur_kafka_topic_max_offset:
                    total_fix_num += 1
                    alert_fix_num += 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content += "\nconsumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)
                    print("cur_consumer_offset(%s)>cur_kafka_topic_max_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                        cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_max_offset
        offsets[topicPartition] = cur_consumer_offset
if total_fix_num > 0:
    # Alert whenever any offset had to be corrected
    receivers = config.recvers.split(',')
    alarmopt = alarm_opt.AlarmOpt(receivers)
    alert_content = u"[%s][consumer offset differs from the latest offset (%s fixes applied)]:\n%s" % (
        params_name, total_fix_num, fix_offset_content)
    alarmopt.alarm(alarm_opt.WX_SMS, alert_content, u'spark alert')
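For context, the corrected offsets dict keyed by TopicAndPartition is what a DirectStream job then hands to KafkaUtils.createDirectStream as fromOffsets, persisting each batch's ranges afterwards. A sketch under the assumption that ssc, topics, brokers, config and groupid are defined by the surrounding job, and that the RDD inside foreachRDD is still the original KafkaRDD so offsetRanges() is available:

# Start the direct stream from the corrected offsets
kafkaParams = {"metadata.broker.list": brokers}
stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams, fromOffsets=offsets)

def save_offsets(rdd):
    # Persist this batch's offset ranges so the next run resumes from them
    # (and re-validates them against Kafka, as above)
    with open('%s/%s_offset.txt' % (config.offset_store_location, groupid), 'wb') as f:
        pickle.dump(rdd.offsetRanges(), f)

stream.foreachRDD(save_offsets)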
def get_cur_kafka_topic_offset_map(brokers, topic):
    cur_kafka_offset_map = {}
    try:
        client = SimpleClient(brokers)
        LATEST = -1
        EARLIEST = -2
        # Number of partitions in the topic
        partitions = client.topic_partitions[topic]
        partition_num = len(partitions.keys())
        print("partition_num=%s,topic=%s" % (partition_num, topic))
        # Earliest offset of each partition
        min_offset_requests = [OffsetRequestPayload(topic, p, EARLIEST, 1) for p in partitions.keys()]
        min_offsets_responses = client.send_offset_request(min_offset_requests)
        if not min_offsets_responses or len(min_offsets_responses) != partition_num:
            print("min_offsets_responses is illegal,topic=%s,brokers=%s" % (topic, brokers))
            return None
        print("len(min_offsets_responses)=%s,topic=%s" % (len(min_offsets_responses), topic))
        for r in min_offsets_responses:
            cur_kafka_offset_map[r.partition] = [r.offsets[0]]
        # Latest offset of each partition
        max_offset_requests = [OffsetRequestPayload(topic, p, LATEST, 1) for p in partitions.keys()]
        max_offsets_responses = client.send_offset_request(max_offset_requests)
        if not max_offsets_responses or len(max_offsets_responses) != partition_num:
            print("max_offsets_responses is illegal,topic=%s,brokers=%s" % (topic, brokers))
            return None
        print("len(max_offsets_responses)=%s,topic=%s" % (len(max_offsets_responses), topic))
        cur_kafka_offset_str = ""
        for r in max_offsets_responses:
            if r.partition in cur_kafka_offset_map:
                cur_kafka_offset_map[r.partition].append(r.offsets[0])
            else:
                cur_kafka_offset_map[r.partition] = [-1, r.offsets[0]]
            partition_info_str = "[%s,%s,%s]" % (
                r.partition, cur_kafka_offset_map[r.partition][0], cur_kafka_offset_map[r.partition][1])
            if cur_kafka_offset_str == "":
                cur_kafka_offset_str = partition_info_str
            else:
                cur_kafka_offset_str += ",%s" % partition_info_str
        print("cur_kafka_offset_str=%s,topic=%s,brokers=%s" % (cur_kafka_offset_str, topic, brokers))
        return cur_kafka_offset_map
    except Exception as e:
        print("get_cur_kafka_topic_offset_map Exception: %s,topic=%s,brokers=%s" % (str(e), topic, brokers))
        return None
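A quick way to exercise the helper on its own (broker list and topic name below are placeholders):

# Placeholder broker list and topic, for illustration only
offset_map = get_cur_kafka_topic_offset_map("kafka-host1:9092,kafka-host2:9092", "test_topic")
if offset_map:
    # Each value is [earliest_offset, latest_offset] for that partition
    for partition, (min_offset, max_offset) in offset_map.items():
        print("partition=%s earliest=%s latest=%s" % (partition, min_offset, max_offset))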