背景
虽然当前实时计算领域所有厂商都推荐Flink框架,但是某些传统行业客户因为多年固化的业务场景仍然坚持使用SparkStreaming框架。本文主要记录Spark概念架构、SparkStreaming性能问题处理、SparkStreaming 7*24作业在Kerberos Hadoop集群HDFS_DELEGATION_TOKEN问题处理。
Spark概念架构
Spark applications以进程集合(Executors)的方式运行在集群上,通过main方法(也称Driver程序)中的SparkContext对象管理调度Executors。集群提供Executors运行所需的资源,集群类型分为Spark standalone、Mesos、YARN、Kubernetes。
架构关键点说明:
代码语言:javascript复制1.每个application都有自己的Executors进程,进程以多线程的方式运行task。不同application的Driver和Executors相互隔离,如果不通过外部系统,无法共享数据。
2.application生命周期内,Driver需要与Executors通信,比如:调度task到Executors执行、接收Executors心跳、接收Executors blocks信息等等。所以,Driver与Executors建议局域网内通信。
关键名词说明:
名称 | 说明 |
---|---|
Application | 用户开发的Spark程序,包括Driver端和Executors端 |
Driver | 运行main()方法、创建SparkContext等 |
Executor | 运行tasks、保存数据在内存或磁盘 |
Task | Driver发送给Executors的执行单元 |
Job | 多个Tasks组成的并行计算,由action算子生成 |
Stage | Job划分不同的Tasks集合为Stage,由shuffle算子生成 |
Spark是基于RDD进行计算的,而RDD是一种抽象的、分布式的、弹性的、能并行操作的数据集。两种创建RDD的方式:加载Driver程序内的数据集合或者加载外部数据源,如Kafka、HDFS、HBase、Hive、文件系统等等。
RDD支持两种operation类型:transformations、actions。transformations就是基于已有RDD创建新的RDD,比如map。而actions就是触发RDD的计算,将结果返回给Driver,每个action操作会生成一个Job,比如reduce。所有的transformations都是惰性的,并不会立即触发计算,只是记录相应的计算逻辑。action需要计算结果的时候才触发计算。这种设计使得Spark更加高效。
默认情况下,多次action需要对同一个RDD进行transformations操作时候,都会重新RDD的重复计算。建议使用persisit(或者cache)将RDDD持久化到内存或者磁盘,以提高多次使用的效率。
除了RDD以外,Spark中还有一个抽象是可用于并行操作的共享变量。Spark在多个Executors节点之间并行执行Tasks时候,一个变量需要在Tasks之间或者Driver与Tasks之间共享使用。Spark支持两种类型共享变量:广播变量、计数器。
SparkStreaming性能问题
数据源使用Kafka支持两种模式:KafkaReceiver、DirectKafka。从实现上来看,DirectKafka的性能更优,数据一致性更强。推荐使用DirectKafka的API实现接收器。性能调优策略其实很成熟、很有效,包括:批量Duration间隔、kafka消费速率、RDD持久化、Driver与Executor内存、并行度、外部shuffle服务等等。但是,客户疑问的现场如下:
如上图所以模拟客户线上作业的现象:为什么Output Op Duration耗时(42秒)比Job Duration耗时(4秒 3秒)很长?Output Op Duration耗时长就导致批Duration任务出现排队的现象。
那么,要解释上述现象,就要回到前面章节提到的Application名词解释,即:用户开发的Spark程序,包括Driver端和Executors端。换句话说,app程序是分为两部分的,一部分在Driver端执行、另一部分在Executor端执行。对应到UI上也就是:Oupt Op Duration是Driver端执行的耗时、Job Duration是Executor端执行的耗时。相关原理可以查看Spark源码:
代码语言:javascript复制org.apache.spark.streaming.scheduler. JobScheduler
而Driver端耗时较长通常原因是:程序使用了往Driver端拉取数据的算子或者shuffler数据量大或者SparkSession存在耗时较长的sql操作等等。比如:
代码语言:javascript复制df.collect();
sparkSession.sql("insert into a select * from b");
完整示例详见github:
代码语言:javascript复制https://github.com/felixzh2020/felixzh-java/blob/main/SparkStreaming/src/main/java/com/felixzh/Kafka2Hdfs.java#L60
HDFS_DELEGATION_TOKEN问题
我们知道SparkStreaming作业属于7*24长时间运行的流作业,客户反馈说任务每7天就报错退出,异常日志提示:HDFS_DELEGATION_TOKEN is expired。
这里先简单说明下原理就是开启Kerberos认证的Hadoop集群中HDFS的namenode会生成HDFS_DELEGATION_TOKEN,同时给token设置相关生命周期管理参数。
需要访问HDFS的应用需要申请token,然后使用token才能正常操作HDFS。而token是有生命周期的,也就是说会过期。当然,这个过期是正常行为。
那么,对于流任务怎么办?先否定一种方式就是将过期时间调大,这个不现实。回到正路上来,既然token过期,那只需要在token过期之前、重新申请token不就行了。事实正是如此,常用的流计算框架Flink、SparkStreaming都是这么做的。
SparkStreaming通过HadoopDelegationTokenManager类实现周期性地登录KDC、周期性地申请delegation token。也就是在delegation token过期前,Driver会重新申请新token,然后通过IPC发送给Execuors,从而确保SparkStreaming能够长时间运行。
具体方式有两种:一种是使用keytab;一种是使用ticket cache。
很显然,推荐使用Keytab方式,就是使用spark-submit --principal --keytab 即可。当然,使用--conf spark.kerberos.keytab --conf spark.kerberos.principal效果相同。