Flink源码分析之RPC通信

2022-07-12 13:23:25 浏览数 (1)

问题导读

  1. RPC服务端创建过程
  2. RPC客户端创建过程
  3. RPC调用流程
  4. 在Flink集群中整个RPC通信网络是如何一步步建立起来的,连接容错又如何保证

简介

Flink基于Akka来实现内部各组件(ResourceManager、Dispatcher、JobMaster、TaskExecutor等)间的RPC通信。本篇着重分析Flink的RPC设计,如何封装Actor模型,RPC的创建和调用流程。

阅读说明: 源码版本:Flink release-1.14.4 阅读前提:了解Akka Actor基础知识 1). 先聊Flink的RPC设计,理清RpcGateway、RpcEndpoint、RpcService、RpcServer概念,也就弄明白了Flink如何封装Akka来实现RPC机制。有兴趣可继续往下阅读。 2). 结合代码,看问题1、2、3,进一步熟悉RPC的创建与交互过程。(重点关注AkkaRpcService、AkkaInvocationHandler、AkkaRpcActor类) 3). 第4个问题进一步延伸,主要是理清各组件间谁与谁会建立通信连接,先后顺序是怎样的,由此建立起整个RPC通信网络。在组件切leader、重启或者心跳超时等异常情况时,是否有容错机制重新建连。

接口设计

结合下面RpcGateway、RpcEndpoint、RpcService、RpcServer概念看这两张图

JobManager RpcServiceJobManager RpcService
TaskManager RpcServiceTaskManager RpcService

RpcGateway

  1. 用于定义RPC协议,是客户端和服务端沟通的桥梁。
  2. 服务端实现了RPC协议,即实现了接口中定义的方法,做具体的业务逻辑处理
  3. 客户端实现了RPC协议,客户端是Proxy生成的代理对象,将对RpcGateway接口方法的调用转为Akka的消息发送。
  4. 关注其5个子接口:DispatcherGateway、ResourceManagerGateway、JobMasterGateway、MetricQueryServiceGateway、TaskExecutorGateway。

RpcEndpoint

  1. RPC服务端的抽象,实现了该接口即为Rpc服务端,是Akka中Actor的封装。
  2. Actor收到ActorRef发送的消息(消息被封装为RpcInvocation对象),会通过RpcInvocation对象中的方法、参数等信息以反射的方式调用RpcGateway接口对应的方法。
  3. 关注其5个实现类:Dispatcher、ResourceManager、JobMaster、MetricQueryService、TaskExecutor。其中Dispatcher、ResourceManager、JobMaster是JobManager进程中的Rpc服务,TaskExecutor是TaskManager进程中的Rpc服务,MetricQueryService在JobManager和TaskManager进程中都有。

RpcService

  1. 是 RpcEndpoint 的运行时环境,是Akka中ActorSystem的封装
  2. 一个ActorSystem系统中有多个Actor,同样在Flink中一个RpcService中有多个RpcEndpoint,即多个Rpc服务。
  3. Flink中RpcService也有多套,JobManager和TaskManager进程中都有两套RpcService。
  4. RpcService 提供了启动Rpc服务(startServer)、停止Rpc服务(stopServer)、连接远端Rpc服务等方法
  5. 实现类是AkkaRpcService,内有属性ActorSystem actorSystem,Map<ActorRef, RpcEndpoint> actors。

RpcServer

是Rpc服务端自身的代理对象,设计上是供服务端调用自身非Rpc方法。

类关系图

注:这里借用网上画的一张图

Flink RPC详解Flink RPC详解

问题

1. RPC服务端创建过程

RPC服务端是一个代理对象。

入口

代码语言:java复制
RpcEndpoint.java
  
    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");
				
  			// 生成代理对象
        this.rpcServer = rpcService.startServer(this);

        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    }

代理对象生成过程如下:

1. 以Ask方式向SupervisorActor发送StartAkkaRpcActor消息,SupervisorActor收到消息后根据消息里RpcEndpoint的配置信息创建Actor,并以tell方式回复创建成功。

代码语言:java复制
AkkaRpcService.java
    startServer(C rpcEndpoint)
        registerAkkaRpcActor(rpcEndpoint)
   		{
     			 	// 创建Actor
             SupervisorActor.startAkkaRpcActor(
               supervisor.getActor(),
               actorTerminationFuture ->
               Props.create(
                 akkaRpcActorType,
                 rpcEndpoint,
                 actorTerminationFuture,
                 getVersion(),
                 configuration.getMaximumFramesize(),
                 flinkClassLoader),
               rpcEndpoint.getEndpointId());  
   
             // 在RpcService中保存ActorRef与RpcEndpoint引用关系
             actors.put(actorRegistration.getActorRef(), rpcEndpoint)    
   		}
代码语言:java复制
SupervisorActor.java
     
   		// 1) 发送消息
       public static StartAkkaRpcActorResponse startAkkaRpcActor(
               ActorRef supervisor, StartAkkaRpcActor.PropsFactory propsFactory, String endpointId) {
     
       			// 以Ask方式发送消息并等待结果
     				// Ask在实现上实际上是会创建一个Actor等待响应结果,成功或者超时时,销毁Actor
           return Patterns.ask(
                           supervisor,
                           createStartAkkaRpcActorMessage(propsFactory, endpointId),
                           RpcUtils.INF_DURATION)
                   .toCompletableFuture()
                   .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
                   .join();
       } 
   
   		// 2) 处理消息
       private void createStartAkkaRpcActorMessage(StartAkkaRpcActor startAkkaRpcActor) {
           final String endpointId = startAkkaRpcActor.getEndpointId();
           final Props akkaRpcActorProps = ...
           ...
   
           try {
             	// 创建Actor
               final ActorRef actorRef = getContext().actorOf(akkaRpcActorProps, endpointId);
   
               registeredAkkaRpcActors.put(actorRef, akkaRpcActorRegistration);
   
             	// 回复消息
               getSender().tell(
                               	StartAkkaRpcActorResponse.success(...),
                               	getSelf()
               								);
           } catch (AkkaException akkaException) {
               getSender().tell(StartAkkaRpcActorResponse.failure(akkaException), getSelf());
           }
       }

2. 准备代理对象要实现的接口

代码语言:java复制
AkkaRpcService.java
    startServer(C rpcEndpoint)
   	{
   		...
        // 服务端对象实现了RpcGateway接口
        Set<Class<?>> implementedRpcGateways =
        new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
            
            // 服务端对象是一个RpcServer
        implementedRpcGateways.add(RpcServer.class);
            // 服务端对象是Akka endpoint,可以获取到ActorRef引用
        implementedRpcGateways.add(AkkaBasedEndpoint.class); 
    
        if (rpcEndpoint instanceof FencedRpcEndpoint){
            implementedRpcGateways.add(FencedMainThreadExecutable.class);
        }
   	}

3. 生成代理对象

代码语言:java复制
AkkaRpcService.java
   	startServer(C rpcEndpoint)
   	{
     	...
        RpcServer server =
                   (RpcServer)
                           Proxy.newProxyInstance(
                                   classLoader,
                                   implementedRpcGateways.toArray(
                                           new Class<?>[implementedRpcGateways.size()]),
                                   akkaInvocationHandler);
   
         return server;        
   	}

2. RPC客户端创建过程

RPC客户端是一个代理对象。

入口:RpcService的connect(String address, Class<C> clazz)方法。

代码语言:java复制
AkkaRpcService.java
  connect(String address, Class<C> clazz)
		{
  			...
        // 1) 使用ActorSystem.actorSelection(address).resolveOne的方式来获取Actor的引用ActorRef(ActorRef可以用来向服务端Actor发送消息)
        final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
  
  			// 2) ActorRef创建完成后,使用ask的方式向服务端发送一条握手消息(用来验证Client和Server彼此版本一致)
        final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
                actorRefFuture.thenCompose(
                        (ActorRef actorRef) ->
                                AkkaFutureUtils.toJava(
                                        Patterns.ask(
                                                        actorRef,
                                                        new RemoteHandshakeMessage(
                                                                clazz, getVersion()),
                                                        configuration.getTimeout().toMilliseconds())
                                                .<HandshakeSuccessMessage>mapTo(
                                                        ClassTag$.MODULE$
                                                                .<HandshakeSuccessMessage>apply(
                                                                        HandshakeSuccessMessage
                                                                                .class))));  
  
  			// 3) 以上2个事都做完后,异步创建代理对象并返回
        final CompletableFuture<C> gatewayFuture =
                actorRefFuture.thenCombineAsync(
                        handshakeFuture,
                        (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                          	// invocationHandlerFactory.apply(actorRef) = new AkkaInvocationHandler 或 new 或 FencedAkkaInvocationHandler
                            InvocationHandler invocationHandler =
                                    invocationHandlerFactory.apply(actorRef);

                            ClassLoader classLoader = getClass().getClassLoader();

                            C proxy =
                                    (C)
                                            Proxy.newProxyInstance(
                                                    classLoader,
                                                    new Class<?>[] {clazz},
                                                    invocationHandler);

                            return proxy;
                        },
                        actorSystem.dispatcher());

        return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);  	
		}

3. RPC调用流程

1. 通过客户端代理对象调用RpcGateway的方法会交由invoke方法执行。

2. invoke将方法、参数信息封装为RpcInvocation对象,并通过ActorRef将消息发送给服务端Actor。

如果执行的方法有返回值就使用Akka ask方式,否则以tell方式发送消息。 通过连接的服务端的地址可以判断出服务端在远程还是本地。 如果在远程,消息类型为RemoteRpcInvocation,实现了序列化接口,对象可序列化传输。(会判断methodName parameterTypes args序列化后的字节数是否超时指定的值,见参数akka.remote.netty.tcp.maximum-frame-size) 如果在本地,消息类型为LocalRpcInvocation。

3. 服务端Actor收到RpcInvocation消息,会从中获取到方法名、方法参数等相关信息,在主线程中通过反射的方式调用代理对象对应方法执行业务逻辑,如果方法有返回值,还会以tell方法告知客户端结果。

客户端相关代码如下:

代码语言:java复制
class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer{
  
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        Object result;

      	// 非Rpc方法,直接本地执行。这个是服务端通过自己的代理对象RpcServer调用自己非Rpc方法时走的逻辑
        if (declaringClass.equals(AkkaBasedEndpoint.class)
                || declaringClass.equals(Object.class)
                || declaringClass.equals(RpcGateway.class)
                || declaringClass.equals(StartStoppable.class)
                || declaringClass.equals(MainThreadExecutable.class)
                || declaringClass.equals(RpcServer.class)) {
            result = method.invoke(this, args);
        } else if (declaringClass.equals(FencedRpcGateway.class)) { // 支持HA的见FencedAkkaInvocationHandler
            throw new UnsupportedOperationException(...);
        } else {
          	// RPC方法,指RpcGateway子接口中定义的方法
          	// 接口:ResourceManagerGateway、DispatcherGateway、JobMasterGateway、MetricQueryServiceGateway、TaskExecutorGateway
            result = invokeRpc(method, args);
        }

        return result;
    }
  
    private Object invokeRpc(Method method, Object[] args) throws Exception {
				...

        // 1) 封装消息
        final RpcInvocation rpcInvocation =
                createRpcInvocationMessage(methodName, parameterTypes, args);

        // 2) 借助akka发送消息,进行RPC调用
        Class<?> returnType = method.getReturnType();

        final Object result;

        if (Objects.equals(returnType, Void.TYPE)) {
          	// 无返回值,用akka tell模式
            tell(rpcInvocation);

            result = null;
        } else {
						...
            // 有返回值,用akka ask模式
            final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
						...
        }

        return result;
    }
  
}

服务端相关代码如下:

代码语言:java复制
AkkaRpcActor.java
  
    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        Method rpcMethod = null;

        try {
            String methodName = rpcInvocation.getMethodName();
            Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();

            rpcMethod = lookupRpcMethod(methodName, parameterTypes);
        } catch (Exception e) {
          	...
            getSender().tell(new Status.Failure(rpcException), getSelf());
        }

        if (rpcMethod != null) {
            try {
                rpcMethod.setAccessible(true);
                final Method capturedRpcMethod = rpcMethod;
              
              	// 1) 无返回值
                if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                    runWithContextClassLoader(
                            () -> capturedRpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()),
                            flinkClassLoader);
                } else {
                  	// 2) 有返回值
                    final Object result;
                    try {
                        result =
                                runWithContextClassLoader(
                                        () ->
                                                capturedRpcMethod.invoke(
                                                        rpcEndpoint, rpcInvocation.getArgs()),
                                        flinkClassLoader);
                    } catch (InvocationTargetException e) {
												...
                        getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
                        return;
                    }

                    final String methodName = rpcMethod.getName();

                    if (result instanceof CompletableFuture) {
                        final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
                        sendAsyncResponse(responseFuture, methodName);
                    } else {
                        sendSyncResponse(result, methodName);
                    }
                }
            } catch (Throwable e) {
              	...
                getSender().tell(new Status.Failure(e), getSelf());
            }
        }
    }  

4. 在Flink集群中整个RPC通信网络是如何一步步建立起来的,连接容错又如何保证

总述

哪些组件之间会建立RPC连接,什么时候会建立,连接又是如何建立起来的?

1). Rpc服务端作为服务端提供了Rpc服务,其内部也有其他Rpc服务的客户端

JobMaster主动连接ResourceManager,ResourceManager回连JobMaster

TaskManager主动连接ResourceManager,ResourceManager回连TaskManager

TaskManager主动连接JobMaster,JobMaster回连TaskManager

2). Dispatcher会连接ResourceManager的Rpc服务

是通过GatewayRetriever来发现ResourceManager的Rpc地址信息

3). WebMonitorEndpoint会连接Dispatcher和ResourceManager的Rpc服务

是通过GatewayRetriever来发现Dispatcher和ResourceManager的Rpc地址信息

WebMonitorEndpoint是一个基于netty实现的rest服务,非Rpc服务端

4). WebMonitorEndpoint会连接JobManager进程中的MetricQueryService Rpc服务和TaskManager进程中的MetricQueryService Rpc服务

步骤:

  1. 其内部有MetricFetcher,MetricFetcher通过GatewayRetriever发现并连接Dispatcher
  2. 调用Dispatcher的requestMultipleJobDetails方法获取Job统计信息
  3. 调用Dispatcher的requestMetricQueryServiceAddresses方法获取JobManager的Rpc服务地址,通过MetricQueryServiceRetriever的retrieveService方法连接到JobManager上的MetricQueryService Rpc服务,查询并获取JobManager的metric数据
  4. 调用Dispatcher的requestTaskManagerMetricQueryServiceAddresses方法获取所有TaskManager的Rpc服务地址,通过MetricQueryServiceRetriever的retrieveService方法连接到TaskManager上的MetricQueryService Rpc服务,查询并获取TaskManager的metric数据

5). JobClient会连接Dispatcher的Rpc服务

下面重点说明JobMaster、ResourceManager、TaskManager之间的连接建立过程,为了描述方便,JM指JobMaster,RM指ResourceManager,TM指TaskManager

JM连接RM,RM回连JM

1). 连接建立过程(JM主动连接RM,RM回连JM)

入口是JobMaster的reconnectToResourceManager方法

调用链使用伪码表示如下:

代码语言:java复制
JobMaster.java
     reconnectToResourceManager
     	tryConnectToResourceManager
     		connectToResourceManager
   			{
     			// 这里用resourceManagerAddress地址重新构建一个连接
     			// RM地址发生切换时,resourceManagerAddress值也会得到更新
   				resourceManagerConnection = new ResourceManagerConnection(..., resourceManagerAddress, ...)  
                resourceManagerConnection.start	
   			}
     
RegisteredRpcConnection.java
    start
    	newRegistration.startRegistration
     
RetryingRegistration.java
    startRegistration
     	// 1). 建立与RM的连接
     	rpcService.connect	
     	// 2). 向RM注册自己
     	register
     		JobMaster.ResourceManagerConnection.invokeRegistration
     			gateway.registerJobManager	
   				{
     				// 3). RM方法内部回连JM
     				getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class)
   				}

2). JM如何获取RM地址?

通过LeaderRetrievalService获取

分两种情况

  • 没有启用HA

RM地址、Dispatcher地址、WebMonitor地址都保存在StandaloneHaServices对象中,也就是内存中,地址不会发生变化。

地址为Akka地址,格式:protocolPrefix://flink@hostname:port/user/rpc/endpointName,hostname和port为JobManager的rpc host和port,三者仅在最后的endpointName上有区别。

  • 启用了HA

可以是ZK或者K8S

ZK是通过NodeCache监听了一个节点的数据变化,这个节点上保存了leader信息

K8S是Watch了一个ConfigMap

代码语言:java复制
JobMaster.java
   
  // LeaderRetrievalService用于发现leader,启用HA时start方法会创建一个具体的LeaderRetrievalDriver
  // driver上的leader切换事件最后会通知到LeaderRetrievalListener的notifyLeaderAddress上
  // ResourceManagerLeaderListener实现了LeaderRetrievalListener接口
  LeaderRetrievalService resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever()
  resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener())
  ResourceManagerLeaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID)  

3). 什么时候与RM建立连接?

如1)所述,入口是reconnectToResourceManager,该方法在三处地方被调用

JM发现自己与RM心跳超时,JM会重连RM

代码语言:java复制
JobMaster.java
    ResourceManagerHeartbeatListener.notifyHeartbeatTimeout
        reconnectToResourceManager

JM发现RM切leader,JM会重连新的RM

代码语言:java复制
JobMaster.java
    ResourceManagerLeaderListener.notifyLeaderAddress
        notifyOfNewResourceManagerLeader
        	createResourceManagerAddress
        	reconnectToResourceManager

RM发现自己与JM心跳超时,RM会通知JM去重连RM

代码语言:java复制
ResourceManager.java
    JobManagerHeartbeatListener.notifyHeartbeatTimeout
        closeJobManagerConnection
        	jobMasterGateway.disconnectResourceManager(getFencingToken(), cause)
      		{
        		// JobMaster disconnectResourceManager方法内部
        		if (isConnectingToResourceManager(resourceManagerId)) {
                  	reconnectToResourceManager(cause);
              	}
      		}
      
      
    private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
        return resourceManagerAddress != null
            && resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
    }

TM连接RM,RM回连TM

1). 连接建立过程(TM主动连接RM,RM回连TM)

入口是TaskExecutor的reconnectToResourceManager方法

调用过程同上述1中JM连接RM类似,不再多述

2). TM如何获取RM地址?

通过LeaderRetrievalService获取,同上述1中JM获取RM地址一致

3). 什么时候与RM建立连接?

  1. TM发现自己与RM心跳超时,TM会重连RM
  2. TM发现RM切leader时,TM会重连RM
  3. RM发现自己与TM心跳超时,RM会通知TM去重连RM

TM连接JM,JM回连TM

1). 连接建立过程(TM主动连接JM,JM回连TM)

入口是TaskExecutor的disconnectAndTryReconnectToJobManager方法

调用过程同上述1中JM连接RM类似,只是这里将JM地址的切换感知放在了JobLeaderService中,默认实现类是DefaultJobLeaderService,不再多述。

2). TM如何获取JM地址?

通过LeaderRetrievalService获取,一个TM中是可以跑多个Job的Task的,也就会连多个JM

见DefaultJobLeaderService属性Map<JobID, Tuple2<LeaderRetrievalService, DefaultJobLeaderService.JobManagerLeaderListener>> jobLeaderServices。

3). 什么时候与JM建立连接?

  1. TM发现自己与JM心跳超时,TM会重连JM
  2. TM发现JM切leader,TM会重连新的JM
  3. JM发现自己与TM心跳超时,JM会通知TM去重连JM

JM主动连RM,TM主动连RM和JM,心跳超时或者切leader时会发生重连,那第一次建立连接在哪?

在notifyLeaderAddress方法中。

为什么是JM主动连接RM、TM主动连接RM和JM?

  1. Flink集群中先有ResourceManager和Dispatcher,有任务需要运行时Dispatcher才创建JobMaster(Dispatcher可创建多个JobMaster,一个JobGraph对应一个JobMaster)。ResourceManager地址是已知的, JobMaster连接上ResourceManager后调用ResourceManager的registerJobManager注册自己,ResourceManager再回连JobMaster。
  2. ResourceManager和Dispatcher在JobManager进程中,Flink集群是先启动JobManager进程后启动TaskManager进程,TaskManager进程可以有多个。

0 人点赞