《Elasticsearch 源码解析与优化实战》第11章:gateway 模块分析

2021-08-19 18:02:26 浏览数 (1)

简介

gateway 模块负责集群元信息的存储和集群重启时的恢复。

元数据

ES 中存储的数据有以下几种:

  • state 元数据信息
  • index Lucene 生成的索引文件
  • translog 事务日志
代码语言:javascript复制
translog:/Data/apps/search/datas/nodes/0/indices/2A1sfea4RQmGiN3duyF5xw/0/translog/translog-1.tlog

元数据信息又有以下几种:

  • nodes/0/_state/*.st 集群层面元信息;
  • nodes/0/indices/{index_ uuid}I_state/*.st 索引层面元信息
  • nodes/0/indices/{index_uuid}/0/_state/*.st ,分片层面元信息。
代码语言:javascript复制
集群层面:/Data/apps/search/datas/nodes/0/_state/global-2231.st  node-2.st
索引层面:/Data/apps/search/datas/nodes/0/indices/2A1sfea4RQmGiN3duyF5xw/_state/state-1.st
分片层面:/Data/apps/search/datas/nodes/0/indices/2A1sfea4RQmGiN3duyF5xw/0/_state/state-1.st

分别对应 ES 中的数据结构:

  • MetaData(集群层)主要是 clusterUUID、settings、templates 等
  • lndexMetaData(索引层)主要是 numberOfShards、mappings 等
  • ShardStateMetaData(分片层)主要是 version、indexUUID、primary 等

上述信息被持久化到磁盘,需要注意的是:持久化的 state 不包括某个分片存在于哪个节点这种内容路由信息,集群完全重启时,依靠gateway的recovery过程重建RoutingTable。 当读取某个文档时,根据路由算法确定目的分片后,从RoutingTable中查找分片位于哪个节点,然后将请求转发到目的节点。

元数据的持久化

只有具备Master资格的节点和数据节点可以持久化集群状态。当收到主节点发布的集群状态时,节点判断元信息是否发生变化,如果发生变化,则将其持久化到磁盘中。

GatewayMetaState类负责接收集群状态,它继承自ClusterStateApplier, 并实现其中的applyClusterState方法,当收到新的集群状态时,ClusterApplierService 通知全部applier 应用该集群状态:

代码语言:javascript复制
private void callClusterStateAppliers (ClusterChangedEvent clusterChangedEvent) {
    //遍历全部的applier,依次调用各模块对集群状态的处理
    clusterStateAppliers.forEach (applier -> {
        try {
            //调用各模块实现的applyClusterState
            applier. applyClusterState (clusterChangedEvent);
        } catch (Exception ex) {
            //某个模块应用集群状态出现异常时打印日志,但应用过程不会终止
            logger .warn("failed to notify ClusterStateApplier", ex);
        });
}

集群状态的发布应用的详细过程请参考 Cluster 模块分。

节点校验本身资格,判断元信是否发生变化,并将其持久化到磁盘中,全局元信息和索引级元信息都来自集群状态。

代码语言:java复制
public void appl applyClusterState(ClusterChangedEvent event) {
    //只有具备Master资格的节点和数据节点才会持久化元数据
    if (state.nodes().getLocalNode().isMasterNode() || state.nodes().getLocalNode().isDataNode()) {
        //检查全局元信息是否发生变化,如果有变化,则将其写入磁盘
        if (previousMetaData == nulll || !MetaData.isGlobalStateEquals(previousMetaData, newMetaData)) {
            metaStateService.writeGlobalState ("changed", newMetaData);
        }
        //将发生变化的元信息写磁盘
        for (IndexMetaWriteinfo indexMetaWrite : writeinfo) { 
            metaStateServ.writeindex(indexMetaWrite.reason, indexMetaWrite.newMetaData);
        }
    }
}

执行文件写入的过程封装在MetaDataStateFormat类中,全局元信息和索引级元信息的写入都执行三个流程:写临时文件、刷盘、“move”成目标文件。

代码语言:javascript复制
//临时文件名
final Path tmpStatePath = stateLocation. resolve (fileName   ". tmp");

//目标文件名
final Path finalStatePath = stateLocation. resolve (fileName);

//写临时文件
OutputStreamIndexOutput out = new OutputStreamIndexOutput (Files.newOutputStream(tmpStatePath), ...);
out.writeInt(format.index());

//从系统cache刷到磁盘中,保证持久化
IOUtils.fsync(tmpStatePath, false); // fsync the state file

//move为目标文件,move操作为系统原子操作
Files.move (tmpStatePath, finalStatePath, StandardCopyOption.ATOMIC_MOVE);

元数据的恢复

上述的三种元数据信息被持久化存储到集群的每个节点,当集群完全重启(full restart)时,由于分布式系统的复杂性,各个节点保存的元数据信息可能不同。此时需要选择正确的元数据作为权威元数据。gateway的recovery负责找到正确的元数据,应用到集群。

当集群完全重启,达到recovery条件时,进入元数据恢复流程,一般情况下,recovery 条件由以下三个配置控制。

  • gateway.expected_nodes 预期的节点数。加入集群的节点数(数据节点或具备 Master 资格的节点)达到这个数量即开始 gateway 恢复。默认为 0;
  • gateway.recover_after_time 如果没有达到预期节点数量,恢复程将等待配置的时间,再尝恢复。默认为 5min;
  • gateway.recover_after_nodes只要配置数量的节点(数据节点或具备 Master 资格的节点)加入群就可以开始恢复;

假设取值10、5min、8,则启动时节点达 10个则即进入recover可;如果一直没达到10个, 5min 超时后如果节点达到8个也进入 recovery。

还有一些更细致配置,原理与上面三个配置类似:

  • gateway.expected_master_nodes:预期的具备 Master 资格的节数,加入集群的具备 Master 资格的节点数达到这个数量后立即开始 gateway 恢复。默认为0;
  • gateway.expected_data_nodes:预期的具备数据节点资格的节点数,加入集群的具备数据节点资格的节点数量达到这个数量后立即开始gatway的恢复。默认为0;
  • gateway.recover_after_master_nodes:指定数量具备 Master 资格的节点加入集群后就可以开始恢复;
  • gateway.recover_after_data_nodes:指定数量数据节点加入集群后就可以开始恢复;

当集群完全启动时,gateway模块负责集群层和索引层的元数据恢复,分片层的元数据恢复在allocation模块实现,但是由gateway模块在执行完上述两个层次恢复工作后触发。

当集群级、索引级元数据选举完毕后,执行submitStateUpdateTask提交一个source为local-gateway-elected-state的任务,触发获取shard级元数据的操作,这个Fetch过程是异步的,根据集群分片数量规模,Fetch过程可能比较长,然后submit任务就结束,gateway流程结束。

因此,三个层次的元数据恢复是由gateway模块和allocation 模块共同完成的,在Gateway将集群级、索引级元数据选举完毕后,在submitStateUpdateTask提交的任务中会执行allocation模块的reroute继续后面的流程。

元数据恢复流程分析

主要实现在GatewayService 类中,它继承自ClusterStateListener,在集群状态发生变化(clusterChanged)时触发,仅由Master节点执行。

  1. Master选举成功之后,判断其持有的集群状态中是否存在STATE_NOT_RECOVERED_BLOCK,如果不存在,则说明元数据已经恢复,跳过gateway恢复过程,否则等待。
  2. Master 从各个具备Master资格的节点主动获取元数据信息。
  3. 从获取的元数据信息中选择版本号最大的作为最新元数据,包括集群级、索引级。
  4. 两者确定之后,调用allocation模块的reroute,对未分配的分片执行分配,主分片分配过程中会异步获取各个shard级别元数据,默认超时为13s。

集群级和索引级元数据信息是根据存储在其中的版本号来选举的,而主分片位于哪个节点却是allocation模块动态计算出来的,先前主分片不一定还被选为新的主分片。关于主分片选举策略我们在allocation一章中介绍。

选举集群级和索引级别的元数据

判断是否满足进入recovery条件:实现位于GatewayService#clusterChanged,执行此流程的线程为clusterApplierService#updateTask

此处省略相关的代码引用。当满足条件时,进入recovery 主要流程:实现位于Gateway#performStateRecovery;执行此流程的线程为generic。

首先向有Master资格的节点发起请求,获取它们存储的元数据:

代码语言:javascript复制
//具有Master资格的节点列表
String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);
//发送获取请求并等待结果
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();

等待回复时,必须收到所有节点的回复,无论回复成功还是失败(节点通信失败异常会被捕获,作为失败处理),此处没有超时。

在收齐的这些回复中,有效元信息的总数必须达到指定数量。异常情况下,例如,某个节点上元信息读取失败,则回复信息中元数据为空。

代码语言:javascript复制
int requiredAllocation = Math.max(1, minimumMasterNodesProvider.get());

minimumMasterNodesProvider的值由下面的配置项决定:discovery.zen.minimum_master_nodes

接下来就是通过版本号选取集群级和索引级元数据。

代码语言:javascript复制
选举集群级元数据: 
public void performStateRecovery(...) {
    //遍历请求的所有节点
    for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNode ()) {
        //根据元信息中记录的版本号选举元信息
        if (electedGlobalState == null) {
            electedGlobalState = nodeState . metaData() ;
        } else if (nodeState.metaData().version() > electedGlobalState.version()) {
            electedGlobalState = nodeState .metaData() ;
        }
    }
}
代码语言:java复制
选举索引级元数据:
public void performStateRecovery(. ..) {
    final Object[] keys = indices.keys;
    //遍历集群的全部索引
    for (int i = 0; i < keys. length; i  ) {
        if (keys[i] != null) {
            Index index = (Index) keys[i];
            IndexMetaData electedIndexMetaData = null;
            //遍历请求的全部节点,对特定索引选择版本号最高的作为该索引元数据
            for (TransportNodesListGatewayMetaState.NodeGa tewayMetaStatenodeState : nodesState.getNodes()) {
                IndexMetaData indexMetaData = nodeState.metaData ().index(index);
                if (electedIndexMetaData == null) {
                    electedIndexMetaData = indexMetaData;
                } else if (indexMetaData.getVersion() > electedIndexMetaData.getVersion()) {
                    electedIndexMetaData = indexMetaData;
                }
            }
        }
    }
}

触发allocation

当上述两个层次的元信息选举完毕,调用clusterService.submitStateUpdateTask 提交一个集群任务,该任务在masterService#updateTask线程池中执行,实现位于GatewayRecoveryListener#onSuccess

主要工作是构建集群状态(ClusterState),其中的内容路由表依赖allocation模块协助完成,调用allocationService.reroute进入下一阶段:异步执行分片层元数据的恢复,以及分片分配。updateTask线程结束。至此,gateway恢复流程结束,集群级和索引级元数据选举完毕,如果存在未分配的主分片,则分片级元数据选举和分片分配正在进行中。

思考

元数据信息是根据版本号选举出来的,而元数据写入成功的条件是“多数”,因此,保证进入recovery的条件为节点数量为“多数”,可以保证集群级和索引级的一致性。

获取各节点存储的元数据,然后根据版本号选举时,仅向具有Master资格的节点获取元数据。

0 人点赞