导读
代码语言:javascript复制:: DeveloperApi ::
Holds all the runtime environment objects for a running Spark instance (either master or worker),
保存一个运行中的Spark实例的所有运行时环境对象(master或worker),
including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
包括序列化器、RpcEnv、块管理器、映射输出跟踪器等
Spark code finds the SparkEnv through a global variable, so all the threads can access the same
SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
目前Spark代码通过一个全局变量查找SparkEnv,这样所有线程都可以访问它
SparkEnv。它可以被SparkEnv访问。get(例如在创建SparkContext之后)。
NOTE: This is not intended for external use. This is exposed for Shark and may be made private in a future release.
SparkEnv 创建入口
在「SparkContext.scala」 中创建,老版本参数中还有actorsystem
// Create the Spark execution environment (cache, map output tracker, etc)
// 创建SparkEev 执行环境(cache, map输出追踪器, 等等)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
SparkEnv 架构组件
组件英文名 | 组件中文含义 |
---|---|
rpcEnv | Spark 通讯组件环境 |
serializer | 序列化器 |
closureSerializer | 闭包序列化器 |
serializerManager | 给各种 Spark 组件提供序列化、压缩及加密的服务 |
mapOutputTracker | Master/Slave 架构,管理 mapTask 输出状态 |
shuffleManager | 管理整个 shuffle 过程包括执行、计算 |
broadcastManager | 广播管理器 |
blockManager | Spark 运行时任务的数据读写管理 |
securityManager | 安全管理器,用来验证权限 |
metricsSystem | 指标监控系统 |
memoryManager | 内存管理器,整个 Spark 运行时的执行内存管理 |
outputCommitCoordinator | 决定任务是否可以向 HDFS 提交输出的权限。使用“第一个提交者获胜”政策。 |
前置
判断是否是 Driver
代码语言:javascript复制 //判断其是不是driver
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
// Listener bus is only used on the driver
// 监听总线只能在Driver用
if (isDriver) {
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
}
创建SecurityManager安全管理器
Spark class responsible for security.
Spark 负责安全的类
代码语言:javascript复制 val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
securityManager.initializeAuth()
}
ioEncryptionKey.foreach { _ =>
if (!securityManager.isEncryptionEnabled()) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the "
"wire.")
}
}
创建RPCEnv 环境
A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
一个RpcEnv实现必须有一个[[RpcEnvFactory]]实现和一个空的构造函数
「Server :=> endpoint」
「Client :=> endpointRef」
代码语言:javascript复制 //判断是driverSystemName 还是executorSystemName
// private[spark] val driverSystemName = "sparkDriver"
// private[spark] val executorSystemName = "sparkExecutor"
val systemName = if (isDriver) driverSystemName else executorSystemName
//传入系统名称、通讯地址(socket)、rpc通讯地址、端口、配置、安全管理器、corenum 、是否是driver(因为会根据是否是driver来创建rpcendpoint)
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
//配置端口
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
}
// Create an instance of the class with the given name, possibly initializing it with our conf
// 使用给定名称创建该类SparkEnv的实例,使用conf进行初始化
def instantiateClass[T](className: String): T = {
val cls = Utils.classForName(className)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
// 寻找一个接受SparkConf和一个布尔型isDriver的构造函数,如果没有找到NoSuchMethodException
// 会用一个仅使用 SparkConf 参数的构造函数,如果还是没有找到NoSuchMethodException
// 然后会使用一个不使用任何参数的构造函数
try {
cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
.newInstance(conf, new java.lang.Boolean(isDriver))
.asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
try {
cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
cls.getConstructor().newInstance().asInstanceOf[T]
}
}
}
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
// 创建一个由SparkConf属性或defaultClassName命名的类实例
,如果没有设置该属性,可以使用我们的conf初始化它
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
instantiateClass[T](conf.get(propertyName, defaultClassName))
}
//调用反射生成序列化
val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
创建SerializerManager序列化管理器
Component which configures serialization, compression and encryption for various Spark components, including automatic selection of which [[Serializer]] to use for shuffles.
为各种Spark配置序列化、压缩和加密的组件,包括自动选择其中[[Serializer]]用于洗牌。
代码语言:javascript复制 //为各种Spark 组件配置序列化,压缩和加密的组件默认使用KryoSerializer
//def setDefaultClassLoader(classLoader: ClassLoader): Unit = //{
// kryoSerializer.setDefaultClassLoader(classLoader)
//}
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
// 闭包序列化,task任务很常见
val closureSerializer = new JavaSerializer(conf)
//判断是否是Driver来引用rpcendpoint
def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
//如果是dirver端那么调用创建函数endpointCreator: => RpcEndpoint,并注册进RpcEnv返回RpcEndpointRef
logInfo("Registering " name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
//如果是Executor端那么直接创建一个Ref
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}
创建BroadcastManager广播管理器
Spark2.x后使用Bit-torrent协议分发内容
「org.apache.spark.broadcast.TorrentBroadcast」
代码语言:javascript复制 val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
创建mapOutputTracker
Class that keeps track of the location of the map output of a stage. This is abstract because the driver and executor have different versions of the MapOutputTracker. In principle the driver and executor-side classes don't need to share a common base class; the current shared base class is maintained primarily for backwards-compatibility in order to avoid having to update existing test code.
代码语言:javascript复制❝MapOutputTracker在spark shuffle过程中的map和reduce起着衔接作用。具体点就是:在shuffle map过程中,executor端MapOutputTrackerWorker会将task结束后产生的map状态上报给Driver端的MapOutputTrackerMaster,所以在MapOutputTrackerMaster端保存中spark在shuffle map过程中所有block的相关的详细(包括位置,block大小等信息)。在shuffle reduce的时候,通过读取MapOutputTrackerMaster中的这些位置大小信息,从而决定去远程或者本地fetch相关block数据。shuffleId会保存到mapStatuses返回~ ❞
//根据是不是Driver来创建MapOutputTrackerMaster、MapOutputTrackerWorker
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
new MapOutputTrackerWorker(conf)
}
// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
// 初始化后必须分配trackerEndpoint,因为MapOutputTrackerEndpoint需要MapOutputTracker本身
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
创建Shuffle管理器
Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles with it, and executors (or tasks running locally in the driver) can ask to read and write data.
SparkEnv基于spark.shuffle.manager设置在驱动程序和每个执行程序上创建一个ShuffleManager。驱动程序注册shuffle,执行者(或在驱动程序中本地运行的任务)可以请求读写数据。
代码语言:javascript复制 // Let the user specify short names for shuffle managers
//通过反射拿到shuffle管理器,目前已经没有HashShuffleManager了,只有SortShuffleManager
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
创建动态内存管理器
Spark 内存资源管理。参照[857思维导图]
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)//静态内存管理器
} else {
UnifiedMemoryManager(conf, numUsableCores)//动态内存管理器 :统一内存管理器
}
val blockManagerPort = if (isDriver) {
conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
conf.get(BLOCK_MANAGER_PORT)
}
创建BlockTransferService块传输服务
主要是用Netty来传输数据块服务
代码语言:javascript复制 val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
blockManagerPort, numUsableCores)
创建BlockManagerMaster
负责整个应用程序在运行期间block元数据的管理和维护,以及向从节点发送指令执行命令
代码语言:javascript复制 val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
创建BlockManager
Manager running on every node (driver and executors) which provides interfaces for putting and retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
在每个节点(驱动程序和执行程序)上运行的管理器,它提供了在本地和远程将块放入和检索到各种存储(内存、磁盘和堆外)的接口。
代码语言:javascript复制 val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
创建监控指标系统MetricsSystem
Spark Metrics System, created by a specific "instance", combined by source,sink, periodically polls source metrics data to sink destinations.
Spark Metrics System由一个特定的“实例”创建,由source和sink组合,定期轮询source Metrics数据到sink目的地。
代码语言:javascript复制 //如果是Driver 需要appId,executor 需要 executorId
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
创建OutputCommitCoordinator
输出协调器,决定任务是否可以向 HDFS 提交输出的权限。使用“第一个提交者获胜”政策。
代码语言:javascript复制 val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
包装SparkEnv组件
包装SparkEnv组件返回实例
代码语言:javascript复制 val envInstance = new SparkEnv(
executorId,
rpcEnv,
serializer,
closureSerializer,
serializerManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)
总结
Spark Env 源码顺序大致就是上面的流程,更细致的后面的博文中会持续更新解读。
在深入学习之前,建议先了解RPC 调用流程。以及ListenerBus。