在spark分布式程序中,sparkConf 主要起着Spark程序进行资源配置,性能调优,功能开关,参数传递的能力。在Spark的Driver和Executor中都存在着SparkConf。
在大多数的时候,我们可以通过new SparkConf()
来创建Spark配置。传统的Spark的入口是SparkContext的创建就是可以通过创建SparkConf, 并传入SparkContext()。
val conf = new SparkConf().setAppName("app").setMaster("master")
val sparkContext = new SparkContext(conf)
首先,我们总结下Spark的配置通过以下3种方式获取:
- 来源于系统参数 (System.getProperties)中以 spark. 作为前缀的属性;
- 使用SparkConf 的Api进行设置;
- 从其他的SparkConf 中进行克隆继承。
private val settings = new ConcurrentHashMap[String, String]()
从源码中可以看出,Spark中的所有配置,其key,value都是String类型,同时其都是存储在ConcurrentHashMap中的。
1. 来源于系统参数
从源码中可以看出,默认是会加载来自于系统的参数(JVM参数和系统虚拟机的参数),这些参数需要是spark. 开头的系统参数,会默认加载到sparkConf中。
代码语言:javascript复制class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
import SparkConf._
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
private val settings= new ConcurrentHashMap[String, String]()
...
// 如果loadDefaults为false则不会加载系统参数,可以方便测试。
if (loadDefaults) {
loadFromSystemProperties(false)
}
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemPropertiesif key.startsWith("spark.")) {
set(key, value, silent)
}
this
}
...
2. 使用SparkConf API进行设置
通过set api 进行设置参数, 从源码可以看出, 默认对废弃的参数进行打印警告。默认会将配置放置到ConcurrentHashMap中。
代码语言:javascript复制/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
set(key, value, false)
}
private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " key)
}
// 对废弃的参数进行打印警告
if (!silent) {
logDeprecationWarning(key)
}
settings.put(key, value)
this
}
另外,在创建SparkSession时,必须指定的appName和master。
代码语言:javascript复制val spark = SparkSession
.builder()
.master("local[*]")
.appName("Spark schedule mode")
.getOrCreate()
从源码可以看出,其实就是设置配置的spark.master
和 [spark.app.name](http://spark.app.name)
的键值对。
*/
def setMaster(master: String): SparkConf = {
set("spark.master", master)
}
/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
set("spark.app.name", name)
}
另外,如果在设置spark配置以spark.hadoop开头,则会将后面的参数拷贝到hadoop的配置中。
代码语言:javascript复制private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
// Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
给SparkExecutor传参可以通过调用setExecutorEnv方法:
代码语言:javascript复制def setExecutorEnv(variable: String, value: String): SparkConf = {
set("spark.executorEnv." variable, value)
}
3. 从其他系统克隆继承
代码语言:javascript复制class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
import SparkConf._
...
/** Copy this object */
override def clone: SparkConf = {
val cloned = new SparkConf(false)
settings.entrySet().asScala.foreach { e =>
cloned.set(e.getKey(), e.getValue(), true)
}
cloned
}
从SparkConf的定义可以看出,其定义之初就实现了cloneable, 可以方便配置的拷贝与传递
代码语言:javascript复制/**
* Return a copy of this SparkContext's configuration. The configuration''cannot''be
* changed at runtime.
*/
def getConf: SparkConf = conf.clone()
可以看出在SparkContext中获取SparkConf的方式就是采用的conf.clone() 的方式。
总结:SparkConf 是定制和调配Spark程序的灵魂,在spark中它被多个组件共用。在SparkConf中的键值对配置都被保存在ConcurrentHashMap中,它可以保证在高并发下保证一定的性能。SparkConf继承了Cloneable特质的clone方式,并通过其进行复用和传播。