问题导读
- RPC服务端创建过程
- RPC客户端创建过程
- RPC调用流程
- 在Flink集群中整个RPC通信网络是如何一步步建立起来的,连接容错又如何保证
简介
Flink基于Akka来实现内部各组件(ResourceManager、Dispatcher、JobMaster、TaskExecutor等)间的RPC通信。本篇着重分析Flink的RPC设计,如何封装Actor模型,RPC的创建和调用流程。
阅读说明: 源码版本:Flink release-1.14.4 阅读前提:了解Akka Actor基础知识 1). 先聊Flink的RPC设计,理清RpcGateway、RpcEndpoint、RpcService、RpcServer概念,也就弄明白了Flink如何封装Akka来实现RPC机制。有兴趣可继续往下阅读。 2). 结合代码,看问题1、2、3,进一步熟悉RPC的创建与交互过程。(重点关注AkkaRpcService、AkkaInvocationHandler、AkkaRpcActor类) 3). 第4个问题进一步延伸,主要是理清各组件间谁与谁会建立通信连接,先后顺序是怎样的,由此建立起整个RPC通信网络。在组件切leader、重启或者心跳超时等异常情况时,是否有容错机制重新建连。
接口设计
结合下面RpcGateway、RpcEndpoint、RpcService、RpcServer概念看这两张图
RpcGateway
- 用于定义RPC协议,是客户端和服务端沟通的桥梁。
- 服务端实现了RPC协议,即实现了接口中定义的方法,做具体的业务逻辑处理。
- 客户端实现了RPC协议,客户端是Proxy生成的代理对象,将对RpcGateway接口方法的调用转为Akka的消息发送。
- 关注其5个子接口:DispatcherGateway、ResourceManagerGateway、JobMasterGateway、MetricQueryServiceGateway、TaskExecutorGateway。
RpcEndpoint
- RPC服务端的抽象,实现了该接口即为Rpc服务端,是Akka中Actor的封装。
- Actor收到ActorRef发送的消息(消息被封装为RpcInvocation对象),会通过RpcInvocation对象中的方法、参数等信息以反射的方式调用RpcGateway接口对应的方法。
- 关注其5个实现类:Dispatcher、ResourceManager、JobMaster、MetricQueryService、TaskExecutor。其中Dispatcher、ResourceManager、JobMaster是JobManager进程中的Rpc服务,TaskExecutor是TaskManager进程中的Rpc服务,MetricQueryService在JobManager和TaskManager进程中都有。
RpcService
- 是 RpcEndpoint 的运行时环境,是Akka中ActorSystem的封装。
- 一个ActorSystem系统中有多个Actor,同样在Flink中一个RpcService中有多个RpcEndpoint,即多个Rpc服务。
- Flink中RpcService也有多套,JobManager和TaskManager进程中都有两套RpcService。
- RpcService 提供了启动Rpc服务(startServer)、停止Rpc服务(stopServer)、连接远端Rpc服务等方法。
- 实现类是AkkaRpcService,内有属性ActorSystem actorSystem,Map<ActorRef, RpcEndpoint> actors。
RpcServer
是Rpc服务端自身的代理对象,设计上是供服务端调用自身非Rpc方法。
类关系图
注:这里借用网上画的一张图
问题
1. RPC服务端创建过程
RPC服务端是一个代理对象。
入口
代码语言:java复制RpcEndpoint.java
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
// 生成代理对象
this.rpcServer = rpcService.startServer(this);
this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
代理对象生成过程如下:
1. 以Ask方式向SupervisorActor发送StartAkkaRpcActor消息,SupervisorActor收到消息后根据消息里RpcEndpoint的配置信息创建Actor,并以tell方式回复创建成功。
代码语言:java复制AkkaRpcService.java
startServer(C rpcEndpoint)
registerAkkaRpcActor(rpcEndpoint)
{
// 创建Actor
SupervisorActor.startAkkaRpcActor(
supervisor.getActor(),
actorTerminationFuture ->
Props.create(
akkaRpcActorType,
rpcEndpoint,
actorTerminationFuture,
getVersion(),
configuration.getMaximumFramesize(),
flinkClassLoader),
rpcEndpoint.getEndpointId());
// 在RpcService中保存ActorRef与RpcEndpoint引用关系
actors.put(actorRegistration.getActorRef(), rpcEndpoint)
}
代码语言:java复制SupervisorActor.java
// 1) 发送消息
public static StartAkkaRpcActorResponse startAkkaRpcActor(
ActorRef supervisor, StartAkkaRpcActor.PropsFactory propsFactory, String endpointId) {
// 以Ask方式发送消息并等待结果
// Ask在实现上实际上是会创建一个Actor等待响应结果,成功或者超时时,销毁Actor
return Patterns.ask(
supervisor,
createStartAkkaRpcActorMessage(propsFactory, endpointId),
RpcUtils.INF_DURATION)
.toCompletableFuture()
.thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
.join();
}
// 2) 处理消息
private void createStartAkkaRpcActorMessage(StartAkkaRpcActor startAkkaRpcActor) {
final String endpointId = startAkkaRpcActor.getEndpointId();
final Props akkaRpcActorProps = ...
...
try {
// 创建Actor
final ActorRef actorRef = getContext().actorOf(akkaRpcActorProps, endpointId);
registeredAkkaRpcActors.put(actorRef, akkaRpcActorRegistration);
// 回复消息
getSender().tell(
StartAkkaRpcActorResponse.success(...),
getSelf()
);
} catch (AkkaException akkaException) {
getSender().tell(StartAkkaRpcActorResponse.failure(akkaException), getSelf());
}
}
2. 准备代理对象要实现的接口
代码语言:java复制AkkaRpcService.java
startServer(C rpcEndpoint)
{
...
// 服务端对象实现了RpcGateway接口
Set<Class<?>> implementedRpcGateways =
new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
// 服务端对象是一个RpcServer
implementedRpcGateways.add(RpcServer.class);
// 服务端对象是Akka endpoint,可以获取到ActorRef引用
implementedRpcGateways.add(AkkaBasedEndpoint.class);
if (rpcEndpoint instanceof FencedRpcEndpoint){
implementedRpcGateways.add(FencedMainThreadExecutable.class);
}
}
3. 生成代理对象
代码语言:java复制AkkaRpcService.java
startServer(C rpcEndpoint)
{
...
RpcServer server =
(RpcServer)
Proxy.newProxyInstance(
classLoader,
implementedRpcGateways.toArray(
new Class<?>[implementedRpcGateways.size()]),
akkaInvocationHandler);
return server;
}
2. RPC客户端创建过程
RPC客户端是一个代理对象。
入口:RpcService的connect(String address, Class<C> clazz)方法。
代码语言:java复制AkkaRpcService.java
connect(String address, Class<C> clazz)
{
...
// 1) 使用ActorSystem.actorSelection(address).resolveOne的方式来获取Actor的引用ActorRef(ActorRef可以用来向服务端Actor发送消息)
final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
// 2) ActorRef创建完成后,使用ask的方式向服务端发送一条握手消息(用来验证Client和Server彼此版本一致)
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
actorRefFuture.thenCompose(
(ActorRef actorRef) ->
AkkaFutureUtils.toJava(
Patterns.ask(
actorRef,
new RemoteHandshakeMessage(
clazz, getVersion()),
configuration.getTimeout().toMilliseconds())
.<HandshakeSuccessMessage>mapTo(
ClassTag$.MODULE$
.<HandshakeSuccessMessage>apply(
HandshakeSuccessMessage
.class))));
// 3) 以上2个事都做完后,异步创建代理对象并返回
final CompletableFuture<C> gatewayFuture =
actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
// invocationHandlerFactory.apply(actorRef) = new AkkaInvocationHandler 或 new 或 FencedAkkaInvocationHandler
InvocationHandler invocationHandler =
invocationHandlerFactory.apply(actorRef);
ClassLoader classLoader = getClass().getClassLoader();
C proxy =
(C)
Proxy.newProxyInstance(
classLoader,
new Class<?>[] {clazz},
invocationHandler);
return proxy;
},
actorSystem.dispatcher());
return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}
3. RPC调用流程
1. 通过客户端代理对象调用RpcGateway的方法会交由invoke方法执行。
2. invoke将方法、参数信息封装为RpcInvocation对象,并通过ActorRef将消息发送给服务端Actor。
如果执行的方法有返回值就使用Akka ask方式,否则以tell方式发送消息。 通过连接的服务端的地址可以判断出服务端在远程还是本地。 如果在远程,消息类型为RemoteRpcInvocation,实现了序列化接口,对象可序列化传输。(会判断methodName parameterTypes args序列化后的字节数是否超时指定的值,见参数akka.remote.netty.tcp.maximum-frame-size) 如果在本地,消息类型为LocalRpcInvocation。
3. 服务端Actor收到RpcInvocation消息,会从中获取到方法名、方法参数等相关信息,在主线程中通过反射的方式调用代理对象对应方法执行业务逻辑,如果方法有返回值,还会以tell方法告知客户端结果。
客户端相关代码如下:
代码语言:java复制class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer{
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> declaringClass = method.getDeclaringClass();
Object result;
// 非Rpc方法,直接本地执行。这个是服务端通过自己的代理对象RpcServer调用自己非Rpc方法时走的逻辑
if (declaringClass.equals(AkkaBasedEndpoint.class)
|| declaringClass.equals(Object.class)
|| declaringClass.equals(RpcGateway.class)
|| declaringClass.equals(StartStoppable.class)
|| declaringClass.equals(MainThreadExecutable.class)
|| declaringClass.equals(RpcServer.class)) {
result = method.invoke(this, args);
} else if (declaringClass.equals(FencedRpcGateway.class)) { // 支持HA的见FencedAkkaInvocationHandler
throw new UnsupportedOperationException(...);
} else {
// RPC方法,指RpcGateway子接口中定义的方法
// 接口:ResourceManagerGateway、DispatcherGateway、JobMasterGateway、MetricQueryServiceGateway、TaskExecutorGateway
result = invokeRpc(method, args);
}
return result;
}
private Object invokeRpc(Method method, Object[] args) throws Exception {
...
// 1) 封装消息
final RpcInvocation rpcInvocation =
createRpcInvocationMessage(methodName, parameterTypes, args);
// 2) 借助akka发送消息,进行RPC调用
Class<?> returnType = method.getReturnType();
final Object result;
if (Objects.equals(returnType, Void.TYPE)) {
// 无返回值,用akka tell模式
tell(rpcInvocation);
result = null;
} else {
...
// 有返回值,用akka ask模式
final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
...
}
return result;
}
}
服务端相关代码如下:
代码语言:java复制AkkaRpcActor.java
private void handleRpcInvocation(RpcInvocation rpcInvocation) {
Method rpcMethod = null;
try {
String methodName = rpcInvocation.getMethodName();
Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
rpcMethod = lookupRpcMethod(methodName, parameterTypes);
} catch (Exception e) {
...
getSender().tell(new Status.Failure(rpcException), getSelf());
}
if (rpcMethod != null) {
try {
rpcMethod.setAccessible(true);
final Method capturedRpcMethod = rpcMethod;
// 1) 无返回值
if (rpcMethod.getReturnType().equals(Void.TYPE)) {
runWithContextClassLoader(
() -> capturedRpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()),
flinkClassLoader);
} else {
// 2) 有返回值
final Object result;
try {
result =
runWithContextClassLoader(
() ->
capturedRpcMethod.invoke(
rpcEndpoint, rpcInvocation.getArgs()),
flinkClassLoader);
} catch (InvocationTargetException e) {
...
getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
return;
}
final String methodName = rpcMethod.getName();
if (result instanceof CompletableFuture) {
final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
sendAsyncResponse(responseFuture, methodName);
} else {
sendSyncResponse(result, methodName);
}
}
} catch (Throwable e) {
...
getSender().tell(new Status.Failure(e), getSelf());
}
}
}
4. 在Flink集群中整个RPC通信网络是如何一步步建立起来的,连接容错又如何保证
总述
哪些组件之间会建立RPC连接,什么时候会建立,连接又是如何建立起来的?
1). Rpc服务端作为服务端提供了Rpc服务,其内部也有其他Rpc服务的客户端
JobMaster主动连接ResourceManager,ResourceManager回连JobMaster
TaskManager主动连接ResourceManager,ResourceManager回连TaskManager
TaskManager主动连接JobMaster,JobMaster回连TaskManager
2). Dispatcher会连接ResourceManager的Rpc服务
是通过GatewayRetriever来发现ResourceManager的Rpc地址信息
3). WebMonitorEndpoint会连接Dispatcher和ResourceManager的Rpc服务
是通过GatewayRetriever来发现Dispatcher和ResourceManager的Rpc地址信息
WebMonitorEndpoint是一个基于netty实现的rest服务,非Rpc服务端
4). WebMonitorEndpoint会连接JobManager进程中的MetricQueryService Rpc服务和TaskManager进程中的MetricQueryService Rpc服务
步骤:
- 其内部有MetricFetcher,MetricFetcher通过GatewayRetriever发现并连接Dispatcher
- 调用Dispatcher的requestMultipleJobDetails方法获取Job统计信息
- 调用Dispatcher的requestMetricQueryServiceAddresses方法获取JobManager的Rpc服务地址,通过MetricQueryServiceRetriever的retrieveService方法连接到JobManager上的MetricQueryService Rpc服务,查询并获取JobManager的metric数据
- 调用Dispatcher的requestTaskManagerMetricQueryServiceAddresses方法获取所有TaskManager的Rpc服务地址,通过MetricQueryServiceRetriever的retrieveService方法连接到TaskManager上的MetricQueryService Rpc服务,查询并获取TaskManager的metric数据
5). JobClient会连接Dispatcher的Rpc服务
下面重点说明JobMaster、ResourceManager、TaskManager之间的连接建立过程,为了描述方便,JM指JobMaster,RM指ResourceManager,TM指TaskManager
JM连接RM,RM回连JM
1). 连接建立过程(JM主动连接RM,RM回连JM)
入口是JobMaster的reconnectToResourceManager方法
调用链使用伪码表示如下:
代码语言:java复制JobMaster.java
reconnectToResourceManager
tryConnectToResourceManager
connectToResourceManager
{
// 这里用resourceManagerAddress地址重新构建一个连接
// RM地址发生切换时,resourceManagerAddress值也会得到更新
resourceManagerConnection = new ResourceManagerConnection(..., resourceManagerAddress, ...)
resourceManagerConnection.start
}
RegisteredRpcConnection.java
start
newRegistration.startRegistration
RetryingRegistration.java
startRegistration
// 1). 建立与RM的连接
rpcService.connect
// 2). 向RM注册自己
register
JobMaster.ResourceManagerConnection.invokeRegistration
gateway.registerJobManager
{
// 3). RM方法内部回连JM
getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class)
}
2). JM如何获取RM地址?
通过LeaderRetrievalService获取
分两种情况
- 没有启用HA
RM地址、Dispatcher地址、WebMonitor地址都保存在StandaloneHaServices对象中,也就是内存中,地址不会发生变化。
地址为Akka地址,格式:protocolPrefix://flink@hostname:port/user/rpc/endpointName,hostname和port为JobManager的rpc host和port,三者仅在最后的endpointName上有区别。
- 启用了HA
可以是ZK或者K8S
ZK是通过NodeCache监听了一个节点的数据变化,这个节点上保存了leader信息
K8S是Watch了一个ConfigMap
代码语言:java复制JobMaster.java
// LeaderRetrievalService用于发现leader,启用HA时start方法会创建一个具体的LeaderRetrievalDriver
// driver上的leader切换事件最后会通知到LeaderRetrievalListener的notifyLeaderAddress上
// ResourceManagerLeaderListener实现了LeaderRetrievalListener接口
LeaderRetrievalService resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever()
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener())
ResourceManagerLeaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID)
3). 什么时候与RM建立连接?
如1)所述,入口是reconnectToResourceManager,该方法在三处地方被调用
JM发现自己与RM心跳超时,JM会重连RM
代码语言:java复制JobMaster.java
ResourceManagerHeartbeatListener.notifyHeartbeatTimeout
reconnectToResourceManager
JM发现RM切leader,JM会重连新的RM
代码语言:java复制JobMaster.java
ResourceManagerLeaderListener.notifyLeaderAddress
notifyOfNewResourceManagerLeader
createResourceManagerAddress
reconnectToResourceManager
RM发现自己与JM心跳超时,RM会通知JM去重连RM
代码语言:java复制ResourceManager.java
JobManagerHeartbeatListener.notifyHeartbeatTimeout
closeJobManagerConnection
jobMasterGateway.disconnectResourceManager(getFencingToken(), cause)
{
// JobMaster disconnectResourceManager方法内部
if (isConnectingToResourceManager(resourceManagerId)) {
reconnectToResourceManager(cause);
}
}
private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
return resourceManagerAddress != null
&& resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
}
TM连接RM,RM回连TM
1). 连接建立过程(TM主动连接RM,RM回连TM)
入口是TaskExecutor的reconnectToResourceManager方法
调用过程同上述1中JM连接RM类似,不再多述
2). TM如何获取RM地址?
通过LeaderRetrievalService获取,同上述1中JM获取RM地址一致
3). 什么时候与RM建立连接?
- TM发现自己与RM心跳超时,TM会重连RM
- TM发现RM切leader时,TM会重连RM
- RM发现自己与TM心跳超时,RM会通知TM去重连RM
TM连接JM,JM回连TM
1). 连接建立过程(TM主动连接JM,JM回连TM)
入口是TaskExecutor的disconnectAndTryReconnectToJobManager方法
调用过程同上述1中JM连接RM类似,只是这里将JM地址的切换感知放在了JobLeaderService中,默认实现类是DefaultJobLeaderService,不再多述。
2). TM如何获取JM地址?
通过LeaderRetrievalService获取,一个TM中是可以跑多个Job的Task的,也就会连多个JM
见DefaultJobLeaderService属性Map<JobID, Tuple2<LeaderRetrievalService, DefaultJobLeaderService.JobManagerLeaderListener>> jobLeaderServices。
3). 什么时候与JM建立连接?
- TM发现自己与JM心跳超时,TM会重连JM
- TM发现JM切leader,TM会重连新的JM
- JM发现自己与TM心跳超时,JM会通知TM去重连JM
JM主动连RM,TM主动连RM和JM,心跳超时或者切leader时会发生重连,那第一次建立连接在哪?
在notifyLeaderAddress方法中。
为什么是JM主动连接RM、TM主动连接RM和JM?
- Flink集群中先有ResourceManager和Dispatcher,有任务需要运行时Dispatcher才创建JobMaster(Dispatcher可创建多个JobMaster,一个JobGraph对应一个JobMaster)。ResourceManager地址是已知的, JobMaster连接上ResourceManager后调用ResourceManager的registerJobManager注册自己,ResourceManager再回连JobMaster。
- ResourceManager和Dispatcher在JobManager进程中,Flink集群是先启动JobManager进程后启动TaskManager进程,TaskManager进程可以有多个。