Problems Encountered with Spark Streaming

2020-12-18 14:29:07

This article covers two offset-related problems we ran into when integrating Spark Streaming with Kafka. First, a look at how offsets are stored.

Spark Streaming offset storage

Spark Streaming consumes Kafka through KafkaUtils.createDirectStream(), which reads data directly from the partitions on the Kafka brokers, bypassing ZooKeeper and using no receiver: the Spark tasks map directly onto the Kafka topic partitions.
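
For reference, here is a minimal sketch of this direct approach using the spark-streaming-kafka-0-10 integration; the broker addresses, group id and topic name are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("direct-stream-demo")
val ssc  = new StreamingContext(conf, Seconds(10))

// Placeholder Kafka parameters; adjust brokers, group id and topics for your cluster
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092,broker2:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "my.group.id",
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean) // manual offset management, see below
)
val topics = Array("register_topic")

// Spark tasks read the topic partitions directly from the brokers; there is no receiver
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))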

Because this path does not go through ZooKeeper, the topic offsets are not persisted anywhere; after a job restart, consumption can only resume from the latest offset, so messages produced while the job was down are lost.

If Spark auto-commits, the offsets are committed as soon as the Spark Streaming batch starts running; if the batch then fails while processing the messages, the offsets have already been committed incorrectly.

So to achieve exactly-once semantics in Spark Streaming, you must:

1. Commit the offsets manually.

2. Commit the offsets only after the business data has been processed.

To maintain offsets manually, set the Kafka parameter enable.auto.commit to false.

There are two options for manually committing offsets:

1. After processing the business data, commit the offsets back to Kafka (see the sketch below).

2. After processing the business data, commit the offsets to an external store such as MySQL or HBase.

You can also commit offsets to ZooKeeper, but in our tests ZooKeeper is not suited to storing large amounts of data and crashes easily under heavy data volume.
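
For option 1, the 0-10 integration can commit a batch's offset ranges back to Kafka after the business logic has succeeded; a minimal sketch, where stream is the DStream returned by createDirectStream above:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Read the offset ranges before any transformation that loses the underlying KafkaRDD type
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the business data here ...

  // Commit only after processing succeeded; the commit itself is asynchronous
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}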

Let's look at how to store offsets in MySQL:

// After the business logic has completed, manually save the offsets to the local MySQL table
stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
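
The snippet above only writes the offsets; to actually resume from them after a restart, the saved rows must be read back and handed to the consumer strategy. A hedged sketch, assuming DataSourceUtil.getConnection returns a plain JDBC Connection and that groupid, topics, kafkaParams and ssc are defined as in the earlier sketches:

import java.sql.Connection
import scala.collection.mutable
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Load the last committed offset of every partition for this consumer group
val conn: Connection = DataSourceUtil.getConnection
val savedOffsets = mutable.Map[TopicPartition, Long]()
val pst = conn.prepareStatement(
  "select topic, `partition`, untilOffset from `offset_manager` where groupid = ?")
pst.setString(1, groupid)
val rs = pst.executeQuery()
while (rs.next()) {
  savedOffsets += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("untilOffset")
}
rs.close(); pst.close(); conn.close()

// Start the stream from the recovered offsets; partitions missing from the map
// fall back to the auto.offset.reset policy
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams, savedOffsets.toMap))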

Storing offsets in HBase is similar:

inputDStream.foreachRDD((rdd, batchTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      val newRDD = rdd.map(message => processMessage(message))
      newRDD.count()
      //save the offsets to HBase once the batch has been processed
      saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, batchTime)
    })
    ssc
  }


  /**
    * Process a single record (identity here; real business logic would go in this method)
    * @param message the Kafka record
    * @return the processed record
    */
  def processMessage(message: ConsumerRecord[String, String]): ConsumerRecord[String, String] = {
    message
  }


  /*
Save Offsets into HBase
 */
  def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange], hbaseTableName: String,
                  batchTime: org.apache.spark.streaming.Time) = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.addResource("src/main/resources/hbase-site.xml")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
    val put = new Put(rowKey.getBytes)
    for (offset <- offsetRanges) {
      put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
        Bytes.toBytes(offset.untilOffset.toString))
    }
    table.put(put)
    conn.close()
  }
  /*
Returns last committed offsets for all the partitions of a given topic from HBase in following cases.
  - CASE 1: SparkStreaming job is started for the first time. This function gets the number of topic partitions from
    Zookeeper and for each partition returns the last committed offset as 0
  - CASE 2: SparkStreaming is restarted and there are no changes to the number of partitions in a topic. Last
    committed offsets for each topic-partition is returned as is from HBase.
  - CASE 3: SparkStreaming is restarted and the number of partitions in a topic increased. For old partitions, last
    committed offsets for each topic-partition is returned as is from HBase as is. For newly added partitions,
    function returns last committed offsets as 0
 */
  def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String, zkQuorum: String,
                              zkRootDir: String, sessionTimeout: Int, connectionTimeOut: Int): Map[TopicPartition, Long] = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.addResource("src/main/resources/hbase-site.xml")
    val zkUrl = zkQuorum + "/" + zkRootDir
    val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeOut)
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
    val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size
    //Connect to HBase to retrieve last committed offsets
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
    val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
    val scan = new Scan()
    val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
    val result = scanner.next()
    var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
    if (result != null) {
      //If the result from hbase scanner is not null, set number of partitions from hbase to the number of cells
      hbaseNumberOfPartitionsForTopic = result.listCells().size()
    }
    val fromOffsets = collection.mutable.Map[TopicPartition, Long]()
    if (hbaseNumberOfPartitionsForTopic == 0) {
      // initialize fromOffsets to beginning
      for (partition <- 0 to zKNumberOfPartitionsForTopic - 1) {
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
      }
    } else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
      // handle scenario where new partitions have been added to existing kafka topic
      for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
        val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
      }
      for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic - 1) {
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
      }
    } else {
      //initialize fromOffsets from last run
      for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
        val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
      }
    }
    scanner.close()
    conn.close()
    fromOffsets.toMap
  }
}
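
To complete the picture, the map returned by getLastCommittedOffsets is then used as the starting offsets of the stream, roughly as below; the topic, group id, HBase table name and ZooKeeper quorum are placeholders:

val fromOffsets = getLastCommittedOffsets("register_topic", "my.group.id", "stream_kafka_offsets",
  "zk-host:2181", "kafka", 30000, 30000)

val inputDStream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent,
  Subscribe[String, String](Seq("register_topic"), kafkaParams, fromOffsets))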

Problem 1: numRecords must not be negative

When we *deleted a Kafka topic that had already been consumed and then recreated a topic with the same name*, the Spark Streaming job using the Kafka createDirectStream method threw a "numRecords must not be negative" exception. This is an IllegalArgumentException: the number of records in an RDD must not be negative.

Analyzing the exception

First, look at where the exception is thrown:

Line 38 of org.apache.spark.streaming.scheduler.StreamInputInfo (defined in InputInfoTracker.scala):

/**
 * :: DeveloperApi ::
 * Track the information of input stream at specified batch time.
 *
 * @param inputStreamId the input stream id
 * @param numRecords the number of records in a batch
 * @param metadata metadata for this batch. It should contain at least one standard field named
 *                 "Description" which maps to the content that will be shown in the UI.
 */
@DeveloperApi
case class StreamInputInfo(
    inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty) {
  require(numRecords >= 0, "numRecords must not be negative")

  def metadataDescription: Option[String] =
    metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
}

Line 38 requires numRecords to be greater than or equal to 0 and throws when the condition does not hold, so at this point numRecords < 0. Since numRecords is documented as "the number of records in a batch", the record count of the current RDD must have been computed incorrectly.

How offsetRanges is computed

The definition of offsetRanges:

offsetRanges: offset ranges that define the Kafka data belonging to this RDD

The record count for a Kafka partition's offset range is computed at line 40 of KafkaRDDPartition:

def count(): Long = untilOffset - fromOffset

fromOffset: per-topic/partition Kafka offset defining the (inclusive) starting point of the batch
untilOffset: per-topic/partition Kafka offset defining the (exclusive) ending point of the batch

fromOffset comes from the value saved in ZooKeeper; untilOffset is computed at line 145 of DirectKafkaInputDStream:

val untilOffsets = clamp(latestLeaderOffsets(maxRetries))

This fetches the latest leader offsets and clamps them with spark.streaming.kafka.maxRatePerPartition to obtain the maximum allowed untilOffsets. For a freshly recreated topic that contains no data, untilOffsets is therefore 0.

Root cause

Deleting a topic does not clear the offset information saved in ZooKeeper, so when the direct-stream job starts again it still reads the old topic's offset (old_offset) and uses it as fromOffset. For the newly created topic, the untilOffset logic above yields 0 (or a value > 0 if the new topic already contains data). The restarted job then computes the RDD's record count as numRecords = untilOffset - fromOffset(old_offset), which is negative whenever untilOffset < old_offset; for a freshly recreated topic this is very likely. For example, if the old topic had been consumed up to offset 1000 and the new topic is still empty, numRecords = 0 - 1000 = -1000, which trips the require.

Solution

Based on this analysis, when determining the direct stream's fromOffsets we can compare fromOffset with untilOffset, and when untilOffset < fromOffset, correct fromOffset back to the initial value 0.

•Read each topic/partition's fromOffset from ZooKeeper.
•Use SimpleConsumer to fetch each partition's lastOffset (the untilOffset).
•Compare each partition's lastOffset with its fromOffset.
•When lastOffset < fromOffset, set fromOffset to 0.

These steps correct the fromOffset values.

The core offset-correction code:

 /** Offset correction starts here */
    // Get each topic/partition's lastOffset
    Map<TopicAndPartition, Long> topicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
            topicList, "my.group.id");

    // Iterate over every topic/partition
    for (Map.Entry<TopicAndPartition, Long> topicAndPartitionLongEntry : fromOffsets.entrySet()) {
      // when fromOffset > lastOffset
      if (topicAndPartitionLongEntry.getValue() >
          topicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey())) {
         // reset fromOffset to the initial offset 0
        topicAndPartitionLongEntry.setValue(0L);
      }
    }
    /** Offset correction ends here */
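
If you are on the 0-10 integration and do not have the author's KafkaOffsetTool helper, the same correction can be expressed with the standard KafkaConsumer, which exposes the latest offsets through endOffsets. A hedged sketch, not the original code:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Reset any saved offset that is ahead of its partition's current latest offset
def correctFromOffsets(saved: Map[TopicPartition, Long], brokers: String): Map[TopicPartition, Long] = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  val consumer = new KafkaConsumer[String, String](props, new StringDeserializer, new StringDeserializer)
  try {
    val lastOffsets = consumer.endOffsets(saved.keys.toSeq.asJava).asScala
    saved.map { case (tp, offset) =>
      val latest = lastOffsets(tp).longValue()
      // A saved offset can only exceed the latest offset if the topic was deleted and recreated
      tp -> (if (offset > latest) 0L else offset)
    }
  } finally consumer.close()
}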

Problem 2: Offsets out of range

We were using the Kafka direct stream to read a topic for testing; after the job had been stopped for a while, restarting it raised an OffsetOutOfRangeException.

The exception:

20/12/16 11:08:33 WARN TaskSetManager: Lost task 2.0 in stage 105.0 (TID 85, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {register_topic-5=23550}

Analyzing the exception

Offset out-of-range errors come in two flavors: out of range at the head or at the tail. Head out-of-range means the offset saved in ZooKeeper falls before the offset of the oldest message still present in the topic (zk_offset < earliest_offset); tail out-of-range means the saved offset falls after the offset of the newest message (zk_offset > last_offset). We already hit the tail case above and handled it, so this problem must be caused by head out-of-range.

It turned out we had not consumed the topic for a while, roughly seven days, and the log retention configured on our Kafka brokers is also seven days.

So the data that had not yet been consumed was deleted by the broker, leaving the offset read from ZooKeeper to the left of the oldest message offset still present in the topic; an offset that used to be valid had become invalid.

Solution

The first fix that comes to mind is to have the streaming job consume the topic promptly, so that the consumer lag never exceeds the configured log.retention.ms. A better approach, though, is to let the job keep running even when this happens, which means detecting an out-of-range zk_offset and correcting it to a valid value.
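
For reference, retention is a broker-side setting (server.properties); seven days is the default:

# Kafka broker log retention; the default is 7 days
log.retention.hours=168
# or, with millisecond precision (takes precedence over log.retention.hours when set)
# log.retention.ms=604800000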

The core offset-correction code:

/** Offset correction starts here */

    // latest offsets
    Map<TopicAndPartition, Long> latestTopicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
            Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));

    // earliest offsets
    Map<TopicAndPartition, Long> earliestTopicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getEarliestOffset(kafkaParams.get("metadata.broker.list"),
            Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));


    for (Map.Entry<TopicAndPartition, Long> topicAndPartitionLongEntry : fromOffsets.entrySet()) {

      long zkOffset = topicAndPartitionLongEntry.getValue();
      long latestOffset = latestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
      long earliestOffset = earliestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
      // zkOffset is outside the range of offsets still available in the topic
      if (zkOffset > latestOffset || zkOffset < earliestOffset) {
        // set offset = earliestOffset
        logger.warn("correcting offset: " + zkOffset + " -> " + earliestOffset);
        topicAndPartitionLongEntry.setValue(earliestOffset);
      }
    }
