对于流式应用程序,保证应用7*24小时的稳定运行,是非常必要的。因此对于计算引擎,要求必须能够适应与应用程序逻辑本身无关的问题(比如driver应用失败重启、网络问题、服务器问题、JVM崩溃等),具有自动容错恢复的功能。
目前,Spark(Spark Streaming/Structured Streaming)和Flink的checkpoint机制,就是处理类似情况,实现容错机制的核心利器。
对于Flink:
为了保证其高可用、Exactly Once的特性,提供了一套强大的checkpoint机制,它能够根据配置周期性地基于流中各个operator的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦出现故障时,能够将整个应用流程序恢复到故障前的某一种态,从而修正因为故障带来的程序数据状态中断。
对于Spark:
在流式应用中,Spark Streaming/Structured Streaming会将关于应用足够多的信息checkpoint到高可用、高容错的分布式存储系统,如HDFS中,以便从故障中进行恢复。Spark checkpoint有两种类型的数据:
- 数据checkpoint
对于一些复杂程序,比如跨多个批次组合数据的有状态转换,生成的RDD依赖于先前批次的RDD,导致依赖链的长度随批次的增加而增加。因为故障恢复时间与依赖链成正比,从而导致恢复时间也跟着增长。因此就有必要周期性的将RDD checkpoint到可靠的分布式存储系统中,以此切断依赖链。
这在Spark中的状态算子,如mapWithState、updateStateByKey中尤为常见。
- 元数据checkpoint
顾名思义,就是将定义流式应用程序中的信息保存到容错系统中,用于从运行流应用程序的driver节点发生故障时,进行容错恢复。元数据包括:
- 配置:用于创建流应用程序DStream操作:
- 定义流应用程序的DStream操作集
- 未完成的批次:未完成的批次job
本文的重点不在于checkpoint具体含义,而是以Spark为例,阐述如何通过程序获取checkpoint中最新的offset,以此为思路,来解决生产中的实际问题。
通常我们会checkpoint到HDFS,首先来看一下checkpoint信息:
offsets目录记录了每个批次中的offset,此目录中的第N条记录表示当前正在处理,第N-1个及之前的记录指示哪些偏移已处理完成。
代码语言:javascript复制/bigdatalearnshare/checkpointLocation/binlog-2-kafka/commits
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/metadata
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/receivedData
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/sources
代码语言:javascript复制hdfs dfs -ls /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/0
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/1
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2
代码语言:javascript复制hdfs dfs -cat /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2
v1
{"batchWatermarkMs":0,"batchTimestampMs":1590632490083,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"1"}}
2400000001667289
最终获取最新offset的程序示例:
代码语言:javascript复制/**
* @Author 微信公众号:大数据学习与分享
*/
object ReadOffsets {
def main(args: Array[String]): Unit = {
val path = "/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2"
val fs = FileSystem.get(new Configuration())
val lastFile = fs.listStatus(new Path(path)).filterNot(_.getPath.getName.endsWith(".tmp.crc"))
.map { fileName =>
(fileName.getPath.getName.split("/").last.toInt, fileName.getPath)
}.maxBy(_._1)._2
val offset = readFile(lastFile.toString).split("n").last
assert("2400000001667289".equals(offset))
}
def readFile(path: String): String = {
val fs = FileSystem.get(new Configuration())
var br: BufferedReader = null
var line: String = null
val result = ArrayBuffer.empty[String]
try {
br = new BufferedReader(new InputStreamReader(fs.open(new Path(path))))
line = br.readLine()
while (line != null) {
result = line
line = br.readLine()
}
} finally {
if (br != null) br.close()
}
result.mkString("n")
}
}
这一点在生产环境中还是有一定应用场景的,比如,通过解析mysql binlog日志,将数据同步到kafka,然后再通过消费者程序消费kafka中的数据保存到存储系统中,如delta,通过offset信息对比来校验,binlog到kafka的延迟(如,通过获取binlog中的offset信息与流程序同步到kafka时进行checkpoint的offset做对比)、kafka到存储系统中的延迟。
此外,要注意commits目录下记录的是已完成的批次信息。在实际进行offset比对时,要以此为基准再去获取offsets目录下的offsets信息。