Spark 源码(2) - Spark Rpc 三剑客的理解

2021-10-12 12:49:12 浏览数 (1)

一、Spark Rpc 三剑客

谈到 Spark Rpc ,不得不提到 Spark Rpc 的三剑客:RpcEnv,RpcEndpoint,RpcEndpointRef。

如何理解他们呢,请看下图:

精简一下发送消息的过程为:通过 RpcEndpointRef 来发送一个消息给 RpcEnv,然后经过 Dispatcher 和 Inbox 的共同处理,发送给 RpcEndpoint 来处理(当前中间的过程比较复杂!)。

那什么是 Endpoint ,什么是 EndpointRef ?

Endpoint 很好理解,可以理解为是分布式环境的一个个节点。

EndpointRef,相当于是对其他节点的引用,比如:国家 A 在国家 B 设立了一个大使馆,里面住着一位大使。如果国家 B 想和国家 A 通信,那么只要和国家 A 的大使通信即可,不用千里迢迢跑去国家 A 。那么这里国家A的大使就是 EndpointRef ,endpointRef.send(MESSAGE),就可以往国家 A 发送一个消息。

在 Spark 源码中,Worker 在启动完成之后,要向 Master 注册自己,那么注册的时候,就是用 Rpc 通信的,首先需要拿到 Master 的一个引用,然后发送一个注册消息:

代码语言:javascript复制
// 先拿到 master 的引用
val masterEndpointRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// 然后发送一个消息
masterEndpointRef.send(RegisterWorker(...))

最后值得一提的是 Endpoint 的生命周期:

如果要使用 Endpoint ,首先需要往 RpcEnv 注册,注册之后会依次自动调用 Endpoint 的 构造函数 -> onStart() 方法 -> receive* -> onStop() 方法。

二、通过 Master 的启动来理解 Spark Rpc

下面我们通过 Master 的启动流程,来理解 RPCEndpoint 的注册与启动。

(1)启动脚本

在启动 Spark 集群的时候,一般是使用启动脚本:start-all.sh 来启动集群,这个脚本会分别启动 Master 和 Worker,如下:

代码语言:javascript复制
# Start Master
"${SPARK_HOME}/sbin"/start-master.sh

# Start Workers
"${SPARK_HOME}/sbin"/start-workers.sh

然后在 start-master.sh 中,最终会执行 Master 这个类:

代码语言:javascript复制
CLASS="org.apache.spark.deploy.master.Master"
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 
  --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT 
  $ORIGINAL_ARGS

可以看到是用 spark-daemon.sh 来启动 org.apache.spark.deploy.master.Master 这个类,那我们直接看 Master 的 main 方法

(2)NettyRpc 服务端的启动

main 方法中可以看到,这里在启动 RpcEnv 和 Endpoint,做了两件事情:

代码语言:javascript复制
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)

首先创建 RpcEnv:

代码语言:javascript复制
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)

可以看到最终是用 NettyRpcEnvFactory 工厂和 RpcEnvConfig 来创建的

代码语言:javascript复制
 new NettyRpcEnvFactory().create(config)

然后到了 NettyRpcEnv 类中

阅读源码小技巧:在阅读 scala 代码时,如果见到 new 一个对象,那么小括号中的是成员变量,大括号中的变量声明、代码块,静态代码块全部是构造方法,都会执行,方法的定义不会执行,如下:

在 new NettyRpcEnv 时,小括号中的都是这个类的成员变量,大括号的:

这些代码都会执行的。

可以看到在 new NettyRpcEnv 的时候,创建了 transportConf 和 transportContext 对象。

然后接着启动了 NettyRpcEnv

代码语言:javascript复制
nettyEnv.startServer(config.bindAddress, actualPort)

启动的时候,是用上面创建好的 TransportContext 来创建了一个 Server

然后点进去可以看到有个 init 方法

这里就是用 Netty 的 Api 正式的启动了一个 服务端

首先创建 BootStrap 引导器:

完了之后,绑定了端口,就完成了 Netty 服务器的启动:

(3)注册 Endpoint

然后再回到 Master 的 startRpcEnvAndEndpoint 方法中来

此时,RpcEnv 已经创建好了,也启动了一个服务端了:

然后下面就是把自己作为 Endpoint 注册到 RpcEnv 中来,因为 Master 本身就继承 了 Endpoint 接口:

(4)调用生命周期的 onStart() 方法

然后就是调用 Master 的 onStart() 方法,这里面主要做了下面几件事情:

首先启动了 Master 的 webUi:

然后给自己每隔几秒钟就发送一个 CheckForWorkerTimeOut 消息:

既然是给自己发送了一个消息,那么必然有处理消息的地方,然后我们就可以 Master 类中搜索:case CheckForWorkerTimeOut,来看具体是怎么处理这个消息的:

在这个方法中可以看到,会遍历 workers 这个列表,超过一定时间没有心跳的,就把状态改为 DEAD,并且移除

再然后启动了 masterMetricsSystem 这个东西,不重要。

再然后启动了一个持久化引擎和选举代理:

这个我们就下次再详细的讲。

到此为止,Master 就启动完毕了

三、小结

本篇我们通过 Master 的启动,介绍了 Spark Rpc 相关的源码,介绍了 RpcEndpoint 和 RPCEndpointRef,相信各位小伙伴已经对 Spark Rpc 有了初步的认知了。

0 人点赞