Elasticsearch源码分析二之Node节点创建与启动流程分析

2020-03-11 12:47:04 浏览数 (1)

Elasticsearch源码分析二之Node节点创建与启动流程分析

本篇主要以Node节点的创建与start流程为主,文中提到的其他具体环节的内容在后面会专门来分析。

紧接着昨天的Bootstrap的初始化来进行开篇,对应的是org.elasticsearch.bootstrap.Bootstrap#setup方法,详见代码片段:

代码语言:javascript复制
 //根据Node创建节点        node = new Node(environment) {            @Override            protected void validateNodeBeforeAcceptingRequests(                final BootstrapContext context,                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {                BootstrapChecks.check(context, boundTransportAddress, checks);            }        };

下面针对Node的创建与启动进行分析。

Node构造

直接上构造方法代码:

代码语言:javascript复制
 public Node(Environment environment) {        this(environment, Collections.emptyList(), true);    }

由于具体的构造方法比较长,现在分成多个片段进行分析。

一、加载插件和环境配置

代码语言:javascript复制
 protected Node(            final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {            --------省略部分代码--------            Settings tmpSettings = Settings.builder().put(environment.settings())                .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();            //生成节点的环境对象,包括节点中文件lucene锁的初始化            nodeEnvironment = new NodeEnvironment(tmpSettings, environment);            resourcesToClose.add(nodeEnvironment);            ------------省略部分代码-------------            //主要用于加载插件的服务类实例            this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(),                environment.pluginsFile(), classpathPlugins);            final Settings settings = pluginsService.updatedSettings();            //节点的角色            final Set<DiscoveryNodeRole> possibleRoles = Stream.concat(                //public static Set<DiscoveryNodeRole> BUILT_IN_ROLES = Set.of(DATA_ROLE, INGEST_ROLE, MASTER_ROLE);                    DiscoveryNodeRole.BUILT_IN_ROLES.stream(),                    pluginsService.filterPlugins(Plugin.class)                            .stream()                            .map(Plugin::getRoles)                            .flatMap(Set::stream))                    .collect(Collectors.toSet());            DiscoveryNode.setPossibleRoles(possibleRoles);            //通过节点的配置和节点的nodeId创建本地节点工厂            localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

根据settings创建NodeEnvironment和pluginService实例。加载DiscoveryNodeRole列表信息,并创建LocalNodeFactory实例。主要是给Node实例的成员变量赋值。

二、线程池创建部分

代码片段如下:

代码语言:javascript复制
 // create the environment based on the finalized (processed) view of the settings            // this is just to makes sure that people get the same settings, no matter where they ask them from            this.environment = new Environment(settings, environment.configFile());            Environment.assertEquivalent(environment, this.environment);            //获取插件中的执行器builder            final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);            //线程池            final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));            resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));            // adds the context to the DeprecationLogger so that it does not need to be injected everywhere            DeprecationLogger.setThreadContext(threadPool.getThreadContext());            resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
            final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());            final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());            for (final ExecutorBuilder<?> builder : threadPool.builders()) {                additionalSettings.addAll(builder.getRegisteredSettings());            }            client = new NodeClient(settings, threadPool);
  • 通过pluginsService.getExecutorBuilders(settings)加载插件里的ExecutorBuilder列表。
  • 通过executorBuilders创建ThreadPool实例,这一部分由于比较复杂,后面用专门的文章来分析。
  • resourcesToClose为List类型的列表,里面添加的是Closeable类型的实例,方便在最好释放需要关闭的资源。
  • 初始化一些配置信息。
  • 创建NodeClient实例,赋值给Node实例的client属性。

三、创建和加载节点多个模块和服务

Elasticsearch节点中分工比较明确,不同工作是交给不同模块和对应的服务去处理的,我们直接来看Node构造方法接下来的部分代码片段:

代码语言:javascript复制
 // 用于监听资源文件变化的service            final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);            // 从插件服务中加载脚本模块            final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));            // 解析模块,主要是lucene索引、分词等操作            AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));            // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool            // so we might be late here already            // 用于升级的配置信息            final Set<SettingUpgrader<?>> settingsUpgraders = pluginsService.filterPlugins(Plugin.class)                    .stream()                    .map(Plugin::getSettingUpgraders)                    .flatMap(List::stream)                    .collect(Collectors.toSet());            // 配置模块            final SettingsModule settingsModule =                    new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);            // 添加集群配置监听器         scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());            // 将resourceWatcherService添加到resourcesToClose列表中            resourcesToClose.add(resourceWatcherService);            // 从插件中加载网络服务            final NetworkService networkService = new NetworkService(                getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));            // 加载集群插件            List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);            // 创建集群服务            final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);            // 添加集群状态应用            clusterService.addStateApplier(scriptModule.getScriptService());            resourcesToClose.add(clusterService);            // 添加本地主节点监听器            clusterService.addLocalNodeMasterListener(                    new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings())                            .newHashPublisher());            // 实例化ingest 服务            final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,                scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),                pluginsService.filterPlugins(IngestPlugin.class), client);            // 创建集群信息服务            final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);            // 统计和查看节点使用情况的服务            final UsageService usageService = new UsageService();
            ModulesBuilder modules = new ModulesBuilder();            // 节点监控服务            final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);            // 集群模块            ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);            modules.add(clusterModule);            // 索引模块            IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));            modules.add(indicesModule);            // 搜索模块            SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class));            // 断路器服务            CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),                settingsModule.getClusterSettings());            resourcesToClose.add(circuitBreakerService);            // 添加网关模块            modules.add(new GatewayModule());

详见代码注释,这段代码主要用于加载插件配置,创建脚本模块、解析模块、配置模块、集群模块、索引模块、搜索模块、网关模块等,以及节点管理和监控、断路器等多个服务。

继续往下看代码,还是继续加载一些基础服务:

代码语言:javascript复制
 // page 缓存回收器            PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);            // 创建大数组,用于下文中的持久化服务            BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);            // 添加配置模块            modules.add(settingsModule);            // 聚集各个模块的特性信息实体,比如ClusterModule中的snapshots、restore等            List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(                NetworkModule.getNamedWriteables().stream(),                IndicesModule.getNamedWriteables().stream(),                searchModule.getNamedWriteables().stream(),                pluginsService.filterPlugins(Plugin.class).stream()                    .flatMap(p -> p.getNamedWriteables().stream()),                ClusterModule.getNamedWriteables().stream())                .flatMap(Function.identity()).collect(Collectors.toList());            // 通过namedWriteables创建NamedWriteableRegistry            final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);            // 加载各模块的namedXContents创建NamedXContentRegistry            NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(                NetworkModule.getNamedXContents().stream(),                IndicesModule.getNamedXContents().stream(),                searchModule.getNamedXContents().stream(),                pluginsService.filterPlugins(Plugin.class).stream()                    .flatMap(p -> p.getNamedXContent().stream()),                ClusterModule.getNamedXWriteables().stream())                .flatMap(Function.identity()).collect(toList()));            // 元数据状态服务            final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);            // 用于持久化集群状态的服务            final PersistedClusterStateService lucenePersistedStateFactory                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(),                threadPool::relativeTimeInMillis);

主要加载元数据状态服务和持久化集群状态服务,主要为下面的服务创建做准备。

继续往下看代码,代码片段如下:

代码语言:javascript复制
 // collect engine factory providers from server and from plugins            // 从服务端和插件列表中收集引擎工厂providers            final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);            // 生成引擎提供者工厂列表            final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =                    Stream.concat(                            indicesModule.getEngineFactories().stream(),                            enginePlugins.stream().map(plugin -> plugin::getEngineFactory))                    .collect(Collectors.toList());
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释 
* // 从插件中过滤出索引存储工厂            final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories =                    pluginsService.filterPlugins(IndexStorePlugin.class)                            .stream()                            .map(IndexStorePlugin::getDirectoryFactories)                            .flatMap(m -> m.entrySet().stream())                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));            //系统索引描述符map            final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = pluginsService                .filterPlugins(SystemIndexPlugin.class)                .stream()                .collect(Collectors.toUnmodifiableMap(                    plugin -> plugin.getClass().getSimpleName(),                    plugin -> plugin.getSystemIndexDescriptors()));            SystemIndexDescriptor.checkForOverlappingPatterns(systemIndexDescriptorMap);            // map 转成list            final List<SystemIndexDescriptor> systemIndexDescriptors = systemIndexDescriptorMap.values().stream()                .flatMap(Collection::stream)                .collect(Collectors.toList());            // 索引服务            final IndicesService indicesService =                new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),                    clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,                    threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),                    clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories);            // 别名校验器            final AliasValidator aliasValidator = new AliasValidator();            // 元数据索引创建服务            final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(                    settings,                    clusterService,                    indicesService,                    clusterModule.getAllocationService(),                    aliasValidator,                    environment,                    settingsModule.getIndexScopedSettings(),                    threadPool,                    xContentRegistry,                    systemIndexDescriptors,                    forbidPrivateIndexSettings);            // 插件组件            Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()                .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,                                                 scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,                                                 namedWriteableRegistry).stream())                .collect(Collectors.toList());            // action 模块            ActionModule actionModule = new ActionModule(settings, clusterModule.getIndexNameExpressionResolver(),                settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),                threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, clusterService);            modules.add(actionModule);            // 从action 模块中获取restController            final RestController restController = actionModule.getRestController();            // 网络模块            final NetworkModule networkModule = new NetworkModule(settings, pluginsService.filterPlugins(NetworkPlugin.class),                threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,                networkService, restController, clusterService.getClusterSettings());            Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =                pluginsService.filterPlugins(Plugin.class).stream()                    .map(Plugin::getIndexTemplateMetaDataUpgrader)                    .collect(Collectors.toList());            final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(indexTemplateMetaDataUpgraders);            // 元数据索引升级服务            final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,                indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings());            new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetaDataUpgraders);            // 底层传输对象            final Transport transport = networkModule.getTransportSupplier().get();            Set<String> taskHeaders = Stream.concat(                pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),                Stream.of(Task.X_OPAQUE_ID)            ).collect(Collectors.toSet());            // 传输服务            final TransportService transportService = newTransportService(settings, transport, threadPool,                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);            // 网关元数据状态            final GatewayMetaState gatewayMetaState = new GatewayMetaState();            // 响应归集服务            final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);            // 搜索传输服务            final SearchTransportService searchTransportService =  new SearchTransportService(transportService,                SearchExecutionStatsCollector.makeWrapper(responseCollectorService));            // http请求传输实例            final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
*/

主要用于从插件和服务中加载多个模块和进行服务的创建,如索引、网络传输、搜索、http服务等,具体可以参考代码注释来看。

继续分析接下来的代码,这里直接继续上代码:

代码语言:javascript复制
 // 仓库模块            RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,                pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, threadPool, xContentRegistry);            // 仓库服务            RepositoriesService repositoryService = repositoriesModule.getRepositoryService();            // 快照服务            SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService,                clusterModule.getIndexNameExpressionResolver(), repositoryService, threadPool);            // 分片快照服务            SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,                threadPool, transportService, indicesService, actionModule.getActionFilters(),                clusterModule.getIndexNameExpressionResolver());            // 数据、索引的恢复服务            RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),                metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings());            // 重新路由服务            final RerouteService rerouteService                = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);            // 磁盘临界点监控器            final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,                clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);            clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);            // 服务发现模块            final DiscoveryModule discoveryModule = new DiscoveryModule(settings, transportService, namedWriteableRegistry,                networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),                clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),                clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService);            // 节点服务            this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),                transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),                httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,                searchTransportService);            // 搜索服务            final SearchService searchService = newSearchService(clusterService, indicesService,                threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),                responseCollectorService, circuitBreakerService);            // 持久化任务执行器            final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService                .filterPlugins(PersistentTaskPlugin.class).stream()                .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule))                .flatMap(List::stream)                .collect(toList());            // 持久化任务执行器注册            final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);            // 持久化任务集群服务            final PersistentTasksClusterService persistentTasksClusterService =                new PersistentTasksClusterService(settings, registry, clusterService, threadPool);            resourcesToClose.add(persistentTasksClusterService);            // 持久化任务服务            final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);

这里仍然还是创建一些基础服务设施,如存储、快照、路由、恢复、搜索等服务。

四、Elasticsearch的IOC部分

继续上面的代码逻辑往下走,就到了Elasticsearch中十分重要的部分,相当于IOC部分,我们直接上代码:

代码语言:javascript复制
  // 这里是elasticsearch 自己的IOC管理机制            // 使用lambda创建了一个Module对象,并执行configure(Binder binder)方法,在方法里面执行inject逻辑            modules.add(b -> {                    b.bind(Node.class).toInstance(this);                    b.bind(NodeService.class).toInstance(nodeService);                    b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);                    b.bind(PluginsService.class).toInstance(pluginsService);                    b.bind(Client.class).toInstance(client);                    b.bind(NodeClient.class).toInstance(client);                    b.bind(Environment.class).toInstance(this.environment);                    b.bind(ThreadPool.class).toInstance(threadPool);                    b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);                    b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);                    b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);                    b.bind(BigArrays.class).toInstance(bigArrays);                    b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);                    b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());                    b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());                    b.bind(IngestService.class).toInstance(ingestService);                    b.bind(UsageService.class).toInstance(usageService);                    b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);                    b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);                    b.bind(MetaStateService.class).toInstance(metaStateService);                    b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory);                    b.bind(IndicesService.class).toInstance(indicesService);                    b.bind(AliasValidator.class).toInstance(aliasValidator);                    b.bind(MetaDataCreateIndexService.class).toInstance(metaDataCreateIndexService);                    b.bind(SearchService.class).toInstance(searchService);                    b.bind(SearchTransportService.class).toInstance(searchTransportService);                    b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::createReduceContext));                    b.bind(Transport.class).toInstance(transport);                    b.bind(TransportService.class).toInstance(transportService);                    b.bind(NetworkService.class).toInstance(networkService);                    b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService()));                    b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);                    b.bind(ClusterInfoService.class).toInstance(clusterInfoService);                    b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);                    b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());                    {                        RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());                        processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);                        b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,                                indicesService, recoverySettings));                        b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool,                                transportService, recoverySettings, clusterService));                    }                    b.bind(HttpServerTransport.class).toInstance(httpServerTransport);                    pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));                    b.bind(PersistentTasksService.class).toInstance(persistentTasksService);                    b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);                    b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);                    b.bind(RepositoriesService.class).toInstance(repositoryService);                    b.bind(SnapshotsService.class).toInstance(snapshotsService);                    b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);                    b.bind(RestoreService.class).toInstance(restoreService);                    b.bind(RerouteService.class).toInstance(rerouteService);                }            );

这里使用lambda创建了一个Module对象,并执行configure(Binder binder)方法,在方法里面执行inject逻辑,通过bind操作将上面流程中创建的各种服务都放到一个容器里去。

具体的实例注入是通过Elasticsearch自己实现的Guice机制,其中google也有一套Guice机制用于IOC管理。我们接着往下看代码:

代码语言:javascript复制
 // 创建injector,之后可以从IOC容器中获取上面注入的实例对象            injector = modules.createInjector();
            // TODO hack around circular dependencies problems in AllocationService            clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));            // 插件生命周期组件            List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()                .filter(p -> p instanceof LifecycleComponent)                .map(p -> (LifecycleComponent) p).collect(Collectors.toList());            resourcesToClose.addAll(pluginLifecycleComponents);            resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));            this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);            // 初始化client            client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}), transportService.getTaskManager(),                    () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());            this.namedWriteableRegistry = namedWriteableRegistry;
            logger.debug("initializing HTTP handlers ...");            actionModule.initRestHandlers(() -> clusterService.state().nodes());

这里创建injector,之后可以从IOC容器中获取上面注入的实例对象进行client和actionModule的初始化。

到这里Node节点的创建就已经完成了,接下来我们继续看下Node节点的启动流程。

Node节点的启动

节点的启动是由start方法来进行的,org.elasticsearch.node.Node#start方法的代码:

代码语言:javascript复制
 /**     * Start the node. If the node is already started, this method is no-op.     */    public Node start() throws NodeValidationException {        if (!lifecycle.moveToStarted()) {            return this;        }
        logger.info("starting ...");        // 启动生命周期组件        pluginLifecycleComponents.forEach(LifecycleComponent::start);        // 从IOC容器中获取MappingUpdatedAction实例并设置client属性        injector.getInstance(MappingUpdatedAction.class).setClient(client);        // 从IOC容器中获取索引服务并启动        injector.getInstance(IndicesService.class).start();        // 从IOC容器中获取索引集群状态服务并启动,底层是通过线程池来定时处理        injector.getInstance(IndicesClusterStateService.class).start();        // 从IOC容器中获取快照服务并启动        injector.getInstance(SnapshotsService.class).start();        // 从IOC容器中获取分片快照服务并启动        injector.getInstance(SnapshotShardsService.class).start();        // 从IOC容器中获取仓库服务并启动        injector.getInstance(RepositoriesService.class).start();        // 从IOC容器中获取搜索服务并启动        injector.getInstance(SearchService.class).start();        // 启动监控服务        nodeService.getMonitorService().start();        // 从IOC容器中获取集群服务实例        final ClusterService clusterService = injector.getInstance(ClusterService.class);        // 从容器中获取节点连接服务        final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);        nodeConnectionsService.start();        clusterService.setNodeConnectionsService(nodeConnectionsService);        // 启动资源监控服务        injector.getInstance(ResourceWatcherService.class).start();        // 启动网关服务        injector.getInstance(GatewayService.class).start();        Discovery discovery = injector.getInstance(Discovery.class);        clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
        // Start the transport service now so the publish address will be added to the local disco node in ClusterService        TransportService transportService = injector.getInstance(TransportService.class);        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));        // 启动网络传输服务        transportService.start();        assert localNodeFactory.getNode() != null;        assert transportService.getLocalNode().equals(localNodeFactory.getNode())            : "transportService has a different local node than the factory provided";        injector.getInstance(PeerRecoverySourceService.class).start();
        // Load (and maybe upgrade) the metadata stored on disk        final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);        // 启动网关元数据状态服务        gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),            injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class),            injector.getInstance(PersistedClusterStateService.class));        if (Assertions.ENABLED) {            ---------省略部分代码--------        }          // we load the global state here (the persistent part of the cluster state stored on disk) to        // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.        // 加载磁盘上持久化的集群状态        final MetaData onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metaData();        assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null        // 校验节点状态信息        validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),            pluginsService.filterPlugins(Plugin.class).stream()                .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));        // 添加集群状态管理        clusterService.addStateApplier(transportService.getTaskManager());        // start after transport service so the local disco is known        // 启动服务发现        discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService        // 启动集群状态服务        clusterService.start();        assert clusterService.localNode().equals(localNodeFactory.getNode())            : "clusterService has a different local node than the factory provided";        transportService.acceptIncomingRequests();        discovery.startInitialJoin();        final TimeValue initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(settings());        configureNodeAndClusterIdStateListener(clusterService);
        if (initialStateTimeout.millis() > 0) {            final ThreadPool thread = injector.getInstance(ThreadPool.class);            ClusterState clusterState = clusterService.state();            // 观察者            ClusterStateObserver observer =                new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());            //当没有主节点的时候            if (clusterState.nodes().getMasterNodeId() == null) {                logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);                // 闭锁 让集群状态初始化同步完成之后才进行之后的操作                final CountDownLatch latch = new CountDownLatch(1);                observer.waitForNextChange(new ClusterStateObserver.Listener() {                    @Override                    public void onNewClusterState(ClusterState state) { latch.countDown(); }
                    @Override                    public void onClusterServiceClose() {                        latch.countDown();                    }
                    @Override                    public void onTimeout(TimeValue timeout) {                        logger.warn("timed out while waiting for initial discovery state - timeout: {}",                            initialStateTimeout);                        latch.countDown();                    }                }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
                try {                    // 等待直到集群状态同步完成才继续往下走                    latch.await();                } catch (InterruptedException e) {                    throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");                }            }        }        // 启动httpServer服务        injector.getInstance(HttpServerTransport.class).start();        //是否写ports文件        if (WRITE_PORTS_FILE_SETTING.get(settings())) {            TransportService transport = injector.getInstance(TransportService.class);            // 写ports文件            writePortsFile("transport", transport.boundAddress());            HttpServerTransport http = injector.getInstance(HttpServerTransport.class);            writePortsFile("http", http.boundAddress());        }
        logger.info("started");
        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
        return this;    }

具体代码细节可以参考下代码注释,这里主要是启动从Elasticsearch的IOC容器中获取在创建Node节点时初始化的一系列基础服务,具体启动细节涉及到的内容比较多,后面再具体分析。

到这里关于Node节点的创建和启动流程就梳理完了。注意,这里只涉及流程分析,对具体的细节在后面再详细介绍。

0 人点赞