Overview
/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 */
Why use an event-listener mechanism? Imagine that Spark delivered its notifications through ordinary Scala function calls. As the cluster grows, the number of calls grows with it; eventually the JVM's thread limit would throttle the updating of monitoring data, or even make it impossible to serve monitoring data to users at all. Function calls are also mostly synchronous, so threads would block and stay occupied for long periods.
What do we gain from an event-listener mechanism? Function calls are replaced by event publishing (event delivery). Events are handled asynchronously, so the calling thread can continue with its own work, threads in the pool can be reused, and the overall concurrency of the system increases significantly. Posted events go into a buffer, from which a dispatch thread takes them and hands them to the listeners registered for that event, which then update the monitoring data.
Queues
Async event queue
The async event queue is built mainly on a LinkedBlockingQueue[SparkListenerEvent] with a default capacity of 10000. The event dispatch thread keeps taking events from the LinkedBlockingQueue; an event stays in the queue until the thread has finished handling it, at which point it is removed.
// LiveListenerBus.scala
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
// AsyncEventQueue.scala
// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) // default 10000
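To see why the capacity cap matters, here is a minimal standalone sketch (not Spark code; all names are made up) showing that when producers outpace the consumer, offer() fails explicitly and the overflow can be counted as dropped events instead of letting the buffer grow until the JVM throws an OOM:
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

object BoundedQueueSketch {
  def main(args: Array[String]): Unit = {
    // A tiny capacity to make the overflow visible; Spark's default is 10000.
    val eventQueue = new LinkedBlockingQueue[String](3)
    val droppedEvents = new AtomicLong(0)

    // Post 5 events without any consumer draining the queue.
    (1 to 5).foreach { i =>
      // offer() returns false instead of blocking or throwing when the queue is full.
      if (!eventQueue.offer(s"event-$i")) {
        droppedEvents.incrementAndGet()
      }
    }

    println(s"buffered=${eventQueue.size()}, dropped=${droppedEvents.get()}") // buffered=3, dropped=2
  }
}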
Listener queues
/** Add a listener to queue shared by all non-internal listeners. */
/**
 * Mainly called by SparkContext: users can add a listener in code, or a listener can be
 * added from the configuration and instantiated by reflection (implemented in
 * SparkContext's setupAndStartListenerBus()).
 */
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, SHARED_QUEUE)
}
/** Add a listener to the executor management queue. */
/**
 * This queue receives the HeartbeatReceiver (which listens for executor add/remove events and
 * uses a thread to periodically check each executor's last heartbeat, killing executors that
 * time out). ExecutorAllocationManager can also add an ExecutorAllocationListener here
 * (it matches the total number of tasks against executor parallelism to dynamically add or
 * remove executors; this requires configuration and is disabled by default).
 */
def addToManagementQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
}
/** Add a listener to the application status queue. */
/**
 * Mainly adds the AppStatusListener, which provides the Job/Stage/Task UI data backing
 * AppStatusStore, and the SQLAppStatusListener, which provides the SQL UI data backing
 * SQLAppStatusStore.
 */
def addToStatusQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, APP_STATUS_QUEUE)
}
/** Add a listener to the event log queue. */
/** Writes the events it receives out to event log storage as JSON; requires configuration and is disabled by default. */
def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EVENT_LOG_QUEUE)
}
/**
 * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
 * of each other (each one uses a separate thread for delivering events), allowing slower
 * listeners to be somewhat isolated from others.
 * The add* methods above all delegate to this method.
 * Note: Structured Streaming's StreamingQueryListenerBus also joins a "streams" queue via
 * addToQueue() (it listens for a stream's start, progress and termination events; the progress
 * event exposes detailed streaming progress such as the stream name, id, watermark time,
 * source offsets, sink offsets, and so on).
 */
private[spark] def addToQueue(
listener: SparkListenerInterface,
queue: String): Unit = synchronized {
if (stopped.get()) {
throw new IllegalStateException("LiveListenerBus is stopped.")
}
queues.asScala.find(_.name == queue) match {
case Some(queue) =>
queue.addListener(listener)
case None =>
val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
newQueue.addListener(listener)
if (started.get()) {
newQueue.start(sparkContext)
}
queues.add(newQueue)
}
}
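For reference, this is roughly how a user-defined listener ends up in the shared queue; the listener class below is hypothetical. Listeners can be registered in code through SparkContext.addSparkListener (which delegates to addToSharedQueue) or declared in spark.extraListeners, which setupAndStartListenerBus() instantiates by reflection:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// A hypothetical listener used only for illustration.
class MyJobListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

object RegisterListenerExample {
  def main(args: Array[String]): Unit = {
    // Option 1: register by class name; SparkContext creates the instance by reflection
    // in setupAndStartListenerBus() and adds it to the shared queue.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("listener-demo")
      .set("spark.extraListeners", classOf[MyJobListener].getName)

    val sc = new SparkContext(conf)

    // Option 2: register programmatically; this also goes to the shared queue.
    sc.addSparkListener(new MyJobListener)

    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}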
Event posting
SparkListenerEvent event types
SparkListenerEvent is a trait; below are some of its subclasses, which can be used to display and record events.
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
/* Whether output this event to the event log */
protected[spark] def logEvent: Boolean = true
}
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
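Since SparkListenerEvent is a DeveloperApi trait, custom events can also be defined; events that no dedicated callback matches are delivered through onOtherEvent (see the doPostEvent implementation later). Posting to the bus itself goes through LiveListenerBus.post, which is private[spark], so this sketch (with hypothetical names) only shows the event definition and the consuming side:
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// A hypothetical custom event; logEvent defaults to true, so it would also be written
// to the event log when event logging is enabled.
case class MyCustomEvent(message: String) extends SparkListenerEvent

class MyCustomEventListener extends SparkListener {
  // Events without a dedicated callback are routed to onOtherEvent.
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case MyCustomEvent(msg) => println(s"got custom event: $msg")
    case _                  => // ignore everything else
  }
}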
The public post() interface for event delivery
SparkContext, DAGScheduler, CoarseGrainedSchedulerBackend and others all submit events to the bus through post().
The delivery process:
- If the bus has already started, postToQueues() drops the event into the matching named queues.
- If the bus has not started yet, the event is kept in a ListBuffer[SparkListenerEvent] buffer; when the bus starts, the buffered events are delivered and the buffer is cleared.
The posting code is as follows:
// SparkContext calls start() to start the bus
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
// mark the bus as started
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("LiveListenerBus already started.")
}
this.sparkContext = sc
// after the bus has started, deliver the buffered queuedEvents and then clear the buffer
queues.asScala.foreach { q =>
q.start(sc)
queuedEvents.foreach(q.post)
}
queuedEvents = null
metricsSystem.registerSource(metrics)
}
// post() checks whether the bus has started and delivers accordingly
def post(event: SparkListenerEvent): Unit = {
if (stopped.get()) {
return
}
metrics.numEventsPosted.inc()
// If the event buffer is null, it means the bus has been started and we can avoid
// synchronization and post events directly to the queues. This should be the most
// common case during the life of the bus.
// the bus has started and queuedEvents has already been set to null, so post directly
if (queuedEvents == null) {
postToQueues(event)
return
}
// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
// calling start() picks up the new event.
synchronized {
if (!started.get()) {
// the bus has not started yet, so buffer the event first
queuedEvents += event
return
}
}
// If the bus was already started when the check above was made, just post directly to the queues.
// deliver the event
postToQueues(event)
}
Analysis of events posted by DAGScheduler
Updating monitoring metrics
def executorHeartbeatReceived(
execId: String,
// (taskId, stageId, stageAttemptId, accumUpdates)
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
blockManagerMaster.driverEndpoint.askSync[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
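On the receiving side, a listener can observe these heartbeat-driven updates by overriding onExecutorMetricsUpdate; a minimal hypothetical sketch:
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Hypothetical listener that logs accumulator updates carried by executor heartbeats.
class HeartbeatMetricsListener extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate): Unit = {
    // accumUpdates: (taskId, stageId, stageAttemptId, accumulator updates for that task)
    update.accumUpdates.foreach { case (taskId, stageId, stageAttemptId, accums) =>
      println(s"executor ${update.execId}: task $taskId (stage $stageId.$stageAttemptId) " +
        s"reported ${accums.size} accumulator updates")
    }
  }
}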
Task start and getting the result
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId =
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
}
private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
}
.........................................
Stage start and stop
/** Called when the stage's parents are available and we can now run its tasks. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// First figure out the indexes of partition ids to compute.
//1. indexes of the partitions of this stage that have not been computed yet
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
//2. use the scheduling pool, job group, description, etc. from the associated ActiveJob
val properties = jobIdToActiveJob(jobId).properties
//3. add the current stage to the runningStages set
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
//4. register the stage start with the outputCommitCoordinator, then compute partition locations by stage type
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
Job start and stop
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_], //
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) //1. create the final stage (ResultStage)
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
"than the total number of slots in the cluster currently.")
// If jobId doesn't exist in the map, Scala converts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
new BiFunction[Int, Int, Int] {
override def apply(key: Int, value: Int): Int = value + 1
})
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.....
...
private[scheduler] def cleanUpAfterSchedulerStop() {
for (job <- activeJobs) {
val error =
new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
// Tell the listeners that all of the running stages have ended. Don't bother
// cancelling the stages because if the DAG scheduler is stopped, the entire application
// is in the process of getting stopped.
val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
// The `toArray` here is necessary so that we don't iterate over `runningStages` while
// mutating it.
runningStages.toArray.foreach { stage =>
markStageAsFinished(stage, Some(stageFailedMessage))
}
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
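For completeness, the consuming end of these job and stage events can look like the hypothetical listener below, which simply times each stage from its submitted/completed pair and reports the job result:
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Hypothetical listener that times stages using the events posted by DAGScheduler.
// Each queue delivers events from a single dispatch thread, so the plain mutable map is safe here.
class StageTimingListener extends SparkListener {
  private val submitted = mutable.Map[Int, Long]()

  override def onStageSubmitted(e: SparkListenerStageSubmitted): Unit =
    submitted(e.stageInfo.stageId) = System.currentTimeMillis()

  override def onStageCompleted(e: SparkListenerStageCompleted): Unit =
    submitted.remove(e.stageInfo.stageId).foreach { start =>
      println(s"stage ${e.stageInfo.stageId} took ${System.currentTimeMillis() - start} ms")
    }

  override def onJobEnd(e: SparkListenerJobEnd): Unit =
    println(s"job ${e.jobId} ended with ${e.jobResult}")
}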
AsyncEventQueue: asynchronous event handling
AsyncEventQueue class hierarchy (diagram)
AsyncEventQueue method list (diagram)
AsyncEventQueue key points
- dispatchThread
AsyncEventQueue holds a single-threaded dispatchThread that keeps calling
dispatch() -> postToAll() -> doPostEvent()
to process the events in eventQueue, so that every registered listener reacts to them (a simplified sketch of this call chain follows below).
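The call chain can be illustrated with the following simplified sketch (hypothetical names, listeners reduced to plain functions; this is not the real AsyncEventQueue implementation):
import java.util.concurrent.LinkedBlockingQueue

// A dedicated thread drains the queue (dispatch), fans each event out to all
// listeners (postToAll), and routes it to the right handler (doPostEvent).
class TinyAsyncQueue(listeners: Seq[String => Unit]) {
  private val eventQueue = new LinkedBlockingQueue[String](10000)

  private val dispatchThread = new Thread(() => dispatch(), "tiny-dispatch")
  dispatchThread.setDaemon(true)

  private def dispatch(): Unit =
    while (true) postToAll(eventQueue.take())      // take() blocks until an event arrives

  private def postToAll(event: String): Unit =
    listeners.foreach(l => doPostEvent(l, event))  // every registered listener sees the event

  private def doPostEvent(listener: String => Unit, event: String): Unit =
    listener(event)                                // the real code pattern-matches on the event type

  def start(): Unit = dispatchThread.start()
  def post(event: String): Unit = eventQueue.offer(event)
}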
The doPostEvent implementation in AsyncEventQueue's parent class
StreamingListenerBus and StreamingQueryListenerBus override doPostEvent() and only handle stream-related events.
As the method shows, besides pattern matching on the event type, it dispatches through the SparkListenerInterface callbacks.
protected override def doPostEvent(
listener: SparkListenerInterface,
event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
listener.onStageCompleted(stageCompleted)
case jobStart: SparkListenerJobStart =>
listener.onJobStart(jobStart)
case jobEnd: SparkListenerJobEnd =>
listener.onJobEnd(jobEnd)
case taskStart: SparkListenerTaskStart =>
listener.onTaskStart(taskStart)
case taskGettingResult: SparkListenerTaskGettingResult =>
listener.onTaskGettingResult(taskGettingResult)
case taskEnd: SparkListenerTaskEnd =>
listener.onTaskEnd(taskEnd)
case environmentUpdate: SparkListenerEnvironmentUpdate =>
listener.onEnvironmentUpdate(environmentUpdate)
case blockManagerAdded: SparkListenerBlockManagerAdded =>
listener.onBlockManagerAdded(blockManagerAdded)
case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
listener.onBlockManagerRemoved(blockManagerRemoved)
case unpersistRDD: SparkListenerUnpersistRDD =>
listener.onUnpersistRDD(unpersistRDD)
case applicationStart: SparkListenerApplicationStart =>
listener.onApplicationStart(applicationStart)
case applicationEnd: SparkListenerApplicationEnd =>
listener.onApplicationEnd(applicationEnd)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
listener.onExecutorMetricsUpdate(metricsUpdate)
case executorAdded: SparkListenerExecutorAdded =>
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
case executorBlacklisted: SparkListenerExecutorBlacklisted =>
listener.onExecutorBlacklisted(executorBlacklisted)
case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
listener.onExecutorUnblacklisted(executorUnblacklisted)
case nodeBlacklisted: SparkListenerNodeBlacklisted =>
listener.onNodeBlacklisted(nodeBlacklisted)
case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case _ => listener.onOtherEvent(event)
}
}
AsyncEventQueue event processing flow (diagram)
SparkListenerInterface analysis
Streaming listeners will be analyzed in detail in a later post.
AppStatusListener
The Job, Stage and Task pages of the Spark UI call methods provided by AppStatusStore to read the job/stage/task information stored in the kvstore.
/**
 * A Spark listener that writes application information to a data store. The types written to the
 * store are defined in the `storeTypes.scala` file and are based on the public REST API.
 *
 * @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of
 *                       unfinished tasks can be more accurately calculated (see SPARK-21922).
 */
private[spark] class AppStatusListener(
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
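As a consumer-side note, application code normally does not touch the kvstore directly; besides the UI, the public SparkStatusTracker exposes roughly the same job/stage data gathered by this listener infrastructure. A small usage sketch:
import org.apache.spark.{SparkConf, SparkContext}

object StatusTrackerExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("status-demo"))
    sc.parallelize(1 to 1000, 8).count()

    // Query job/stage information collected by the status listener infrastructure.
    val tracker = sc.statusTracker
    for (jobId <- tracker.getJobIdsForGroup(null);   // null = jobs not tied to a job group
         jobInfo <- tracker.getJobInfo(jobId)) {
      println(s"job $jobId status=${jobInfo.status()} stages=${jobInfo.stageIds().mkString(",")}")
    }
    sc.stop()
  }
}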
SQLAppStatusListener
The SQL page of the Spark UI calls methods provided by SQLAppStatusStore to read information about the SparkPlan physical plan (the actual SQL execution flow) stored in the kvstore.
class SQLAppStatusListener(
conf: SparkConf,
kvstore: ElementTrackingStore,
live: Boolean) extends SparkListener with Logging {
KVStore will be covered in a later update~~
Good night~~