SparkConf 配置与传播

2022-03-23 14:19:38 浏览数 (1)

在spark分布式程序中,sparkConf 主要起着Spark程序进行资源配置,性能调优,功能开关,参数传递的能力。在Spark的Driver和Executor中都存在着SparkConf。

在大多数的时候,我们可以通过new SparkConf() 来创建Spark配置。传统的Spark的入口是SparkContext的创建就是可以通过创建SparkConf, 并传入SparkContext()。

代码语言:javascript复制
val conf = new SparkConf().setAppName("app").setMaster("master")
val sparkContext = new SparkContext(conf)

首先,我们总结下Spark的配置通过以下3种方式获取:

  1. 来源于系统参数 (System.getProperties)中以 spark. 作为前缀的属性;
  2. 使用SparkConf 的Api进行设置;
  3. 从其他的SparkConf 中进行克隆继承。
代码语言:javascript复制
private val settings = new ConcurrentHashMap[String, String]()

从源码中可以看出,Spark中的所有配置,其key,value都是String类型,同时其都是存储在ConcurrentHashMap中的。

1. 来源于系统参数

从源码中可以看出,默认是会加载来自于系统的参数(JVM参数和系统虚拟机的参数),这些参数需要是spark. 开头的系统参数,会默认加载到sparkConf中。

代码语言:javascript复制
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._

/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)

  private val settings= new ConcurrentHashMap[String, String]()

  ...
  // 如果loadDefaults为false则不会加载系统参数,可以方便测试。
  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemPropertiesif key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }
...

2. 使用SparkConf API进行设置

通过set api 进行设置参数, 从源码可以看出, 默认对废弃的参数进行打印警告。默认会将配置放置到ConcurrentHashMap中。

代码语言:javascript复制
/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
  set(key, value, false)
}

private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
  if (key == null) {
    throw new NullPointerException("null key")
  }
  if (value == null) {
    throw new NullPointerException("null value for "   key)
  }
  // 对废弃的参数进行打印警告
  if (!silent) {
    logDeprecationWarning(key)
  }
    settings.put(key, value)
  this
}

另外,在创建SparkSession时,必须指定的appName和master。

代码语言:javascript复制
val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Spark schedule mode")
      .getOrCreate()

从源码可以看出,其实就是设置配置的spark.master[spark.app.name](http://spark.app.name) 的键值对。

代码语言:javascript复制
*/
def setMaster(master: String): SparkConf = {
  set("spark.master", master)
}

/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
  set("spark.app.name", name)
}

另外,如果在设置spark配置以spark.hadoop开头,则会将后面的参数拷贝到hadoop的配置中。

代码语言:javascript复制
private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
  // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
  for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
    hadoopConf.set(key.substring("spark.hadoop.".length), value)
  }
}

给SparkExecutor传参可以通过调用setExecutorEnv方法:

代码语言:javascript复制
def setExecutorEnv(variable: String, value: String): SparkConf = {
  set("spark.executorEnv."   variable, value)
}

3. 从其他系统克隆继承

代码语言:javascript复制
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._
...

/** Copy this object */
  override def clone: SparkConf = {
    val cloned = new SparkConf(false)
    settings.entrySet().asScala.foreach { e =>
      cloned.set(e.getKey(), e.getValue(), true)
    }
    cloned
  }

从SparkConf的定义可以看出,其定义之初就实现了cloneable, 可以方便配置的拷贝与传递

代码语言:javascript复制
/**
 * Return a copy of this SparkContext's configuration. The configuration''cannot''be
 * changed at runtime.
 */
def getConf: SparkConf = conf.clone()

可以看出在SparkContext中获取SparkConf的方式就是采用的conf.clone() 的方式。

总结:SparkConf 是定制和调配Spark程序的灵魂,在spark中它被多个组件共用。在SparkConf中的键值对配置都被保存在ConcurrentHashMap中,它可以保证在高并发下保证一定的性能。SparkConf继承了Cloneable特质的clone方式,并通过其进行复用和传播。

0 人点赞