客户端加入集群过程源码分析
-
- ClientImpl源码分析
-
- 启动
- tcp-client-disco-msg-worker线程
- tcp-client-disco-sock-writer线程
- tcp-client-disco-sock-reader线程
- tcp-discovery-exec线程
- ServerImpl源码分析
-
- 启动
- tcp-disco-srvr线程
- tcp-disco-client-message-worker线程
- tcp-disco-msg-worker线程
本文分析ignite 客户端加入集群过程中重要的源码内容,原理可查阅ignite节点发现原理及源码分析
ClientImpl源码分析
启动
客户端通过以下代码启动
代码语言:javascript复制 IgniteConfiguration cfg = new IgniteConfiguration();
...
cfg.setClientMode(true);
Ignite ignite = Ignition.start(cfg);
在启动过程中,会启动一个 GridIoManager管理器,此管理器会收集客户端本地信息,如hostname、ip、jdk环境等等。
在启动GridIoManager之后,会启动discovery manager管理器,此管理器会启动tcp-client-disco-msg-worker-、tcp-client-disco-sock-writer-、tcp-client-disco-sock-reader-、tcp-discovery-exec-这些线程
tcp-client-disco-msg-worker线程
主要2个方法,tryJoin()和processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg)
tryJoin方法:
- openSocket,与服务端建立连接,发0x00004747到服务端(类似一个hello包)
- 发送TcpDiscoveryHandshakeRequest包,类似握手请求包
- 读取服务端发送的TcpDiscoveryHandshakeResponse包,即握手响应包
- 发送TcpDiscoveryJoinRequestMessage包,节点加入请求包,此包包含加入节点的信息
- 接收服务端处理完TcpDiscoveryJoinRequest后发送的ok包
processDiscoveryMessage方法:
客户端从MessageWorker的queue队列里取出服务端发送的TcpDiscoveryNodeAddedMessage和TcpDiscoveryNodeAddFinishedMessage进行处理
tcp-client-disco-sock-writer线程
从MessageWorker的queue里,取出消息TcpDiscoveryClientMetricsUpdateMessage、TcpDiscoveryCustomEventMessage,发送到服务端
tcp-client-disco-sock-reader线程
- 读取协调器(查阅)发的指标更新信息TcpDiscoveryClientMetricsUpdateMessage
- 接收服务端发送的TcpDiscoveryNodeAddedMessage和TcpDiscoveryNodeAddFinishedMessage,存入MessageWorker的queue里
tcp-discovery-exec线程
把TcpDiscoveryClientMetricsUpdateMessage信息加入到MessageWorker的queue里
代码位置:
代码语言:javascript复制 private class MetricsSender implements Runnable {
@Override public void run() {
...
sockWriter.sendMessage(msg);
}
}
ServerImpl源码分析
启动
CommandLineStartup类main方法为服务端启动方法,在启动过程中,startProcessor(new GridCacheProcessor(ctx))方法开启tcp-disco-srvr-线程, startManager(discoMgr)方法启动discovery manager,启动tcp-disco-msg-worker-线程。
tcp-disco-srvr线程
tcp-disco-srvr线程执行accept()方法等待接收客户端连接,另外在accept()后通过new SocketReader(sock)启动线程tcp-disco-sock-reader线程。
tcp-disco-sock-reader线程执行过程:
- 读取客户端发送的0x00004747请求
- 读取客户端发送的TcpDiscoveryHandshakeRequest,握手请求包
- 往客户端写TcpDiscoveryHandshakeResponse,握手响应包
- 启动tcp-disco-client-message-worker-线程
- 读取客户端发送TcpDiscoveryJoinRequest,做反序列化
- 第五步骤处理成功后返回ok给客户端
- 接收客户端TcpDiscoveryClientMetricsUpdateMessage
- 把TcpDiscoveryClientAckResponse添加到ClientMessageWorker里
tcp-disco-client-message-worker线程
- 从ClientMessageWorker取出数据,往客户端写TcpDiscoveryClientAckResponse
- 从ClientMessageWorker的queue中取数据,写TcpDiscoveryMetricsUpdateMessage、TcpDiscoveryNodeAddedMessage、TcpDiscoveryNodeAddFinishedMessage到客户端
tcp-disco-msg-worker线程
- 把TcpDiscoveryMetricsUpdateMessage添加到MessageWorker
- 对客户端的TcpDiscoveryJoinRequest包通过processJoinRequestMessage方法进行处理
- 第二步处理完得到TcpDiscoveryNodeAddedMessage,进入processNodeAddedMessage方法进行处理
- 第三步处理完得到TcpDiscoveryNodeAddFinishedMessage,进入processNodeAddFinishedMessage方法进行处理
2,3,4步中,会把消息存放在ClientMessageWorker的queue中。