ignite 2.11.0 客户端加入集群过程源码分析

2022-01-10 09:24:58 浏览数 (1)

客户端加入集群过程源码分析

    • ClientImpl源码分析
      • 启动
      • tcp-client-disco-msg-worker线程
      • tcp-client-disco-sock-writer线程
      • tcp-client-disco-sock-reader线程
      • tcp-discovery-exec线程
    • ServerImpl源码分析
      • 启动
      • tcp-disco-srvr线程
      • tcp-disco-client-message-worker线程
      • tcp-disco-msg-worker线程

本文分析ignite 客户端加入集群过程中重要的源码内容,原理可查阅ignite节点发现原理及源码分析

ClientImpl源码分析

启动

客户端通过以下代码启动

代码语言:javascript复制
  IgniteConfiguration cfg = new IgniteConfiguration();
  ...
  cfg.setClientMode(true);
  Ignite ignite = Ignition.start(cfg);

在启动过程中,会启动一个 GridIoManager管理器,此管理器会收集客户端本地信息,如hostname、ip、jdk环境等等。

在启动GridIoManager之后,会启动discovery manager管理器,此管理器会启动tcp-client-disco-msg-worker-、tcp-client-disco-sock-writer-、tcp-client-disco-sock-reader-、tcp-discovery-exec-这些线程

tcp-client-disco-msg-worker线程

主要2个方法,tryJoin()和processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg)

tryJoin方法:

  1. openSocket,与服务端建立连接,发0x00004747到服务端(类似一个hello包)
  2. 发送TcpDiscoveryHandshakeRequest包,类似握手请求包
  3. 读取服务端发送的TcpDiscoveryHandshakeResponse包,即握手响应包
  4. 发送TcpDiscoveryJoinRequestMessage包,节点加入请求包,此包包含加入节点的信息
  5. 接收服务端处理完TcpDiscoveryJoinRequest后发送的ok包

processDiscoveryMessage方法:

客户端从MessageWorker的queue队列里取出服务端发送的TcpDiscoveryNodeAddedMessage和TcpDiscoveryNodeAddFinishedMessage进行处理

tcp-client-disco-sock-writer线程

从MessageWorker的queue里,取出消息TcpDiscoveryClientMetricsUpdateMessage、TcpDiscoveryCustomEventMessage,发送到服务端

tcp-client-disco-sock-reader线程

  1. 读取协调器(查阅)发的指标更新信息TcpDiscoveryClientMetricsUpdateMessage
  2. 接收服务端发送的TcpDiscoveryNodeAddedMessage和TcpDiscoveryNodeAddFinishedMessage,存入MessageWorker的queue里

tcp-discovery-exec线程

把TcpDiscoveryClientMetricsUpdateMessage信息加入到MessageWorker的queue里

代码位置:

代码语言:javascript复制
  private class MetricsSender implements Runnable {
	@Override public void run() {
	   ...
	   sockWriter.sendMessage(msg);
	}
}

ServerImpl源码分析

启动

CommandLineStartup类main方法为服务端启动方法,在启动过程中,startProcessor(new GridCacheProcessor(ctx))方法开启tcp-disco-srvr-线程, startManager(discoMgr)方法启动discovery manager,启动tcp-disco-msg-worker-线程。

tcp-disco-srvr线程

tcp-disco-srvr线程执行accept()方法等待接收客户端连接,另外在accept()后通过new SocketReader(sock)启动线程tcp-disco-sock-reader线程。

tcp-disco-sock-reader线程执行过程:

  1. 读取客户端发送的0x00004747请求
  2. 读取客户端发送的TcpDiscoveryHandshakeRequest,握手请求包
  3. 往客户端写TcpDiscoveryHandshakeResponse,握手响应包
  4. 启动tcp-disco-client-message-worker-线程
  5. 读取客户端发送TcpDiscoveryJoinRequest,做反序列化
  6. 第五步骤处理成功后返回ok给客户端
  7. 接收客户端TcpDiscoveryClientMetricsUpdateMessage
  8. 把TcpDiscoveryClientAckResponse添加到ClientMessageWorker里

tcp-disco-client-message-worker线程

  1. 从ClientMessageWorker取出数据,往客户端写TcpDiscoveryClientAckResponse
  2. 从ClientMessageWorker的queue中取数据,写TcpDiscoveryMetricsUpdateMessage、TcpDiscoveryNodeAddedMessage、TcpDiscoveryNodeAddFinishedMessage到客户端

tcp-disco-msg-worker线程

  1. 把TcpDiscoveryMetricsUpdateMessage添加到MessageWorker
  2. 对客户端的TcpDiscoveryJoinRequest包通过processJoinRequestMessage方法进行处理
  3. 第二步处理完得到TcpDiscoveryNodeAddedMessage,进入processNodeAddedMessage方法进行处理
  4. 第三步处理完得到TcpDiscoveryNodeAddFinishedMessage,进入processNodeAddFinishedMessage方法进行处理

2,3,4步中,会把消息存放在ClientMessageWorker的queue中。

0 人点赞