This article covers two offset problems we ran into when connecting Spark Streaming to Kafka. First, let's look at how offsets are stored.
Spark Streaming offset storage
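When Spark Streaming consumes Kafka through KafkaUtils.createDirectStream(), it reads data directly from the partitions on the Kafka brokers and bypasses ZooKeeper. There is no receiver: Spark tasks read the Kafka topic partitions directly.
Because this approach does not go through ZooKeeper, the topic offsets are not saved. When the job restarts it has no saved offsets to resume from, so it can only start consuming from the latest offset, and messages produced during the restart are lost.
If offsets are committed automatically (enable.auto.commit=true), the consumer may commit them as soon as Spark Streaming starts fetching the records. If Spark Streaming then fails to process those messages, the committed offsets are wrong and the messages are skipped.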
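So, to get exactly-once processing in Spark Streaming, you must:
1. Commit offsets manually.
2. Commit the offsets only after the business data has been processed.
Strictly speaking, these two rules give at-least-once delivery (a batch that fails after its output was written is replayed); exactly-once also requires idempotent output, or storing the results and the offsets in the same transaction.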
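To manage offsets manually, set the Kafka parameter enable.auto.commit to false.
There are two options for committing offsets manually:
1. After processing the business data, commit the offsets to Kafka manually.
2. After processing the business data, commit the offsets to your own store, such as MySQL or HBase.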
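Offsets can also be committed to ZooKeeper, but in our tests ZooKeeper was not suited to storing large amounts of data and crashed easily under heavy load.
Here is how to store the offsets in MySQL: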
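The commit-after-processing rule can be sketched in a few lines of Java. This is a minimal model, not Spark code: `BatchRange` and `CommitAfterProcess` are hypothetical names, and the in-memory `store` map stands in for whichever offset store (Kafka, MySQL, HBase) you choose. Offsets are written only after the business logic returns normally, so a failed batch is read again after a restart (which also means its records can be processed twice).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for Spark's OffsetRange: the records [fromOffset, untilOffset) of one partition.
final class BatchRange {
    final String topic;
    final int partition;
    final long fromOffset;  // inclusive
    final long untilOffset; // exclusive

    BatchRange(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

final class CommitAfterProcess {
    // Hypothetical offset store keyed by "group:topic:partition"; stands in for Kafka, MySQL or HBase.
    static final Map<String, Long> store = new HashMap<>();

    static String key(String group, BatchRange r) {
        return group + ":" + r.topic + ":" + r.partition;
    }

    /** Runs the business logic first; offsets are saved only if it finishes without throwing. */
    static boolean runBatch(String group, List<BatchRange> ranges, Consumer<List<BatchRange>> businessLogic) {
        try {
            businessLogic.accept(ranges);
        } catch (RuntimeException e) {
            // Nothing is committed, so after a restart this batch is read again from the old offsets.
            return false;
        }
        for (BatchRange r : ranges) {
            store.put(key(group, r), r.untilOffset); // untilOffset is exclusive: the next batch starts there
        }
        return true;
    }
}
```

Swapping the order (saving offsets before `businessLogic.accept`) reproduces the auto-commit problem described earlier: a crash after the save but before processing silently drops the batch.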
// After the business logic has run, manually commit the offsets to the local MySQL database
stream.foreachRDD(rdd => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (or <- offsetRanges) {
sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
Array(groupid, or.topic, or.partition.toString, or.untilOffset))
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
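The snippet above covers only the write side; on restart the saved rows have to be turned back into starting offsets. A minimal sketch, assuming `offset_manager` has a primary or unique key on `(groupid, topic, partition)` (MySQL's `replace into` only overwrites an existing row when such a key exists; otherwise every batch appends a new row). `OffsetRow` and `OffsetLoader` are hypothetical names, and a plain `"topic-partition"` string stands in for Kafka's `TopicPartition`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory copy of one row of the offset_manager table.
final class OffsetRow {
    final String groupId;
    final String topic;
    final int partition;
    final long untilOffset;

    OffsetRow(String groupId, String topic, int partition, long untilOffset) {
        this.groupId = groupId;
        this.topic = topic;
        this.partition = partition;
        this.untilOffset = untilOffset;
    }
}

final class OffsetLoader {
    /**
     * Builds the starting offsets of one consumer group, keyed by "topic-partition".
     * The saved untilOffset is exclusive, so it is exactly the first offset the restarted job should read.
     */
    static Map<String, Long> fromOffsets(List<OffsetRow> rows, String groupId) {
        Map<String, Long> result = new HashMap<>();
        for (OffsetRow row : rows) {
            if (row.groupId.equals(groupId)) {
                result.put(row.topic + "-" + row.partition, row.untilOffset);
            }
        }
        return result;
    }
}
```

In the Scala job, such a map would typically be converted to `Map[TopicPartition, Long]` and passed as the offsets argument of `ConsumerStrategies.Subscribe` when calling `KafkaUtils.createDirectStream`.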
Storing offsets in HBase works the same way:
inputDStream.foreachRDD((rdd, batchTime) => {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
val newRDD = rdd.map(message => processMessage(message))
newRDD.count()
// After the batch has been processed, save the offsets to HBase
saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, batchTime)
})
ssc // return the StreamingContext; the beginning of this enclosing function is not shown
}
/**
* Process a single message (this example passes it through unchanged)
* @param message
* @return
*/
def processMessage(message: ConsumerRecord[String, String]): ConsumerRecord[String, String] = {
message
}
/*
Save Offsets into HBase
*/
def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange], hbaseTableName: String,
batchTime: org.apache.spark.streaming.Time) = {
val hbaseConf = HBaseConfiguration.create()
hbaseConf.addResource("src/main/resources/hbase-site.xml")
val conn = ConnectionFactory.createConnection(hbaseConf)
val table = conn.getTable(TableName.valueOf(hbaseTableName))
val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
val put = new Put(rowKey.getBytes)
for (offset <- offsetRanges) {
put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
Bytes.toBytes(offset.untilOffset.toString))
}
table.put(put)
table.close()
conn.close()
}
/*
Returns last committed offsets for all the partitions of a given topic from HBase in following cases.
- CASE 1: SparkStreaming job is started for the first time. This function gets the number of topic partitions from
Zookeeper and for each partition returns the last committed offset as 0
- CASE 2: SparkStreaming is restarted and there are no changes to the number of partitions in a topic. Last
committed offsets for each topic-partition are returned as is from HBase.
- CASE 3: SparkStreaming is restarted and the number of partitions in a topic increased. For old partitions, last
committed offsets for each topic-partition are returned as is from HBase. For newly added partitions,
function returns last committed offsets as 0
*/
def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String, zkQuorum: String,
zkRootDir: String, sessionTimeout: Int, connectionTimeOut: Int): Map[TopicPartition, Long] = {
val hbaseConf = HBaseConfiguration.create()
hbaseConf.addResource("src/main/resources/hbase-site.xml")
val zkUrl = zkQuorum + "/" + zkRootDir
val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeOut)
val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size
//Connect to HBase to retrieve last committed offsets
val conn = ConnectionFactory.createConnection(hbaseConf)
val table = conn.getTable(TableName.valueOf(hbaseTableName))
val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
val scan = new Scan()
val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
val result = scanner.next()
var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
if (result != null) {
//If the result from hbase scanner is not null, set number of partitions from hbase to the number of cells
hbaseNumberOfPartitionsForTopic = result.listCells().size()
}
val fromOffsets = collection.mutable.Map[TopicPartition, Long]()
if (hbaseNumberOfPartitionsForTopic == 0) {
// initialize fromOffsets to beginning
for (partition <- 0 to zKNumberOfPartitionsForTopic - 1) {
fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0L)
}
} else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
// handle scenario where new partitions have been added to existing kafka topic
for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
}
for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic - 1) {
fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0L)
}
} else {
//initialize fromOffsets from last run
for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
}
}
scanner.close()
conn.close()
fromOffsets.toMap
}
}
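The HBase row-key layout and the three cases of `getLastCommittedOffsets` can be modeled without a cluster. In this sketch (all names hypothetical) a `TreeMap` plays the HBase table, since both order ASCII row keys lexicographically; the reversed scan from `topic:group:now` down to `topic:group:0` becomes "last entry in that key range". The lookup relies on every batch timestamp having the same number of digits (true for millisecond timestamps between 2001 and 2286); otherwise string order and numeric order would differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class HBaseOffsetModel {
    // Hypothetical in-memory table: row key -> (partition -> saved untilOffset).
    static final TreeMap<String, Map<Integer, Long>> table = new TreeMap<>();

    static String rowKey(String topic, String group, long batchTimeMillis) {
        return topic + ":" + group + ":" + batchTimeMillis;
    }

    /** Mirrors the reversed scan: the newest row in ("topic:group:0", "topic:group:now"], or null. */
    static Map<Integer, Long> latestRow(String topic, String group, long nowMillis) {
        NavigableMap<String, Map<Integer, Long>> range =
            table.subMap(rowKey(topic, group, 0), false, rowKey(topic, group, nowMillis), true);
        return range.isEmpty() ? null : range.lastEntry().getValue();
    }

    /** The three cases collapsed into one loop over max(ZooKeeper count, saved count) partitions. */
    static Map<Integer, Long> lastCommitted(Map<Integer, Long> saved, int zkPartitionCount) {
        int savedCount = (saved == null) ? 0 : saved.size();
        int total = Math.max(zkPartitionCount, savedCount);
        Map<Integer, Long> fromOffsets = new HashMap<>();
        for (int p = 0; p < total; p++) {
            Long offset = (saved == null) ? null : saved.get(p);
            // CASE 1 (nothing saved) and CASE 3 (partition added since the last run) start from 0;
            // CASE 2 (partition already saved) resumes from the saved untilOffset.
            fromOffsets.put(p, offset == null ? 0L : offset);
        }
        return fromOffsets;
    }
}
```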
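Problem 1: numRecords must not be negative
After we *deleted a Kafka topic that had already been used and then created a new topic with the same name*, Spark Streaming's Kafka createDirectStream failed with "numRecords must not be negative". This is an illegal-argument exception: the number of records in an RDD must not be negative.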
Analyzing the exception
First, look at where the exception is thrown: line 38 of InputInfoTracker.scala, where the case class org.apache.spark.streaming.scheduler.StreamInputInfo is defined:
/**
* :: DeveloperApi ::
* Track the information of input stream at specified batch time.
*
* @param inputStreamId the input stream id
* @param numRecords the number of records in a batch
* @param metadata metadata for this batch. It should contain at least one standard field named
* "Description" which maps to the content that will be shown in the UI.
*/
@DeveloperApi
case class StreamInputInfo(
inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty) {
require(numRecords >= 0, "numRecords must not be negative")
def metadataDescription: Option[String] =
metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
}
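Line 38 checks whether numRecords >= 0 and throws when it is not, so numRecords must have been negative here. numRecords is documented as "the number of records in a batch", so the record count of the current RDD must have been computed incorrectly.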
How offsetRanges are computed
The definition of offsetRanges:
offsetRanges: offset ranges that define the Kafka data belonging to this RDD
Line 40 of KafkaRDDPartition contains the logic that computes the size of a Kafka partition's offset range:
def count(): Long = untilOffset - fromOffset
fromOffset: per-topic/partition Kafka offset defining the (inclusive) starting point of the batch
untilOffset: per-topic/partition Kafka offset defining the (exclusive) ending point of the batch
fromOffset comes from the offsets saved in ZooKeeper; untilOffset is computed at line 145 of DirectKafkaInputDStream:
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
This fetches the latest offsets and then clamps them with spark.streaming.kafka.maxRatePerPartition to get the largest allowed untilOffsets. For the newly created topic, if it contains no data yet, untilOffsets should be 0.
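A simplified Java model of the two formulas above (hypothetical class name; it ignores backpressure and the other rate settings Spark may also apply): untilOffset is the smaller of the latest offset and `currentOffset + maxRatePerPartition * batchSeconds`, and the partition's record count is `untilOffset - fromOffset`.

```java
final class UntilOffsetClamp {
    /**
     * Simplified clamp for one partition: read at most maxRatePerPartition records per second of batch,
     * and never past the latest offset. A rate of 0 or less means "no limit".
     */
    static long untilOffset(long currentOffset, long latestOffset, long maxRatePerPartition, long batchSeconds) {
        if (maxRatePerPartition <= 0) {
            return latestOffset;
        }
        return Math.min(currentOffset + maxRatePerPartition * batchSeconds, latestOffset);
    }

    /** Same formula as KafkaRDDPartition.count(): untilOffset is exclusive, so the difference is the count. */
    static long count(long fromOffset, long untilOffset) {
        return untilOffset - fromOffset;
    }
}
```

Note that the clamp never checks untilOffset against fromOffset: if the latest offset is below the current offset, the result is simply the latest offset.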
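Root cause
When a topic is deleted, its offset information in ZooKeeper is not removed, so when the KafkaDirectStreaming job starts again it still reads the old topic's offset, old_offset, and uses it as fromOffset. After the topic is recreated, the untilOffset logic above returns 0 (or a value > 0 if the new topic already has data). The restarted job then computes the RDD's numRecords as:
numRecords = untilOffset - fromOffset(old_offset)
Whenever untilOffset < old_offset, numRecords is negative and the exception is thrown; for a newly created topic this is very likely.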
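The failing arithmetic can be reproduced directly. In this sketch (hypothetical names) the check mirrors the `require` in `StreamInputInfo`; Scala's `require` throws `IllegalArgumentException` with the message prefixed by `requirement failed: `. The usage assumes an old offset of 500 left in ZooKeeper and an empty recreated topic.

```java
final class StaleOffsetDemo {
    /** Java equivalent of StreamInputInfo's require(numRecords >= 0, "numRecords must not be negative"). */
    static void checkNumRecords(long numRecords) {
        if (numRecords < 0) {
            throw new IllegalArgumentException("requirement failed: numRecords must not be negative");
        }
    }

    /** numRecords for one partition of a batch: untilOffset - fromOffset. */
    static long numRecords(long fromOffset, long untilOffset) {
        return untilOffset - fromOffset;
    }
}
```

With fromOffset = 500 (the stale value) and untilOffset = 0 (the empty new topic), `numRecords` returns -500 and `checkNumRecords` throws.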
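Solution
Based on this analysis, when determining the fromOffsets for KafkaDirectStreaming, compare each fromOffset with its untilOffset; when untilOffset < fromOffset, reset fromOffset to the initial offset 0:
1. Read each topic/partition's fromOffset from ZooKeeper.
2. Use SimpleConsumer to fetch each partition's lastOffset (the untilOffset).
3. Compare each partition's lastOffset with its fromOffset.
4. When lastOffset < fromOffset, set fromOffset to 0.
These steps correct the value of fromOffset.
The core offset-correction code is as follows: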
/** Begin: correct offsets */
// Get the lastOffset of every topic/partition
Map<TopicAndPartition, Long> topicAndPartitionLongMap =
KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
topicList, "my.group.id");
// Iterate over every topic/partition
for (Map.Entry<TopicAndPartition, Long> topicAndPartitionLongEntry : fromOffsets.entrySet()) {
Long lastOffset = topicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
// fromOffset > lastOffset (partitions without a lastOffset are skipped to avoid unboxing null)
if (lastOffset != null && topicAndPartitionLongEntry.getValue() > lastOffset) {
// reset fromOffset to the initial offset 0
topicAndPartitionLongEntry.setValue(0L);
}
}
/** End: correct offsets */
Problem 2: Offsets out of range
We were testing a Kafka DirectStream that reads data from a topic, stopped it for a while, and when we started it again it threw an OffsetOutOfRangeException.
The exception:
0/12/16 11:08:33 WARN TaskSetManager: Lost task 2.0 in stage 105.0 (TID 85, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {register_topic-5=23550}
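Analyzing the exception
An offset can be out of range at either end. It is head out-of-range when the offset saved in ZooKeeper is older than the earliest message still present in the topic (zk_offset < earliest_offset), and tail out-of-range when the saved offset is beyond the latest message in the topic (zk_offset > last_offset). We already hit and handled the tail case in Problem 1, so this one should be a head out-of-range.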
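On investigation, we had not consumed data from the topic for a while, roughly seven days, and the log retention on our Kafka brokers was set to seven days.
So the unconsumed data had been deleted by the brokers, which left the offset read from ZooKeeper to the left of the earliest surviving message offset: an offset that used to be valid had become invalid.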
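Solution
The obvious fix is to make the streaming job consume the topic's data promptly, so that consumer lag never exceeds the log.retention.ms setting. A better approach is to keep the job running even when the problem occurs, which means that whenever zk_offset is found to be out of range, it is corrected to a valid value.
The core offset-correction code is as follows: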
/** Begin: correct offsets */
// latest offsets
Map<TopicAndPartition, Long> lastestTopicAndPartitionLongMap =
KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));
// earliest offsets
Map<TopicAndPartition, Long> earliestTopicAndPartitionLongMap =
KafkaOffsetTool.getInstance().getEarliestOffset(kafkaParams.get("metadata.broker.list"),
Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));
for (Map.Entry<TopicAndPartition, Long> topicAndPartitionLongEntry : fromOffsets.entrySet()) {
long zkOffset = topicAndPartitionLongEntry.getValue();
long lastestOffset = lastestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
long earliestOffset = earliestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
// zkOffset is outside the range of available message offsets
if (zkOffset > lastestOffset || zkOffset < earliestOffset) {
// set offset = earliestOffset
logger.warn("Correcting offset: " + zkOffset + " -> " + earliestOffset);
topicAndPartitionLongEntry.setValue(earliestOffset);
}
}
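Both corrections can be folded into one self-contained function. This is a sketch under assumptions, not the `KafkaOffsetTool`-based code shown in this article: `OffsetCorrector` is a hypothetical name, offsets are passed in as plain maps keyed by `"topic-partition"`, and partitions without earliest/latest information are left untouched instead of failing on a null unboxing. Resetting a tail out-of-range offset to the earliest offset rather than to 0 also covers Problem 1: for a freshly recreated topic the earliest offset is 0, and once retention has deleted data from the new topic, 0 would itself be out of range.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetCorrector {
    /**
     * Returns corrected starting offsets: any saved offset outside [earliest, latest]
     * is reset to the earliest retained offset. The input map is not modified.
     */
    static Map<String, Long> correct(Map<String, Long> saved, Map<String, Long> earliest, Map<String, Long> latest) {
        Map<String, Long> result = new HashMap<>(saved);
        for (Map.Entry<String, Long> e : result.entrySet()) {
            Long lo = earliest.get(e.getKey());
            Long hi = latest.get(e.getKey());
            if (lo == null || hi == null) {
                continue; // no broker information for this partition: keep the saved offset
            }
            long offset = e.getValue();
            if (offset < lo || offset > hi) {
                // head (data removed by retention) or tail (e.g. recreated topic) out of range
                e.setValue(lo);
            }
        }
        return result;
    }
}
```

An offset equal to the latest offset is kept, because the latest offset is the position of the next message to be written, so it simply means the job is fully caught up.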