前言
在Spark框架中,应用程序的提交离不开Spark Driver,而Spark Driver的初始化始终围绕SparkContext的初始化,可以说SparkContext是Spark程序的发动机引擎,有了它程序才能跑起来,在spark-core中,SparkContext重中之重,它提供了很多能力,比如生成RDD,比如生成广播变量等,所以学习SparkContext的组件和启动流程有助于剖析整个Spark内核的架构。
SparkContext组件概览
在SparkContext中包含了整个框架中很重要的几部分:
- SparkEnv:Spark的运行环境,Executor会依赖它去执行分配的task,不光Executor中有,同时为了保证本地模式任务也能跑起来,Driver中也有
- SparkUI:Spark作业的监控页面,底层并没有采用前端技术,纯后端实现,用以对当前SparkJob的监控和调优,可以从页面观察到目前的Executor的jvm信息,每个job的stage划分和task划分,同时还可以观察到每个task处理的数据,用以发现数据是否倾斜
- DAGScheduler:DAG调度器,是SparkJob调度系统的重要组件之一,负责创建job,根据RDD依赖情况划分stage,提交stage,将作业划分成一个有向无环图
- TaskScheduler:任务调度器,是SparkJob调度系统的重要组件之一,负责按照调度算法将DAGScheduler创建的task分发至Executor,DAGScheduler是它的前置调度
- SparkStatusTracker:提供对作业、Stage的监控
- ConsoleProcessBar:利用SparkStatusTracker提供监控信息,将任务进度以日志的形式打印到终端中
- HearbeatReceiver:心跳接收器,所有Executor都会定期向它发送心跳信息,用以统计存活的Executor,此信息会一直同步给TaskScheduler,用以保证TaskScheduler去分发task的时候会挑选合适的Executor
- ContextCleaner:上下文清理器,用异步的方式去清理那些超出应用作用域范围的RDD、ShuffleDependency和Broadcast
- LiveListenerBus:SparkContext中的事件总线,可以接收各个组件的事件,并且通过异步的方式对事件进行匹配并调用不同的回调方法
- ShutdownHookManager:关闭时的钩子管理器,用以做一些清理工作,比如资源释放等
- AppStatusStore:存储Application状态数据,在2.3.0之后的版本引入
- EventLoggingListener(可选):将事件持久化到存储的监听器,通过
spark.eventLog.enabled
进行控制 - ExecutorAllocationManager(可选):Executor动态分配管理器,根据工作负载状态动态调整Executor的数量,通过属性
spark.dynamicAllocation.enabled
和spark.dynamicAllocation.testing
进行控制
SparkContext初始化流程
在探究SparkContext初始化流程之前,先看一下这个类里有哪些属性,有助于我们去理解它在初始化的过程中做了哪些工作:
代码语言:javascript复制/*spark conf对象*/
private var _conf: SparkConf = _
/*保存event log日志的目录*/
private var _eventLogDir: Option[URI] = None
/*event log日志的压缩格式*/
private var _eventLogCodec: Option[String] = None
/*spark context 事件总线*/
private var _listenerBus: LiveListenerBus = _
/*spark env 运行环境*/
private var _env: SparkEnv = _
/*spark status tracker 作业状态监控器*/
private var _statusTracker: SparkStatusTracker = _
/*console progress bar 终端输出作业状态进度器*/
private var _progressBar: Option[ConsoleProgressBar] = None
/*spark ui*/
private var _ui: Option[SparkUI] = None
/*hadoop 配置文件*/
private var _hadoopConfiguration: Configuration = _
/*executor 内存大小*/
private var _executorMemory: Int = _
/*向executor提交task的管理控制器*/
private var _schedulerBackend: SchedulerBackend = _
/*task scheduler*/
private var _taskScheduler: TaskScheduler = _
/*heartbeat receiver*/
private var _heartbeatReceiver: RpcEndpointRef = _
/*dag scheduler*/
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
/*event logging listener*/
private var _eventLogger: Option[EventLoggingListener] = None
/*executor allocation manager*/
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
/*context cleaner*/
private var _cleaner: Option[ContextCleaner] = None
/*事件总线启动标识*/
private var _listenerBusStarted: Boolean = false
/*作业提交额外的jars*/
private var _jars: Seq[String] = _
/*作业提交额外的files*/
private var _files: Seq[String] = _
/*shutdown hook manager*/
private var _shutdownHookRef: AnyRef = _
/*app status store*/
private var _statusStore: AppStatusStore = _
实际上SparkContext初始化的过程大抵上就以上各种组件的初始化过程,接下来看详细启动流程:
- 使用构造方法中config的clone方法给自己的私有_conf进行赋值,同时校验SparkConf中的必要参数是否正确
class SparkContext(config: SparkConf) extends Logging {}
_conf = config.clone()
_conf.validateSettings()
if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
}
if (!_conf.contains("spark.app.name")) {
throw new SparkException("An application name must be set in your configuration")
}
// log out spark.app.name in the Spark driver logs
logInfo(s"Submitted application: $appName")
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. "
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}
if (_conf.getBoolean("spark.logConf", false)) {
logInfo("Spark configuration:n" _conf.toDebugString)
}
// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
- 根据配置项初始化EventLog的保存路径和压缩格式
_eventLogDir =
if (isEventLogEnabled) {
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
.stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
_eventLogCodec = {
val compress = _conf.getBoolean("spark.eventLog.compress", false)
if (compress && isEventLogEnabled) {
Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
} else {
None
}
}
- 初始化事件总线
_listenerBus = new LiveListenerBus(_conf)
- 初始化AppStatusStore
// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)
- 初始化SparkEnv
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
- 初始化SparkStatusTracker
_statusTracker = new SparkStatusTracker(this, _statusStore)
- 初始化ConsoleProgressBar
_progressBar =
if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
- 初始化SparkUI
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
startTime))
} else {
// For tests, do not enable the UI
None
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
- 初始化hadoopConfiguration
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
- 初始化executorMemory
_executorMemory = _conf.getOption("spark.executor.memory")
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(1024)
- 初始化heatbeatReveiver
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
- 初始化任务调度器并启动
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
- 初始化applicationId
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
System.setProperty("spark.ui.proxyBase", "/proxy/" _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))
- 初始化SparkEnv中的BlockManager
_env.blockManager.initialize(_applicationId)
- 初始化SparkEnv中的metricsSystem
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
- 初始化EventLoggingListener
_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
}
- 初始化ExecutorAllocationManager
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.blockManager.master))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())
- 初始化ContextCleaner
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())
- 初始化ShutdownHookManager
logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
try {
stop()
} catch {
case e: Throwable =>
logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
}
}