toc
命令样例
代码语言:txt复制./bin/spark-submit
--class <main-class>
--master <master-url>
--deploy-mode <deploy-mode>
--conf <key>=<value>
... # other options
<application-jar>
[application-arguments]
Shell过程
bin/spark-submit只是简单调用spark-class脚本:
代码语言:txt复制exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
spark-class主要逻辑是组装并执行java命令,主要逻辑:
代码语言:txt复制#生成命令的主要方法
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d " $?
}
#运行java进程并接收标准输出,作为启动命令
CMD=()
while IFS= read -d '' -r ARG; do
CMD =("$ARG")
done < <(build_command "$@")
#启动上面java进程标准输出组成的命令
exec "${CMD[@]}"
如果展开spark-submit和spark-class,则相当于是分两步执行:
java -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit <spark-submit脚本的args>
- 第一步组装一个java命令(main class是SparkSubmit),然后给到标准输出,并在shell中执行
java进程的执行逻辑
org.apache.spark.launcher.Main
这个进程的唯一逻辑就是用java代码的方式生成并输出一个java命令。
- 根据第一个参数,也就是spark-class要启动的class,决定怎么组装一个java启动命令,支持的class包括SparkSubmit、Master、Worker、HistoryServer、CoarseGrainedExecutorBackend、ExternalShuffleService等等。对于spark-submit,要启动的class是SparkSubmit。
- 生成sparksubmit的java命令: 1) 加载配置文件(通过--properties-file指定,否则spark-defaults.conf), 2) 通过i步配置确定是否clientMode,若是则-Xmx变为spark.driver.memory;添加native library spark.driver.extraLibraryPath到classpath 3) 添加SPARK_SUBMIT_OPTS 4) 添加从spark-submit脚本输入参数中解析出来的参数和mainclass org.apache.spark.deploy.SparkSubmit。 5) 至此构成一个完整的java命令,main class为SparkSubmit
org.apache.spark.deploy.SparkSubmit
以 spark on yarn 为例
主要逻辑就是梳理参数,向yarn提交作业
代码语言:txt复制org.apache.spark.deploy.SparkSubmit#main
org.apache.spark.deploy.SparkSubmit#doSubmit
org.apache.spark.deploy.SparkSubmit#parseArguments
org.apache.spark.deploy.SparkSubmit#submit
org.apache.spark.deploy.SparkSubmit#runMain
#主要生成提交作业的客户端进程所需的环境,包括:进程参数、classpath,系统属性,mainclass
org.apache.spark.deploy.SparkSubmit#prepareSubmitEnvironment
#YarnClusterApplication即为上一步生成的mainclass,用来提交作业
org.apache.spark.deploy.yarn.YarnClusterApplication#start
org.apache.spark.deploy.yarn.Client#run
org.apache.spark.deploy.yarn.Client#submitApplication
#向RM发起createApplication调用
org.apache.hadoop.yarn.client.api.YarnClient#createApplication
#根据上面createApplication的response看看resourcemanager是否有足够的资源来启动AM,否则直接failfast
org.apache.spark.deploy.yarn.Client#verifyClusterResources
#设置ApplicationMaster进程的所有上下文,包括:依赖的资源文件丢到分布式缓存,环境变量、javaOpts。根据deploymode,确定AM的main class:org.apache.spark.deploy.yarn.ApplicationMaster或org.apache.spark.deploy.yarn.ExecutorLauncher
org.apache.spark.deploy.yarn.Client#createContainerLaunchContext
#主要设置appName,队列spark.yarn.queue,appTags,重试次数spark.yarn.maxAppAttempts,AM的资源请求量,日志聚合
org.apache.spark.deploy.yarn.Client#createApplicationSubmissionContext
#rpc调用,向yarn RM发起作业提交请求
org.apache.hadoop.yarn.client.api.YarnClient#submitApplication
Spark-on-K8S 作业提交流程
前面提到,spark向yarn提交作业的client类是org.apache.spark.deploy.yarn.YarnClusterApplication
向k8s提交作业的client类是org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
。下面主要分析下这个类提交作业流程。
向k8s提交作业,主要就是生成DriverPod的YAML内容,然后周期性监听并记录driverPod的日志。
spark把DriverPod的yaml内容,从spark应用的角度拆分成几个部分,每一个部分用一个FeatureStep来实现。首先初始化一个空的pod和container,然后让所有FeatureStep逐一更新pod和container,形成完整的yaml。
常见的FeatureStep:
代码语言:txt复制#pod和container的基础信息填充
BasicDriverFeatureStep
#配置容器的参数如--class
DriverCommandFeatureStep
#给container加一写secrets的环境变量
EnvSecretsFeatureStep
#hadoop配置设置,包括container环境变量和configmap两种形式
HadoopConfDriverFeatureStep
#pv/pvc的挂载和使用
MountVolumesFeatureStep