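Elasticsearch Source Code Analysis, Part 2: Node Creation and Startup Flow
This article focuses on how a Node is created and how its start flow runs. Other stages mentioned along the way will be analyzed in dedicated articles later.
We pick up right after the Bootstrap initialization covered in the previous article, in the org.elasticsearch.bootstrap.Bootstrap#setup method. See the code snippet: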

    // Create the node instance
    node = new Node(environment) {
        @Override
        protected void validateNodeBeforeAcceptingRequests(
            final BootstrapContext context,
            final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
            BootstrapChecks.check(context, boundTransportAddress, checks);
        }
    };

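The snippet above overrides a validation hook through an anonymous subclass. As a rough illustration of that pattern only, here is a minimal sketch. `SketchNode`, `HookDemo`, and the check names are hypothetical stand-ins, and the sketch assumes the base class's default hook does nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Node: validation is a hook that does nothing by default.
class SketchNode {
    final List<String> log = new ArrayList<>();

    // Subclasses may override this hook; the default is a no-op (an assumption of this sketch).
    protected void validateBeforeAcceptingRequests(List<String> checks) {
    }

    SketchNode start() {
        log.add("services started");
        validateBeforeAcceptingRequests(List.of("heap size", "file descriptors"));
        log.add("accepting requests");
        return this;
    }
}

class HookDemo {
    // Builds a node whose hook is supplied by an anonymous subclass, then starts it.
    static List<String> run() {
        SketchNode node = new SketchNode() {
            @Override
            protected void validateBeforeAcceptingRequests(List<String> checks) {
                checks.forEach(check -> log.add("checked " + check));
            }
        };
        return node.start().log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the design is that the caller (here Bootstrap) decides which checks run, while the node class itself stays unaware of them.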
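The rest of this article analyzes the creation and startup of the Node.
Node construction
Let's go straight to the constructor:

    public Node(Environment environment) {
        this(environment, Collections.emptyList(), true);
    }

The actual constructor is quite long, so we analyze it in several fragments.
1. Loading plugins and environment configuration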

    protected Node(
        final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
        // -------- some code omitted --------
        Settings tmpSettings = Settings.builder().put(environment.settings())
            .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
        // Create the node environment, including initialization of the Lucene locks on the node's files
        nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
        resourcesToClose.add(nodeEnvironment);
        // -------- some code omitted --------
        // Service instance mainly used to load plugins
        this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(),
            environment.pluginsFile(), classpathPlugins);
        final Settings settings = pluginsService.updatedSettings();
        // Node roles
        final Set<DiscoveryNodeRole> possibleRoles = Stream.concat(
            // public static Set<DiscoveryNodeRole> BUILT_IN_ROLES = Set.of(DATA_ROLE, INGEST_ROLE, MASTER_ROLE);
            DiscoveryNodeRole.BUILT_IN_ROLES.stream(),
            pluginsService.filterPlugins(Plugin.class)
                .stream()
                .map(Plugin::getRoles)
                .flatMap(Set::stream))
            .collect(Collectors.toSet());
        DiscoveryNode.setPossibleRoles(possibleRoles);
        // Create the local node factory from the node settings and the node ID
        localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

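This fragment creates the NodeEnvironment and PluginsService instances from the settings, collects the set of possible DiscoveryNodeRole values, and creates the LocalNodeFactory instance. It mostly assigns member fields of the Node instance.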
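The role collection is a plain stream merge: the built-in roles are concatenated with whatever roles each plugin contributes, and the result is collected into a set. A minimal sketch of that idea follows, with roles reduced to strings. `RolePlugin` and `RoleSketch` are hypothetical names introduced for illustration, not Elasticsearch types.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RoleSketch {
    // Hypothetical plugin contract: each plugin may contribute extra roles.
    interface RolePlugin {
        Set<String> getRoles();
    }

    static final Set<String> BUILT_IN_ROLES = Set.of("data", "ingest", "master");

    // Merge built-in roles with plugin-provided roles; the set drops duplicates.
    static Set<String> possibleRoles(List<RolePlugin> plugins) {
        return Stream.concat(
                BUILT_IN_ROLES.stream(),
                plugins.stream().map(RolePlugin::getRoles).flatMap(Set::stream))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        RolePlugin ml = () -> Set.of("ml");
        RolePlugin noExtraRoles = Set::of;
        System.out.println(possibleRoles(List.of(ml, noExtraRoles)));
    }
}
```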
2. Creating the thread pool
The code fragment is as follows:

    // create the environment based on the finalized (processed) view of the settings
    // this is just to makes sure that people get the same settings, no matter where they ask them from
    this.environment = new Environment(settings, environment.configFile());
    Environment.assertEquivalent(environment, this.environment);
    // Get the executor builders provided by plugins
    final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
    // Thread pool
    final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
    resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
    // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
    DeprecationLogger.setThreadContext(threadPool.getThreadContext());
    resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));

    final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
    final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
    for (final ExecutorBuilder<?> builder : threadPool.builders()) {
        additionalSettings.addAll(builder.getRegisteredSettings());
    }
    client = new NodeClient(settings, threadPool);

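- pluginsService.getExecutorBuilders(settings) loads the list of ExecutorBuilder instances contributed by plugins.
- The ThreadPool instance is created from executorBuilders. This part is fairly complex, so a dedicated article will cover it later.
- resourcesToClose is a List that holds Closeable instances, so that every resource that needs closing can be released at the end.
- The additional settings and settings filters registered by plugins and executor builders are collected.
- A NodeClient instance is created and assigned to the client field of the Node instance.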
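The resourcesToClose bookkeeping can be sketched roughly as below. This is a simplified illustration, not the actual Node code: it assumes that the tracked resources are closed when construction fails halfway, and the class name, method names, and resource labels are all made up for the example.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class ResourceTrackingSketch {
    // Returns the names of the resources that were closed during the (possibly failing) build.
    static List<String> closedAfterBuild(boolean failHalfway) {
        List<String> closed = new ArrayList<>();
        List<Closeable> resourcesToClose = new ArrayList<>();
        boolean success = false;
        try {
            // Each resource is registered as soon as it exists.
            resourcesToClose.add(() -> closed.add("nodeEnvironment"));
            resourcesToClose.add(() -> closed.add("threadPool"));
            if (failHalfway) {
                throw new IllegalStateException("simulated failure while wiring services");
            }
            success = true;
        } catch (IllegalStateException e) {
            // Swallowed only so that this sketch can report what was closed.
        } finally {
            if (!success) {
                closeQuietly(resourcesToClose);
            }
        }
        return closed;
    }

    // Close everything, ignoring individual failures so one bad resource does not block the rest.
    static void closeQuietly(List<Closeable> resources) {
        for (Closeable resource : resources) {
            try {
                resource.close();
            } catch (IOException e) {
                // ignored in this sketch
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(closedAfterBuild(true));
        System.out.println(closedAfterBuild(false));
    }
}
```

Registering a lambda such as `() -> ThreadPool.terminate(...)` works for the same reason as in the sketch: Closeable has a single abstract method, so any cleanup action can be wrapped as a Closeable.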
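3. Creating and loading the node's modules and services
Responsibilities inside an Elasticsearch node are clearly divided: different jobs are handed to different modules and their corresponding services. Let's look at the next fragments of the Node constructor: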

    // Service that watches resource files for changes
    final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
    // Load the script module from the plugin service
    final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
    // Analysis module, mainly for Lucene indexing, tokenization, and similar operations
    AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
    // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
    // so we might be late here already
    // Setting upgraders
    final Set<SettingUpgrader<?>> settingsUpgraders = pluginsService.filterPlugins(Plugin.class)
        .stream()
        .map(Plugin::getSettingUpgraders)
        .flatMap(List::stream)
        .collect(Collectors.toSet());
    // Settings module
    final SettingsModule settingsModule =
        new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);
    // Register cluster settings listeners
    scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
    // Add resourceWatcherService to the resourcesToClose list
    resourcesToClose.add(resourceWatcherService);
    // Load the network service from plugins
    final NetworkService networkService = new NetworkService(
        getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
    // Load cluster plugins
    List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
    // Create the cluster service
    final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
    // Add a cluster state applier
    clusterService.addStateApplier(scriptModule.getScriptService());
    resourcesToClose.add(clusterService);
    // Add a local-node-master listener
    clusterService.addLocalNodeMasterListener(
        new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings())
            .newHashPublisher());
    // Instantiate the ingest service
    final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
        scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
        pluginsService.filterPlugins(IngestPlugin.class), client);
    // Create the cluster info service
    final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
    // Service for collecting and reporting node usage
    final UsageService usageService = new UsageService();

    ModulesBuilder modules = new ModulesBuilder();
    // Node monitoring service
    final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
    // Cluster module
    ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
    modules.add(clusterModule);
    // Indices module
    IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
    modules.add(indicesModule);
    // Search module
    SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class));
    // Circuit breaker service
    CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
        settingsModule.getClusterSettings());
    resourcesToClose.add(circuitBreakerService);
    // Add the gateway module
    modules.add(new GatewayModule());

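See the code comments for details. This code loads plugin configuration and creates the script, analysis, settings, cluster, indices, search, and gateway modules, along with services for node management and monitoring, circuit breaking, and more.
The code that follows keeps loading basic services: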

    // Page cache recycler
    PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
    // Create BigArrays, used by the persistence service below
    BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
    // Add the settings module
    modules.add(settingsModule);
    // Gather the NamedWriteable entries of each module, e.g. snapshots and restore in ClusterModule
    List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
        NetworkModule.getNamedWriteables().stream(),
        IndicesModule.getNamedWriteables().stream(),
        searchModule.getNamedWriteables().stream(),
        pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.getNamedWriteables().stream()),
        ClusterModule.getNamedWriteables().stream())
        .flatMap(Function.identity()).collect(Collectors.toList());
    // Create the NamedWriteableRegistry from namedWriteables
    final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
    // Collect the namedXContents of each module to create the NamedXContentRegistry
    NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
        NetworkModule.getNamedXContents().stream(),
        IndicesModule.getNamedXContents().stream(),
        searchModule.getNamedXContents().stream(),
        pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.getNamedXContent().stream()),
        ClusterModule.getNamedXWriteables().stream())
        .flatMap(Function.identity()).collect(toList()));
    // Meta state service
    final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
    // Service that persists the cluster state
    final PersistedClusterStateService lucenePersistedStateFactory
        = new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(),
            threadPool::relativeTimeInMillis);

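This fragment builds the two registries and creates the meta state service and the persisted cluster state service, mainly in preparation for the services created below.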
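Both registries above are built the same way: several per-module lists are flattened into one list with `Stream.of(...).flatMap(Function.identity())` and then indexed by name. The sketch below illustrates the idea with a deliberately simplified entry type. `NamedRegistrySketch` and its `Entry` are hypothetical, and rejecting duplicate keys is an assumption of this sketch rather than something shown in the snippet.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NamedRegistrySketch {
    // Simplified entry: a category (e.g. "query") plus a name within that category.
    static final class Entry {
        final String category;
        final String name;

        Entry(String category, String name) {
            this.category = category;
            this.name = name;
        }
    }

    // Flatten the per-module lists into one list, then index the entries by "category/name".
    static Map<String, Entry> buildRegistry(List<Entry> network, List<Entry> search, List<Entry> plugins) {
        List<Entry> all = Stream.of(network.stream(), search.stream(), plugins.stream())
            .flatMap(Function.identity())
            .collect(Collectors.toList());
        Map<String, Entry> registry = new HashMap<>();
        for (Entry entry : all) {
            String key = entry.category + "/" + entry.name;
            if (registry.putIfAbsent(key, entry) != null) {
                throw new IllegalArgumentException("duplicate entry [" + key + "]");
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        Map<String, Entry> registry = buildRegistry(
            List.of(new Entry("task", "status")),
            List.of(new Entry("query", "match"), new Entry("query", "term")),
            List.of(new Entry("query", "custom")));
        System.out.println(registry.keySet());
    }
}
```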
Moving on, the next code fragment is:

    // collect engine factory providers from server and from plugins
    // Collect the engine factory providers from the server and from the plugin list
    final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
    // Build the list of engine factory providers
    final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =
        Stream.concat(
            indicesModule.getEngineFactories().stream(),
            enginePlugins.stream().map(plugin -> plugin::getEngineFactory))
            .collect(Collectors.toList());

    // Filter the index store factories out of the plugins
    final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories =
        pluginsService.filterPlugins(IndexStorePlugin.class)
            .stream()
            .map(IndexStorePlugin::getDirectoryFactories)
            .flatMap(m -> m.entrySet().stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    // Map of system index descriptors
    final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = pluginsService
        .filterPlugins(SystemIndexPlugin.class)
        .stream()
        .collect(Collectors.toUnmodifiableMap(
            plugin -> plugin.getClass().getSimpleName(),
            plugin -> plugin.getSystemIndexDescriptors()));
    SystemIndexDescriptor.checkForOverlappingPatterns(systemIndexDescriptorMap);
    // Convert the map into a list
    final List<SystemIndexDescriptor> systemIndexDescriptors = systemIndexDescriptorMap.values().stream()
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
    // Indices service
    final IndicesService indicesService =
        new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
            clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
            threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
            scriptModule.getScriptService(), clusterService, client, metaStateService, engineFactoryProviders,
            indexStoreFactories);
    // Alias validator
    final AliasValidator aliasValidator = new AliasValidator();
    // Metadata create-index service
    final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(
        settings,
        clusterService,
        indicesService,
        clusterModule.getAllocationService(),
        aliasValidator,
        environment,
        settingsModule.getIndexScopedSettings(),
        threadPool,
        xContentRegistry,
        systemIndexDescriptors,
        forbidPrivateIndexSettings);
    // Plugin components
    Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
        .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
            scriptModule.getScriptService(), xContentRegistry, environment,
            nodeEnvironment, namedWriteableRegistry).stream())
        .collect(Collectors.toList());
    // Action module
    ActionModule actionModule = new ActionModule(settings, clusterModule.getIndexNameExpressionResolver(),
        settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
        threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService,
        clusterService);
    modules.add(actionModule);
    // Get the restController from the action module
    final RestController restController = actionModule.getRestController();
    // Network module
    final NetworkModule networkModule = new NetworkModule(settings, pluginsService.filterPlugins(NetworkPlugin.class),
        threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
        networkService, restController, clusterService.getClusterSettings());
    Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =
        pluginsService.filterPlugins(Plugin.class).stream()
            .map(Plugin::getIndexTemplateMetaDataUpgrader)
            .collect(Collectors.toList());
    final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(indexTemplateMetaDataUpgraders);
    // Metadata index upgrade service
    final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings,
        xContentRegistry, indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings());
    new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
    // Low-level transport object
    final Transport transport = networkModule.getTransportSupplier().get();
    Set<String> taskHeaders = Stream.concat(
        pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
        Stream.of(Task.X_OPAQUE_ID)
    ).collect(Collectors.toSet());
    // Transport service
    final TransportService transportService = newTransportService(settings, transport, threadPool,
        networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
    // Gateway meta state
    final GatewayMetaState gatewayMetaState = new GatewayMetaState();
    // Response collector service
    final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
    // Search transport service
    final SearchTransportService searchTransportService = new SearchTransportService(transportService,
        SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
    // HTTP transport instance
    final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);

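This fragment loads several modules from the plugins and the server and creates the corresponding services, such as the indices, network transport, search, and HTTP services. Refer to the code comments for details.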
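The engine factory providers collected above have the shape `Function<IndexSettings, Optional<EngineFactory>>`: each provider may or may not offer a factory for a given index. How the providers are consulted is outside this fragment, so the sketch below only shows one plausible resolution rule (fall back to a default when nobody answers, reject conflicting answers). It is an assumption for illustration, with index settings reduced to an index name and factories reduced to labels.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

class EngineProviderSketch {
    // Ask every provider; zero answers means "use the default", more than one is a conflict.
    static String resolve(String indexName, List<Function<String, Optional<String>>> providers) {
        List<String> matches = providers.stream()
            .map(provider -> provider.apply(indexName))
            .flatMap(Optional::stream)
            .collect(Collectors.toList());
        if (matches.isEmpty()) {
            return "default-engine";
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("multiple engine factories for [" + indexName + "]: " + matches);
        }
        return matches.get(0);
    }

    // Two hypothetical providers: one never answers, one only answers for "frozen-" indices.
    static List<Function<String, Optional<String>>> demoProviders() {
        Function<String, Optional<String>> none = name -> Optional.empty();
        Function<String, Optional<String>> frozen =
            name -> name.startsWith("frozen-") ? Optional.of("frozen-engine") : Optional.empty();
        return List.of(none, frozen);
    }

    public static void main(String[] args) {
        System.out.println(resolve("logs", demoProviders()));
        System.out.println(resolve("frozen-logs", demoProviders()));
    }
}
```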
Let's continue with the code that follows:

    // Repositories module
    RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,
        pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, threadPool, xContentRegistry);
    // Repositories service
    RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
    // Snapshots service
    SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService,
        clusterModule.getIndexNameExpressionResolver(), repositoryService, threadPool);
    // Shard snapshots service
    SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,
        threadPool, transportService, indicesService, actionModule.getActionFilters(),
        clusterModule.getIndexNameExpressionResolver());
    // Restore service for data and indices
    RestoreService restoreService = new RestoreService(clusterService, repositoryService,
        clusterModule.getAllocationService(), metaDataCreateIndexService, metaDataIndexUpgradeService,
        clusterService.getClusterSettings());
    // Reroute service
    final RerouteService rerouteService
        = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
    // Disk threshold monitor
    final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
        clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
    clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
    // Discovery module
    final DiscoveryModule discoveryModule = new DiscoveryModule(settings, transportService, namedWriteableRegistry,
        networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
        clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
        clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService);
    // Node service
    this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
        transportService, indicesService, pluginsService, circuitBreakerService,
        scriptModule.getScriptService(), httpServerTransport, ingestService, clusterService,
        settingsModule.getSettingsFilter(), responseCollectorService, searchTransportService);
    // Search service
    final SearchService searchService = newSearchService(clusterService, indicesService,
        threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
        responseCollectorService, circuitBreakerService);
    // Persistent task executors
    final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
        .filterPlugins(PersistentTaskPlugin.class).stream()
        .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule))
        .flatMap(List::stream)
        .collect(toList());
    // Registry of persistent task executors
    final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);
    // Persistent tasks cluster service
    final PersistentTasksClusterService persistentTasksClusterService =
        new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
    resourcesToClose.add(persistentTasksClusterService);
    // Persistent tasks service
    final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);

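This part again creates infrastructure services, such as the repository (storage), snapshot, reroute, restore, and search services.
4. The IoC part of Elasticsearch
Following the code further, we reach a very important part of Elasticsearch, which plays the role of an IoC container. Here is the code: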

    // This is Elasticsearch's own IoC mechanism
    // A lambda is used as a Module: it implements configure(Binder binder), and the bindings are registered inside it
    modules.add(b -> {
            b.bind(Node.class).toInstance(this);
            b.bind(NodeService.class).toInstance(nodeService);
            b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
            b.bind(PluginsService.class).toInstance(pluginsService);
            b.bind(Client.class).toInstance(client);
            b.bind(NodeClient.class).toInstance(client);
            b.bind(Environment.class).toInstance(this.environment);
            b.bind(ThreadPool.class).toInstance(threadPool);
            b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
            b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
            b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
            b.bind(BigArrays.class).toInstance(bigArrays);
            b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
            b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
            b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
            b.bind(IngestService.class).toInstance(ingestService);
            b.bind(UsageService.class).toInstance(usageService);
            b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
            b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
            b.bind(MetaStateService.class).toInstance(metaStateService);
            b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory);
            b.bind(IndicesService.class).toInstance(indicesService);
            b.bind(AliasValidator.class).toInstance(aliasValidator);
            b.bind(MetaDataCreateIndexService.class).toInstance(metaDataCreateIndexService);
            b.bind(SearchService.class).toInstance(searchService);
            b.bind(SearchTransportService.class).toInstance(searchTransportService);
            b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::createReduceContext));
            b.bind(Transport.class).toInstance(transport);
            b.bind(TransportService.class).toInstance(transportService);
            b.bind(NetworkService.class).toInstance(networkService);
            b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService()));
            b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
            b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
            b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
            b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
            {
                RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
                processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
                b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,
                    indicesService, recoverySettings));
                b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool,
                    transportService, recoverySettings, clusterService));
            }
            b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
            pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
            b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
            b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
            b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
            b.bind(RepositoriesService.class).toInstance(repositoryService);
            b.bind(SnapshotsService.class).toInstance(snapshotsService);
            b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
            b.bind(RestoreService.class).toInstance(restoreService);
            b.bind(RerouteService.class).toInstance(rerouteService);
        }
    );

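Here a lambda is used as a Module object: the lambda body is the implementation of configure(Binder binder). Inside it, the bind calls register all the services created in the steps above with the container.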
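To make the bind / toInstance / getInstance cycle concrete, here is a heavily reduced sketch of such a container. It is not the Guice-derived implementation that Elasticsearch uses: all types in `MiniIocSketch` are hypothetical, and only instance bindings are supported (no constructor injection, scopes, or providers).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MiniIocSketch {
    interface BindingBuilder<T> {
        void toInstance(T instance);
    }

    interface Binder {
        <T> BindingBuilder<T> bind(Class<T> type);
    }

    // Single abstract method, so a lambda such as b -> { ... } can act as a module.
    interface SketchModule {
        void configure(Binder binder);
    }

    static final class Injector {
        private final Map<Class<?>, Object> instances;

        Injector(Map<Class<?>, Object> instances) {
            this.instances = instances;
        }

        <T> T getInstance(Class<T> type) {
            Object instance = instances.get(type);
            if (instance == null) {
                throw new IllegalStateException("no binding for " + type.getName());
            }
            return type.cast(instance);
        }
    }

    static final class ModulesBuilder {
        private final List<SketchModule> modules = new ArrayList<>();

        ModulesBuilder add(SketchModule module) {
            modules.add(module);
            return this;
        }

        // Run every module against one binder, then hand the collected bindings to the injector.
        Injector createInjector() {
            Map<Class<?>, Object> instances = new HashMap<>();
            Binder binder = new Binder() {
                @Override
                public <T> BindingBuilder<T> bind(Class<T> type) {
                    return instance -> instances.put(type, instance);
                }
            };
            modules.forEach(module -> module.configure(binder));
            return new Injector(instances);
        }
    }

    static String demo() {
        StringBuilder threadPool = new StringBuilder("threadPool");
        ModulesBuilder modules = new ModulesBuilder();
        modules.add(b -> {
            b.bind(String.class).toInstance("node-1");
            b.bind(StringBuilder.class).toInstance(threadPool);
        });
        Injector injector = modules.createInjector();
        return injector.getInstance(String.class) + "/" + injector.getInstance(StringBuilder.class);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch nothing is registered until createInjector() runs the modules, which mirrors why the Node constructor can keep adding modules right up to the point where the injector is created.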
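The actual instance injection is handled by the Guice implementation that ships inside Elasticsearch itself. Google's Guice library provides the same kind of IoC management. Let's continue with the code: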

    // Create the injector; afterwards the instances bound above can be fetched from the IoC container
    injector = modules.createInjector();

    // TODO hack around circular dependencies problems in AllocationService
    clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));
    // Plugin lifecycle components
    List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
        .filter(p -> p instanceof LifecycleComponent)
        .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
    resourcesToClose.addAll(pluginLifecycleComponents);
    resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
    this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
    // Initialize the client
    client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}),
        transportService.getTaskManager(), () -> clusterService.localNode().getId(),
        transportService.getRemoteClusterService());
    this.namedWriteableRegistry = namedWriteableRegistry;

    logger.debug("initializing HTTP handlers ...");
    actionModule.initRestHandlers(() -> clusterService.state().nodes());

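Here the injector is created. After that, the instances bound above can be fetched from the IoC container and used to initialize the client and the actionModule.
At this point the Node has been fully constructed. Next, let's look at how the Node is started.
Starting the Node
The node is started by the start method. The code of org.elasticsearch.node.Node#start is: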

    /**
     * Start the node. If the node is already started, this method is no-op.
     */
    public Node start() throws NodeValidationException {
        if (!lifecycle.moveToStarted()) {
            return this;
        }

        logger.info("starting ...");
        // Start the plugin lifecycle components
        pluginLifecycleComponents.forEach(LifecycleComponent::start);
        // Get the MappingUpdatedAction instance from the IoC container and set its client
        injector.getInstance(MappingUpdatedAction.class).setClient(client);
        // Get the indices service from the IoC container and start it
        injector.getInstance(IndicesService.class).start();
        // Get the indices cluster state service from the IoC container and start it;
        // under the hood it uses the thread pool for periodic processing
        injector.getInstance(IndicesClusterStateService.class).start();
        // Get the snapshots service from the IoC container and start it
        injector.getInstance(SnapshotsService.class).start();
        // Get the shard snapshots service from the IoC container and start it
        injector.getInstance(SnapshotShardsService.class).start();
        // Get the repositories service from the IoC container and start it
        injector.getInstance(RepositoriesService.class).start();
        // Get the search service from the IoC container and start it
        injector.getInstance(SearchService.class).start();
        // Start the monitor service
        nodeService.getMonitorService().start();
        // Get the cluster service instance from the IoC container
        final ClusterService clusterService = injector.getInstance(ClusterService.class);
        // Get the node connections service from the container
        final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
        nodeConnectionsService.start();
        clusterService.setNodeConnectionsService(nodeConnectionsService);
        // Start the resource watcher service
        injector.getInstance(ResourceWatcherService.class).start();
        // Start the gateway service
        injector.getInstance(GatewayService.class).start();
        Discovery discovery = injector.getInstance(Discovery.class);
        clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

        // Start the transport service now so the publish address will be added to the local disco node in ClusterService
        TransportService transportService = injector.getInstance(TransportService.class);
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        // Start the transport service
        transportService.start();
        assert localNodeFactory.getNode() != null;
        assert transportService.getLocalNode().equals(localNodeFactory.getNode())
            : "transportService has a different local node than the factory provided";
        injector.getInstance(PeerRecoverySourceService.class).start();

        // Load (and maybe upgrade) the metadata stored on disk
        final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
        // Start the gateway meta state service
        gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
            injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class),
            injector.getInstance(PersistedClusterStateService.class));
        if (Assertions.ENABLED) {
            // -------- some code omitted --------
        }
        // we load the global state here (the persistent part of the cluster state stored on disk) to
        // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
        // Load the cluster state persisted on disk
        final MetaData onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metaData();
        assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
        // Validate the node before it accepts requests
        validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),
            pluginsService.filterPlugins(Plugin.class).stream()
                .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
        // Register the task manager as a cluster state applier
        clusterService.addStateApplier(transportService.getTaskManager());
        // start after transport service so the local disco is known
        // Start discovery
        discovery.start();
        // start before cluster service so that it can set initial state on ClusterApplierService
        // Start the cluster service
        clusterService.start();
        assert clusterService.localNode().equals(localNodeFactory.getNode())
            : "clusterService has a different local node than the factory provided";
        transportService.acceptIncomingRequests();
        discovery.startInitialJoin();
        final TimeValue initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(settings());
        configureNodeAndClusterIdStateListener(clusterService);

        if (initialStateTimeout.millis() > 0) {
            final ThreadPool thread = injector.getInstance(ThreadPool.class);
            ClusterState clusterState = clusterService.state();
            // Observer
            ClusterStateObserver observer =
                new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
            // When there is no master node yet
            if (clusterState.nodes().getMasterNodeId() == null) {
                logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
                // Latch: later steps only run after the initial cluster state has been received
                final CountDownLatch latch = new CountDownLatch(1);
                observer.waitForNextChange(new ClusterStateObserver.Listener() {
                    @Override
                    public void onNewClusterState(ClusterState state) { latch.countDown(); }

                    @Override
                    public void onClusterServiceClose() {
                        latch.countDown();
                    }

                    @Override
                    public void onTimeout(TimeValue timeout) {
                        logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                            initialStateTimeout);
                        latch.countDown();
                    }
                }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);

                try {
                    // Block until the cluster state arrives, the cluster service closes, or the timeout fires
                    latch.await();
                } catch (InterruptedException e) {
                    throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
                }
            }
        }
        // Start the HTTP server
        injector.getInstance(HttpServerTransport.class).start();
        // Whether to write the ports files
        if (WRITE_PORTS_FILE_SETTING.get(settings())) {
            TransportService transport = injector.getInstance(TransportService.class);
            // Write the ports files
            writePortsFile("transport", transport.boundAddress());
            HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
            writePortsFile("http", http.boundAddress());
        }

        logger.info("started");

        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

        return this;
    }

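The wait for the initial cluster state is a one-shot CountDownLatch released by whichever callback fires first: a new cluster state with a master, or the timeout. The sketch below imitates that with two scheduled tasks instead of a ClusterStateObserver. The class name, the delays, and the outcome strings are illustrative assumptions only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class InitialStateWaitSketch {
    // Blocks until either the simulated master election or the simulated timeout happens first.
    static String waitForMaster(long masterElectedAfterMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>();
        try {
            // Stand-in for onNewClusterState: a master became known.
            scheduler.schedule(() -> {
                outcome.compareAndSet(null, "master elected");
                latch.countDown();
            }, masterElectedAfterMillis, TimeUnit.MILLISECONDS);
            // Stand-in for onTimeout: startup continues even without a master.
            scheduler.schedule(() -> {
                outcome.compareAndSet(null, "timed out");
                latch.countDown();
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            latch.await();
            return outcome.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for initial state", e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForMaster(10, 2000));
        System.out.println(waitForMaster(2000, 10));
    }
}
```

As in the start method, await() itself has no timeout. The timeout is delivered as just another callback that counts the latch down, so startup proceeds either way.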
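Refer to the code comments for the details. In short, start fetches from the Elasticsearch IoC container the basic services that were initialized while the Node was being constructed, and starts them one by one. The startup of each service involves a lot of material, which will be analyzed later.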
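The first lines of start rely on lifecycle.moveToStarted() to make the method idempotent. A minimal sketch of such a state guard is shown below. The states and transitions are assumptions chosen for illustration and are not copied from the Elasticsearch Lifecycle class.

```java
class LifecycleSketch {
    enum State { INITIALIZED, STARTED, STOPPED, CLOSED }

    private State state = State.INITIALIZED;
    private int startCount = 0;

    // Returns true only when the transition to STARTED actually happened.
    synchronized boolean moveToStarted() {
        if (state == State.INITIALIZED || state == State.STOPPED) {
            state = State.STARTED;
            return true;
        }
        if (state == State.STARTED) {
            return false;
        }
        throw new IllegalStateException("cannot move to started from " + state);
    }

    // Mirrors the guard at the top of start(): a second call is a no-op.
    LifecycleSketch start() {
        if (!moveToStarted()) {
            return this;
        }
        startCount++;
        return this;
    }

    int startCount() {
        return startCount;
    }

    public static void main(String[] args) {
        System.out.println(new LifecycleSketch().start().start().startCount());
    }
}
```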
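That completes the walkthrough of how a Node is created and started. Note that this article only covers the overall flow. The specific details will be covered in later articles.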