浅析SparkContext中的组件与创建流程

2022-03-23 15:14:58 浏览数 (1)

前言

在Spark框架中,应用程序的提交离不开Spark Driver,而Spark Driver的初始化始终围绕SparkContext的初始化,可以说SparkContext是Spark程序的发动机引擎,有了它程序才能跑起来,在spark-core中,SparkContext重中之重,它提供了很多能力,比如生成RDD,比如生成广播变量等,所以学习SparkContext的组件和启动流程有助于剖析整个Spark内核的架构。

SparkContext组件概览

在SparkContext中包含了整个框架中很重要的几部分:

  • SparkEnv:Spark的运行环境,Executor会依赖它去执行分配的task,不光Executor中有,同时为了保证本地模式任务也能跑起来,Driver中也有
  • SparkUI:Spark作业的监控页面,底层并没有采用前端技术,纯后端实现,用以对当前SparkJob的监控和调优,可以从页面观察到目前的Executor的jvm信息,每个job的stage划分和task划分,同时还可以观察到每个task处理的数据,用以发现数据是否倾斜
  • DAGScheduler:DAG调度器,是SparkJob调度系统的重要组件之一,负责创建job,根据RDD依赖情况划分stage,提交stage,将作业划分成一个有向无环图
  • TaskScheduler:任务调度器,是SparkJob调度系统的重要组件之一,负责按照调度算法将DAGScheduler创建的task分发至Executor,DAGScheduler是它的前置调度
  • SparkStatusTracker:提供对作业、Stage的监控
  • ConsoleProcessBar:利用SparkStatusTracker提供监控信息,将任务进度以日志的形式打印到终端中
  • HearbeatReceiver:心跳接收器,所有Executor都会定期向它发送心跳信息,用以统计存活的Executor,此信息会一直同步给TaskScheduler,用以保证TaskScheduler去分发task的时候会挑选合适的Executor
  • ContextCleaner:上下文清理器,用异步的方式去清理那些超出应用作用域范围的RDD、ShuffleDependency和Broadcast
  • LiveListenerBus:SparkContext中的事件总线,可以接收各个组件的事件,并且通过异步的方式对事件进行匹配并调用不同的回调方法
  • ShutdownHookManager:关闭时的钩子管理器,用以做一些清理工作,比如资源释放等
  • AppStatusStore:存储Application状态数据,在2.3.0之后的版本引入
  • EventLoggingListener(可选):将事件持久化到存储的监听器,通过spark.eventLog.enabled 进行控制
  • ExecutorAllocationManager(可选):Executor动态分配管理器,根据工作负载状态动态调整Executor的数量,通过属性spark.dynamicAllocation.enabledspark.dynamicAllocation.testing 进行控制

SparkContext初始化流程

在探究SparkContext初始化流程之前,先看一下这个类里有哪些属性,有助于我们去理解它在初始化的过程中做了哪些工作:

代码语言:javascript复制
/*spark conf对象*/
private var _conf: SparkConf = _
/*保存event log日志的目录*/
private var _eventLogDir: Option[URI] = None
/*event log日志的压缩格式*/
private var _eventLogCodec: Option[String] = None
/*spark context 事件总线*/
private var _listenerBus: LiveListenerBus = _
/*spark env 运行环境*/
private var _env: SparkEnv = _
/*spark status tracker 作业状态监控器*/
private var _statusTracker: SparkStatusTracker = _
/*console progress bar 终端输出作业状态进度器*/
private var _progressBar: Option[ConsoleProgressBar] = None
/*spark ui*/
private var _ui: Option[SparkUI] = None
/*hadoop 配置文件*/
private var _hadoopConfiguration: Configuration = _
/*executor 内存大小*/
private var _executorMemory: Int = _
/*向executor提交task的管理控制器*/
private var _schedulerBackend: SchedulerBackend = _
/*task scheduler*/
private var _taskScheduler: TaskScheduler = _
/*heartbeat receiver*/
private var _heartbeatReceiver: RpcEndpointRef = _
/*dag scheduler*/
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
/*event logging listener*/
private var _eventLogger: Option[EventLoggingListener] = None
/*executor allocation manager*/
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
/*context cleaner*/
private var _cleaner: Option[ContextCleaner] = None
/*事件总线启动标识*/
private var _listenerBusStarted: Boolean = false
/*作业提交额外的jars*/
private var _jars: Seq[String] = _
/*作业提交额外的files*/
private var _files: Seq[String] = _
/*shutdown hook manager*/
private var _shutdownHookRef: AnyRef = _
/*app status store*/
private var _statusStore: AppStatusStore = _

实际上SparkContext初始化的过程大抵上就以上各种组件的初始化过程,接下来看详细启动流程:

  1. 使用构造方法中config的clone方法给自己的私有_conf进行赋值,同时校验SparkConf中的必要参数是否正确
代码语言:javascript复制
class SparkContext(config: SparkConf) extends Logging {}

    _conf = config.clone()
    _conf.validateSettings()

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")

    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. "  
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }

    if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:n"   _conf.toDebugString)
    }

    // Set Spark driver host and port system properties. This explicitly sets the configuration
    // instead of relying on the default value of the config constant.
    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
    _conf.setIfMissing("spark.driver.port", "0")

    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten
  1. 根据配置项初始化EventLog的保存路径和压缩格式
代码语言:javascript复制
    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

    _eventLogCodec = {
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }
  1. 初始化事件总线
代码语言:javascript复制
_listenerBus = new LiveListenerBus(_conf)
  1. 初始化AppStatusStore
代码语言:javascript复制
    // Initialize the app status store and listener before SparkEnv is created so that it gets
    // all events.
    _statusStore = AppStatusStore.createLiveStore(conf)
    listenerBus.addToStatusQueue(_statusStore.listener.get)
  1. 初始化SparkEnv
代码语言:javascript复制
    // Create the Spark execution environment (cache, map output tracker, etc)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
  1. 初始化SparkStatusTracker
代码语言:javascript复制
_statusTracker = new SparkStatusTracker(this, _statusStore)
  1. 初始化ConsoleProgressBar
代码语言:javascript复制
    _progressBar =
      if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }
  1. 初始化SparkUI
代码语言:javascript复制
    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())
  1. 初始化hadoopConfiguration
代码语言:javascript复制
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
  1. 初始化executorMemory
代码语言:javascript复制
    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)
  1. 初始化heatbeatReveiver
代码语言:javascript复制
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
  1. 初始化任务调度器并启动
代码语言:javascript复制
    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()
  1. 初始化applicationId
代码语言:javascript复制
    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/"   _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId))
  1. 初始化SparkEnv中的BlockManager
代码语言:javascript复制
    _env.blockManager.initialize(_applicationId)
  1. 初始化SparkEnv中的metricsSystem
代码语言:javascript复制
    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
  1. 初始化EventLoggingListener
代码语言:javascript复制
    _eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addToEventLogQueue(logger)
        Some(logger)
      } else {
        None
      }
  1. 初始化ExecutorAllocationManager
代码语言:javascript复制
    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
              _env.blockManager.master))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())
  1. 初始化ContextCleaner
代码语言:javascript复制
    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())
  1. 初始化ShutdownHookManager
代码语言:javascript复制
    logDebug("Adding shutdown hook") // force eager creation of logger
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      try {
        stop()
      } catch {
        case e: Throwable =>
          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
      }
    }

0 人点赞