spark streaming访问kafka出现offset越界问题处理

2021-02-24 19:22:39 浏览数 (1)

背景

项目中使用了spark streaming kafka来做实时数据分析,有的时候在访问kafka时会报offset越界错误(OffsetOutOfRangeException),如下:

消费kafka offset越界错误消费kafka offset越界错误

分析

从字面意思上,说是kafka topic的offset越界异常;在job中使用的是Kafka DirectStream,每成功处理一批数据,就把对应的offset更新到本地中;和数组越界异常一样,offset越界应该分为头越界和尾越界,如下图所示。 越界示意图

消费offset越界示意图消费offset越界示意图

头部越界: 本地保存的offset在topic中仍然存在的最老message的offset之前时(local_offset < earliest_offset); 尾部越界: 本地保存的offset在topic中最新message的offset之后时(local_offset > last_offset)

是什么导致头部越界呢? 考虑到kafka broker配置中修改了message的保持时间为24小时:

代码语言:javascript复制
log.retention.hours=24(The minimum age of a log file to be eligible for deletion)

因此,应该是kafka 中未被消费的数据被broker清除了,使得消费的offset落在仍存在的最老message offset的左侧,本来合法的offset变得不非法了。

试验

1、改kafka broker 的retention time 为2分钟

2、修改完成后重启kafka

3、使用zk shell 命令得到解析器所保存的zk_offset

4、停止spark streaming kafka DirectStream job

5、发送数据到kafka topic,等待一段时间(超过两分钟)

6、启动streaming job,复现该异常。

通过异常验证可以导致异常的原因为:kafka broker因为log.retention.hours的配置,导致topic中有些数据被清除,而在retention时间范围内streaming job都没有把将要被清除的message消费掉,因此zk中offset落在了earliest_offset的左侧,引发异常。

解决方法

首先想到的方法就是 streaming job要及时消费掉topic中的数据,消费延迟不得大于log.retention.time的配置。 但是更好的办法是在遇到该问题时,依然能让job正常运行,因此就需要在发现local_offset<earliest_offset时矫正local_offset为合法值。

自动修正offset核心代码

代码语言:javascript复制
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
from pyspark.storagelevel import StorageLevel
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

# 获取 offset
localOffsetRanges = []
if os.path.isfile('%s/%s_offset.txt' % (config.offset_store_location, groupid)):
    with open('%s/%s_offset.txt' % (config.offset_store_location, groupid), 'rb') as f:
        localOffsetRanges = pickle.load(f)
offsets = {}

cur_kafka_topic_offset_map = {}
for temp_topic in topics:
    # 获取kafka当前最小和最大的offset信息,用于跟当前消费到的offset进行对比,以便自动修复潜在的消费kafka offset头尾越界问题,避免人工干预。
    temp_offset_map = get_cur_kafka_topic_offset_map(brokers, temp_topic)
    if temp_offset_map:
        cur_kafka_topic_offset_map[temp_topic] = temp_offset_map

fix_offset_content = u""
total_fix_num = 0
alert_fix_max_num = 5
alert_fix_num = 0
for offsetRange in localOffsetRanges:
    temp_topic = offsetRange.topic
    partition_idx = offsetRange.partition
    if temp_topic in topics:
        topicPartition = TopicAndPartition(temp_topic, partition_idx)
        cur_consumer_offset = offsetRange.untilOffset
        temp_offset_map = cur_kafka_topic_offset_map.get(temp_topic)
        if temp_offset_map:
            cur_kafka_topic_offset_infos = temp_offset_map.get(partition_idx)
            if cur_kafka_topic_offset_infos:
                cur_kafka_topic_min_offset = cur_kafka_topic_offset_infos[0]
                cur_kafka_topic_max_offset = cur_kafka_topic_offset_infos[1]
                if cur_kafka_topic_min_offset > 0 and cur_consumer_offset < cur_kafka_topic_min_offset:

                    total_fix_num  = 1
                    alert_fix_num  = 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content  = "nconsumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                    print(
                        "cur_consumer_offset(%s)<cur_kafka_topic_min_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                            cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_min_offset

                if cur_kafka_topic_max_offset > 0 and cur_consumer_offset > cur_kafka_topic_max_offset:

                    total_fix_num  = 1
                    alert_fix_num  = 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content  = "nconsumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)

                    print(
                        "cur_consumer_offset(%s)>cur_kafka_topic_max_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                            cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_max_offset

        offsets[topicPartition] = cur_consumer_offset

if total_fix_num > 0:
    receivers = config.recvers.split(',')
    alarmopt = alarm_opt.AlarmOpt(receivers)
    alert_content = u"[%s][消费offset和最新offset有出入(共修正%s次)]:n%s" % (params_name, total_fix_num, fix_offset_content)
    alarmopt.alarm(alarm_opt.WX_SMS, alert_content, u'spark告警')

代码语言:javascript复制
def get_cur_kafka_topic_offset_map(brokers,topic):
    cur_kafka_offset_map={}
    try:
        client = SimpleClient(brokers)
        LATEST = -1
        EARLIEST = -2
        # 获取topic分区数
        partitions = client.topic_partitions[topic]
        partition_num=len(partitions.keys())
        print("partition_num=%s,topic=%s" % (partition_num,topic))
        # 获取每个分区的最小offset
        min_offset_requests = [OffsetRequestPayload(topic, p, EARLIEST, 1) for p in partitions.keys()]
        min_offsets_responses = client.send_offset_request(min_offset_requests)
        if not min_offsets_responses or len(min_offsets_responses)!=partition_num:
            print("min_offsets_responses is illegal,topic=%s,brokers=%s" % (topic,brokers))
            return None
        print("len(min_offsets_responses)=%s,topic=%s" % (len(min_offsets_responses),topic))
        for r in min_offsets_responses:
            cur_kafka_offset_map[r.partition] = [r.offsets[0]]
        # 获取每个分区的最大offset
        max_offset_requests = [OffsetRequestPayload(topic, p, LATEST, 1) for p in partitions.keys()]
        max_offsets_responses = client.send_offset_request(max_offset_requests)
        if not max_offsets_responses or len(max_offsets_responses)!=partition_num:
            print("max_offsets_responses is illegal,topic=%s,brokers=%s" % (topic,brokers))
            return None
        print("len(max_offsets_responses)=%s,topic=%s" % (len(max_offsets_responses),topic))
        cur_kafka_offset_str=""
        for r in max_offsets_responses:
            if cur_kafka_offset_map.has_key(r.partition):
                cur_kafka_offset_map[r.partition].append(r.offsets[0])
            else:
                cur_kafka_offset_map[r.partition] = [-1, r.offsets[0]]
            partition_info_str="[%s,%s,%s]"%(r.partition,cur_kafka_offset_map[r.partition][0],cur_kafka_offset_map[r.partition][1])
            if cur_kafka_offset_str=="":
                cur_kafka_offset_str=partition_info_str
            else:
                cur_kafka_offset_str  = ",%s" % (partition_info_str)
        print("cur_kafka_offset_str=%s,topic=%s,brokers=%s" % (cur_kafka_offset_str, topic,brokers))
        return cur_kafka_offset_map
    except Exception as e:
        print("get_cur_kafka_topic_offset_map Exception: %s,topic=%s,brokers=%s"%(str(e),topic,brokers))
        return None

0 人点赞