Spark netty RPC 通信原理
通信是分布式程序的血液和神经,就好比大脑发出的执行需要通过神经和需要才能传递到手脚进行执行。可见好的通信能力是分布式系统的重重之中。
其实Spark 的很多地方都涉及网络通信,比如 Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份,以及各个服务间的心跳传输等。
回顾Spark的通信的进化史,在Spark1.6之前,Spark的Rpc是基于Akka来实现通信的。(Akka是一个基于scala语言的比较先进异步通信的消息框架)但由于Akka不适合大文件的传输,其大文件是基于Jetty实现的HttpFileServer实现。但随着spark社区的发展,在Spark1.6中移除了Akka https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-5293),原因概括为:
- 很多Spark用户也使用Akka,但是由于Akka不同版本之间无法互相通信,这就要求用户必须使用跟Spark完全一样的Akka版本,导致用户无法升级Akka。
- Spark的Akka配置是针对Spark自身来调优的,会跟用户自己代码中的Akka配置冲突。
- Spark用的Akka特性很少,这部分特性很容易自己实现。同时,这部分代码量相比Akka来说很少,如果自己实现,那么debug比较容易,遇到什么bug,也可以马上fix,不需要等Akka上游发布新版本。
综上,在Spark2.xx中,spark基于netty,参照akka实现了Spark自己的RPC通信框架。
目前在spark中通信模块主要在core和network-common 模块中。 core 主要定义了RpcEnv, Endpoint, EndpointRef 等通信上层的实现,这部分基本是仿照Akka实现的, 在Spark-network-common主要实现了TransportClient, TransportServer等底层netty的通信模块。
1. Akka 通信系统架构
Akka 通过消息传递实现并发处理,规避了复杂的thread和私有数据,异步通信,事件响应等处理。
- 保持数据隔离并绑定到线程。线程应该隐藏(封装)它们的私有数据和其他资源,而不是与系统的其余部分共享它们。
- 通过消息(事件对象)在线程之间异步通信。使用异步事件可以使线程真正独立地运行,而不会相互阻塞。
- 线程应该将其生命周期用于响应传入事件,因此它们的主线应该由一个事件循环组成,该循环一次处理一个事件(直到完成),从而避免线程本身内的任何并发危险。
在java的并发开发实质上是通过thread lock实现,而akka 是通过消息不可变更和通信实现。 Akka的特点是1. 每个Actor自己的内部功能都是被串行执行的。2. Actor之间是通过底层的线程池来实现并行。
[图片上传失败...(image-a95df3-1646009602027)]
在Akka中重要是actor模型和 mailBox 通信系统,每一个Actor都维护一个Mailbox, 既可以收发消息。具体的执行则有维护的线程池进行执行。Spark通信框架中各个组件(Client/Master/Worker)可以认为是一个个独立的实体,各个实体之间通过消息来进行通信。
2. Spark 通信系统架构
在Spark 中每一个实体(Client, Master, Worker)都可以认为是一个Actor, 其都会维护一个收件箱(inBox)和多个发件箱(OutBox)。
[图片上传失败...(image-70d8f7-1646009602027)]
如图所示,在spark中Endpoint 就相当于Akka中的Actor 。Endpoint有 1 个 InBox 和 N 个 OutBox(N>=1,N取决于当前 Endpoint 与多少其他的 Endpoint 进行通信,一个与其通讯的其他Endpoint 对应一个 OutBox),Endpoint 接收到的消息被写入 InBox,发送出去的消息写入 OutBox 并被发送到其他 Endpoint 的 InBox 中。
了解了Spark实现akka的通信原理进行节点间通信与并发处理。简述下Spark的通信系统,在Spark的上层是使用RpcEndpoint, RpcEndPointRef, Dispatcher 等作为Actor系统,inBox和outBox 实现mailBox。 而在底层进行远程消息投递的rpc调用中是通过TransportClient 和 TransportServer 实现底层远程rpc通信。
[图片上传失败...(image-6988c8-1646009602027)]
TransportServer 接受远程发送的消息 → Dispatcher → inbox → Async 消费
RpcEndpointRef 客户端 → RpcEnv→ OutBox → TransportClient 发送到TransportServer.
3. Spark 通信系统重要概念介绍
[图片上传失败...(image-fb8eef-1646009602026)]
上图是Spark 通信系统最重要的类的关系图,从中可以看出在Actor系统中最重要的是NettyRpcEnv, 在Netty的通信系统中最重要的是TransportContext。
Actor 体系:
- RpcEnv:RpcEnv 抽象类表示一个 RPC Environment,管理着整个RpcEndpoint的生命周期,每个 Rpc 端点运行时依赖的环境称之为 RpcEnv。
- NettyRpcEnv: RpcEnv的唯一实现类。在NettyRpcEnv中维护着,
transportConf
主要是从SparkConf中拷贝,主要关注网络的配置。Dispatcher
主要负责将消息分发到Endpoint, 相当于Akka中的ActorSystem系统。transportContext
维护Transport通信的上下文,通过其可以生成TransportServer 和 TransportClientFactory。多个outboxes
,以及提供了setupEndpoint
设置Endpoint的方法。 - RpcEndpoint:RPC 端点 ,Spark 将每个通信实体都都称之一个Rpc端点,且都实现 RpcEndpoint 接口,比如DriverEndpoint,MasterEndpont,内部根据不同端点的需求,设计不同的消息和不同的业务处理。相当于Actor。
- RpcEndPointRef: RpcEndpointRef是一个对RpcEndpoint的远程引用对象,每个RpcEndpoint可以有多个引用,通过它可以向远程的RpcEndpoint端发送消息以进行通信。在Spark从Endpoints 发送消息需要使用RpcEndPointRef, 相当于客户端。
- Inbox:一个本地端点对应一个收件箱,Inbox 里面有一个 InboxMessage 的链表,InboxMessage 有很多子类,可以是远程调用过来的 RpcMessage,可以是远程调用过来的 fire-and-forget 的单向消息 OneWayMessage,还可以是各种服务启动,链路建立断开等 Message,这些 Message 都会在 Inbox 内部的方法内做模式匹配,调用相应的 RpcEndpoint 的函数。
- Outbox:一个RpcEndpointRef对应一个发件箱,一个RpcEndpoint 可以有多个RpcEndpointRef。 NettyRpcEnv 中包含一个 ConcurrentHashMap[RpcAddress, Outbox]。当消息放入 Outbox 后,紧接着将消息通过 TransportClient 发送出去。
- Dispatcher:消息分发器(来自netty的概念),负责将 RpcMessage 分发至对应的 RpcEndpoint。Dispatcher 中包含一个 MessageLoop,它读取 LinkedBlockingQueue 中的投递 RpcMessage,根据客户端指定的 Endpoint 标识,找到 Endpoint 的 Inbox,然后投递进去,由于是阻塞队列,当没有消息的时候自然阻塞,一旦有消息,就开始工作。Dispatcher 的 ThreadPool 负责消费这些 Message。
Transport体系:
- TransportContext:是一个创建TransportServer, TransportClientFactory,使用TransportChannelHandler建立netty channel pipeline的上下文,这也是它的三个主要功能。TransportClient 提供了两种通信协议:控制层面的RPC以及数据层面的 “chunk抓取”。用户通过构造方法传入的 rpcHandler 负责处理RPC 请求。并且 rpcHandler 负责设置流,这些流可以使用零拷贝IO以数据块的形式流式传输。TransportServer 和 TransportClientFactory 都为每一个channel创建一个 TransportChannelHandler对象。每一个TransportChannelHandler 包含一个 TransportClient,这使服务器进程能够在现有通道上将消息发送回客户端。
- TransportServer:TransportServer是RPC框架的服务端,可提供高效的、低级别的流服务。
- TransportClientFactory:创建传输客户端(TransportClient)的传输客户端工厂类。 TransportClient:RPC框架的客户端,用于获取预先协商好的流中的连续块。TransportClient旨在允许有效传输大量数据,这些数据将被拆分成几百KB到几MB的块。简言之,可以认为TransportClient就是Spark Rpc 最底层的基础客户端类。主要用于向server端发送rpc 请求和从server 端获取流的chunk块。
- TransportChannelHandler:传输层的handler,负责委托请求给TransportRequestHandler,委托响应给TransportResponseHandler。在传输层中创建的所有通道都是双向的。当客户端使用RequestMessage启动Netty通道(由服务器的RequestHandler处理)时,服务器将生成ResponseMessage(由客户端的ResponseHandler处理)。但是,服务器也会在同一个Channel上获取句柄,因此它可能会开始向客户端发送RequestMessages。这意味着客户端还需要一个RequestHandler,而Server需要一个ResponseHandler,用于客户端对服务器请求的响应。此类还处理来自io.netty.handler.timeout.IdleStateHandler的超时。如果存在未完成的提取或RPC请求但是至少在“requestTimeoutMs”上没有通道上的流量,我们认为连接超时。请注意,这是双工流量;如果客户端不断发送但是没有响应,我们将不会超时。 当TransportChannelHandler读取到的request是RequestMessage类型时,则将此消息的处理进一步交给TransportRequestHandler,当request是ResponseMessage时,则将此消息的处理进一步交给TransportResponseHandler。
Messages系统:
- MessageEncoder:在将消息放入管道前,先对消息内容进行编码,防止管道另一端读取时丢包和解析错误。
- MessageDecoder:对从管道中读取的ByteBuf进行解析,防止丢包
- TransportFrameDecoder:对从管道中读取的ByteBuf按照数据帧进行解析;
- StreamManager:处理ChunkFetchRequest和StreamRequest请求
- RpcHandler:处理RpcRequest和OneWayMessage请求
- Message:Message是消息的抽象接口,消息实现类都直接或间接的实现了RequestMessage或ResponseMessage接口。