Spark 源码(7) - Driver 启动之 SparkContext 初始化

2021-10-12 12:16:25 浏览数 (1)

一、SparkContext 初始化

上次阅读到 Master 通知 Worker 启动了一个 Driver,就是启动了一个 JVM,并且开始使用反射的方式执行 DriverWrapper 的 main 方法。

在 main 方法中,会执行我们提交的 jar 包中的 main 方法,也就是这个 JavaWordCount 程序的 main 方法:

然后开始创建 SparkContext

代码语言:javascript复制
SparkSession spark = SparkSession.builder().appName("JavaWordCount").getOrCreate();

点进去,可以看到:

代码语言:javascript复制
SparkContext.getOrCreate(sparkConf)

这里 new 了一个 SparkContext(),然后开始看 SparkContext 的构造方法,这个方法尤其的长,但我们得抓住重点。

重点就在 500 多行的这里,创建了 SchedulerBackend,TaskScheduler,DAGScheduler

下面简单的介绍下这三个核心组件的作用:

(1)TaskScheduler

TaskScheduler 是一个 trait,它的核心任务是提交 TaskSet 到集群并汇报结果,并且向 DAGScheduler 汇报任务的执行情况。

主要实现类是 TaskSchedulerImpl

(2)SchedulerBackend

SchedulerBackend 也是一个 trait,不同的部署模式有不同的实现,比如在 standalone 模式下,实现类是:StandaloneSchedulerBackend。

它内部有两个 Endpoint 组件,一个是 DriverEndpoint ,用来和 Worker 打交道,比如创建 Executor、提交 Task 等;一个是 ClientEndpoint 用来和 master 打交道。

SchedulerBackend 专门负责收集 Worker 上的资源信息,它知道每个任务在提交时,自己拥有多少资源,然后去具体运行 Task。

(3)DAGScheduler

DAGScheduler 的主要作用是:当执行 action 算子的时候,DAGScheduler 会从最后一个 rdd 开始往前递归寻找 ShuffleDependency,每找到一个就会划分一个 stage,最终达到的目的是,根据业务逻辑算子依赖关系,划分成一个个 stage,并且根据 stage 先后关系,把 stage 转化成 ShuffleMapTask 或者 ResultTask,封装成 TaskSet,交给 TaskScheduler 提交。

最后要注意的是,DAGScheduler 和 SchedulerBackend 都是 TaskSchedulerImpl 的成员变量。

二、TaskScheduler 的创建

下面回到创建 TaskScheduler 的地方:

代码语言:javascript复制
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

这个方法中,主要是根据不同的部署模式,分别创建不同的 TaskScheduler 和 SchedulerBackend。

在 Standalone 模式下:

创建了 TaskSchedulerImpl 对象和 StandaloneSchedulerBackend 对象。

在 TaskSchedulerImpl 的构造方法中,看到了两个重要的属性,dagScheduler 和 backend,其他倒没有什么逻辑了。

然后看到 StandaloneSchedulerBackend 的构造方法

这里要注意一下继承关系,继承了 CoarseGrainedSchedulerBackend

然后父类里有一个很重要的属性,在执行构造的方法的时候,就会执行这个

也就是说,在这句代码执行的时候:

代码语言:javascript复制
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)

其实,就创建了一个 DriverEndpoint 组件了,那么紧接着就需要看一下 DriverEndpoint 类的 onStart() 方法:

每间隔一段时间,就给自己发送了一个 ReviveOffers 消息

这个消息的处理逻辑就很长了,需要单独一次来阅读了。可以一句话总结它干的事情:过滤出所有的资源,找到所有需要启动的 TaskSet,分配资源,启动任务。

然后再回来看 StandaloneSchedulerBackend 的构造方法,构造方法里没有逻辑,接着回到 SparkContext 往下看。

接下来,scheduler 调用了 initialize 方法:

代码语言:javascript复制
scheduler.initialize(backend)

这个方法就把当前的 SchedulerBackend 绑定到自己的成员属性上:

接着 new 了 DAGScheduler

代码语言:javascript复制
_dagScheduler = new DAGScheduler(this)

然后启动 TaskScheduler()

代码语言:javascript复制
_taskScheduler.start()

三、TaskScheduler 的启动

首先看到实现类 TaskSchedulerImpl 类的 start() 方法

调动了内部的 SchedulerBackend 的启动方法:

这个方法中,首先封装了一个 CoarseGrainedExecutorBackend 类的启动命令

把这个 Command 封装到 ApplicationDescription,应用程序的描述信息中

然后把 ApplicationDescription 作为成员变量,并且还创建了一个 StandaloneAppClient 对象

然后给启动了起来,new 了一个 ClientEndpoint() ,注意这是一个 Endpoint

看一下它的 onStart() 方法,向 Master 注册

最终是给 Master 发送了一个 RegisterApplication 消息,消息里有 ApplicationDescription(应用程序的描述信息)

四、总结

这一切都做完之后,SparkContext() 最核心的地方就阅读完了,总结一下就是创建了 DAGScheduler、TaskScheduler、SchedulerBackend 三大组件。并且 SchedulerBackend 内部有两个 Endpoint,DriverEndpoint 和 ClientEndpoint,分别从来和 Worker 通信 、和 Master 通信。

下次我们来阅读 Master 处理 Driver 的注册应用程序的消息。

下面是目前的流程图:

0 人点赞