【腾讯云ES】Elasticsearch 分布式架构剖析及扩展性优化

2022-11-29 16:09:06 浏览数 (1)

1. 背景

        Elasticsearch 是一个实时的分布式搜索分析引擎,简称 ES。一个集群由多个节点组成,节点的角色可以根据用户的使用场景自由配置,集群可以以节点为单位自由扩缩容,数据以索引、分片的形式散列在各个节点上。本文介绍 ES 分布式架构基础原理,剖析分布式元数据管理模型,并介绍腾讯云 ES 在分布式扩展性层面相关的优化,解析代码基于 8.5 版本。

2. 分布式架构

        我们首先来看看 ES 的分布式架构。一个 ES 集群包含多个节点,节点的类型有多种,最新版本主要的角色类型包括:

  • master 节点:可设置专属 master 节点,也可以和其它节点角色共享。
  • 数据节点: data、data_hot、data_warm、data_cold、data_frozen 等。
  • 功能节点: ingest、ml、remote_cluster_client、transform 等。

        节点角色分为三类:主节点(master)用于管理集群;数据节点(data)用于存储管理数据,数据节点按照数据特点分为多种类型,冷温热等等;功能节点,包括 ETL、机器学习、远程管理等等。

下面我们拿一种典型的大规模集群场景架构来举例说明。

ES 分布式架构ES 分布式架构

        上图所示的分布式架构中,上面部分是专属 master 节点,负责管理集群的元数据,他们之间通过基于类 Raft 的分布式一致性协议进行选主、元数据同步。下面部分是专属 data 节点,data 节点上包含索引分片。索引对应传统关系数据库的表,是逻辑概念,一个索引包含多个分片,分片是节点上的数据存储单元,它按照主键 hash 或用户自定义数据路由(routing)均匀分布到各个节点。分片一般包含一主(primary)、多从(replica)分片,一个索引主从分片之间的数据复制模型基于微软提出的 PacificA 协议。

3. 读写模型

3.1 写入模型

        ES 的任意节点可作为写入请求的协调节点,接收用户请求,协调节点先将写入请求 hash 至分片粒度并先转发对应主分片写入,主分片写入成功再转发至从分片,主从分片均写入完毕经协调节点返回客户端成功。

        腾讯云 ES 内核团队实现了写入分组定向路由、主从分片物理复制能力,可以减少从分片的写入栈计算开销,写入性能提升50% 。

分布式写入模型分布式写入模型

        本文不详细解析写入过程,有兴趣同学可参考写入流程解析:https://cloud.tencent.com/developer/article/1370501

3.2 查询模型

        和写入一样,ES 的任意节点可以作为查询请求的协调节点,请求转发至对应一个或多个(取决于路由规则,不指定路由默认索引所有分片均查询)数据分片的主或者从分片进行查询,查询根据复杂度分不同类型,QUERY_THEN_FETCH(两阶段),QUERY_AND_FETCH(一阶段),各个分片查询结果最后在协调节点汇聚,返回最终结果给客户端。

分布式查询模型分布式查询模型

        本文不详细解析查询过程,有兴趣同学可参考查询流程解析:https://cloud.tencent.com/developer/article/1154813

4. 分布式架构元数据模型分类

        在大规模分布式存储架构中,元数据模型主要分为中心化架构对等架构两类。中心化架构的典型代表包括 HDFS、ES、BigTable 等,对等架构典型的系统包括 Cassandra、Dynamo 等。

4.1 中心化架构

        中心化架构的特点是专有的主节点管理集群元数据,HDFS 中 Namenode 统一管理元数据,data node 不单独维护,客户端找 Namenode 获取路由信息。中心化架构的优势是在大规模集群场景下,元数据同步范围收敛,效率更高。由于主节点提供路由查询信息,因此其主要缺点是主节点易成为瓶颈,一般通过 Federation 机制优化,或者将元数据存储到数据表中分布在部分节点上管理。ES 也采用了中心化架构,稍后我们展开介绍。

中心化架构中心化架构

4.2 对等架构

        对等架构的特点是去中心化,没有独立的主节点。节点之间通过 Gossip 协议传播同步元数据,所有节点保存全量的元数据。这种方式架构简单清晰,没有中心化瓶颈。缺点是通过 Gossip 协议收敛元数据效率偏低,受节点数量限制,扩展性弱。

对等架构对等架构

5. 元数据模型

        前面我们介绍了 ES 的分布式架构基础原理及读写基本流程,也了解了业内常用的分布式架构元数据管理模型。下面我们来看看 ES 是如何管理集群的,其核心元数据模型是如何运作的。

        ES 的元数据由 master 节点维护管理,同时其它节点也维护着全量的元数据,目的是为了确保每个节点都能承担数据路由的能力。元数据包括节点信息、索引信息、分片路由信息、配置信息等等,下面我们先揭开元数据在内存中的神秘面纱,然后再看看元数据是如何持久化的。

5.1 内存结构

        ES 元数据的内存数据结构对象是 ClusterState,代码层面对应的是 ClusterState.java。其主要的成员包含:

成员

类型

描述

version

long

元数据版本。由 master 控制,每次元数据变更加1。

clusterName

String

集群名称。

nodes

DiscoveryNodes

节点信息,包含不同角色的节点。

metaData

MetaData

集群的配置信息。包含 _cluster/settings 设定的动态配置、各个索引配置信息、模板配置信息等。

routingTable

RoutingTable

索引分片到节点的映射关系,是主要的数据路由结构。

routingNodes

RoutingNodes

节点到索引分片的映射关系,主要用于分片分配、均衡决策。各个节点自己维护,每次需要时基于 routingTable 全量构建而来,不会在节点之间发布。

blocks

ClusterBlocks

集群的 block 信息。例如集群整体只读、单个索引只读等信息。

customs

Map

其它自定义的元数据,例如 snapshot 自定义配置、xpack 自定义配置等。

        上面的内存结构中,对象占比最大的是 nodes、metaData、routingTable 和 routingNodes,这四块是元数据的核心组成部分:

元数据核心结构元数据核心结构

        接下来对这四个部分的内部结构进行展开分析。

5.1.1 DiscoveryNodes

DiscoveryNodes 对象描述的是集群的节点信息。里面包含如下主要的成员信息:

代码语言:txt复制
// 当前 master
private final String masterNodeId;
// 本地节点
private final String localNodeId;
// 完整的节点列表
private final ImmutableOpenMap<String, DiscoveryNode> nodes;

// 其它分类型节点列表
private final ImmutableOpenMap<String, DiscoveryNode> dataNodes;
private final ImmutableOpenMap<String, DiscoveryNode> masterNodes;
private final ImmutableOpenMap<String, DiscoveryNode> ingestNodes;

        从变量名称很容易理解,其包含了本地节点、当前 master 节点的 id 信息,以及集群全量节点列表(nodes),并将这个列表分类为不同类型的节点列表。

5.1.2 MetaData

MetaData 包含集群全局的配置信息。主要的成员包括:

代码语言:txt复制
// 集群的动态设定,transient 全量重启后就没有了,persistent 会持久化
private final Settings transientSettings;
private final Settings persistentSettings;

// 索引、模板信息
private final ImmutableOpenMap<String, IndexMetaData> indices;
private final ImmutableOpenMap<String, IndexTemplateMetaData> templates;

// 集群选举管理信息
private final CoordinationMetaData coordinationMetaData;

// raft 选举使用的 term
private final long term;

// 最近提交的选举节点列表,内部是一个 Set<String> nodeIds
private final VotingConfiguration lastCommittedConfiguration;

// 最近接收的选举节点列表
private final VotingConfiguration lastAcceptedConfiguration;

// 用户通过 _cluster/voting_config_exclusions 接口设定的选举排除节点列表
private final Set<VotingConfigExclusion> votingConfigExclusions;

5.1.3 RoutingTable

        RoutingTable 包含主要的数据路由信息,在查询、写入请求的时候提供分片到节点的路由,动态构建不会持久化。我们先来通过一张图了解这个数据路由结构的包含关系:

分片到节点的映射(路由表)分片到节点的映射(路由表)

        一个 RoutingTable 包含集群的所有索引,一个索引包含多个分片,一个分片包含一个主、多个从本分片,最底层的 ShardRouting 描述具体的某个副本分片所在的节点以及正在搬迁的目标节点信息。用户请求时只指定索引信息,请求到达协调节点,由协调节点根据该路由表来获取底层分片所在节点并转发请求。

        接下来看看对应的数据结构,RoutingTable 顶层主要的成员只有一个:

代码语言:txt复制
// key 为索引名,IndexRoutingTable 为该索引包含的分片信息
private final ImmutableOpenMap<String, IndexRoutingTable> indicesRouting;

        IndexRoutingTable 对象:

代码语言:txt复制
// 索引信息,包含索引名称、uuid 等。
private final Index index;

// 索引的分片列表,key 为分片的编号,例如 2 号分片,3 号分片
private final ImmutableOpenIntMap<IndexShardRoutingTable> shards;

        一套分片信息包含一个主、多个从分片,其对象结构 IndexShardRoutingTable:

代码语言:txt复制
// 当前分片信息,主要包含分片的编号、分片对应的索引信息
final ShardId shardId;

// 当前分片的主分片信息
final ShardRouting primary;

// 当前分片的多个从分片列表
final List<ShardRouting> replicas;

// 当前分片全量的分片列表,包含主、副本
final List<ShardRouting> shards;

// 当前分片已经 started 的活跃分片列表
final List<ShardRouting> activeShards;

// 当前分片已经分配了节点的分片列表
final List<ShardRouting> assignedShards;

分片最底层的数据结构是 ShardRouting ,它描述这个分片的状态、节点归属等信息:

代码语言:txt复制
private final ShardId shardId;

// 分片所在的当前节点 id
private final String currentNodeId;

// 如果分片正在搬迁,这里为目标节点 id
private final String relocatingNodeId;

// 是否是主分片
private final boolean primary;

// 分片的状态,UNASSIGNED/INITIALIZING/STARTED/RELOCATING
private final ShardRoutingState state;

// 每一个分片分配后都有一个唯一标识
private final AllocationId allocationId;

5.1.4 RoutingNodes

        该对象为节点到分片的映射关系。主要用于统计每个节点的分片分配情况,在分片分配、均衡的时候使用,需要时根据 RoutingTable 动态构建,不会持久化。

节点到分片的映射节点到分片的映射

RoutingNodes 的主要成员包括:

代码语言:txt复制
// 节点到分片的映射,key 为 nodeId,value 为一个包含分片列表的节点信息
private final Map<String, RoutingNode> nodesToShards = new HashMap<>();

// 未分配的分片列表,每次 reroute 从这里获取分片分配
private final UnassignedShards unassignedShards = new UnassignedShards(this);

// 已经分配的分片列表
private final Map<ShardId, List<ShardRouting>> assignedShards = new HashMap<>();

// 当前分片远程恢复的统计数据 incoming/outcoming
private final Map<String, Recoveries> recoveriesPerNode = new HashMap<>();

上面 RoutingNode 的主要结构包括:

代码语言:txt复制
private final String nodeId;

// 该节点上的分片列表
private final LinkedHashMap<ShardId, ShardRouting> shards; 

// 该节点上的初始化中的分片列表
private final LinkedHashSet<ShardRouting> initializingShards;

// 该节点上的正在搬迁(搬走)的分片列表
private final LinkedHashSet<ShardRouting> relocatingShards;

        前面就是所有元数据在内存中的结构,最后我们借助这张 UML 图来看元数据的整体结构和关系:

元数据类结构元数据类结构

5.2 元数据持久化

        前面介绍的元数据内存结构中,只有部分信息会被持久化存储到磁盘上,其它结构都是在节点启动后动态构建或者直接在内存中动态维护的。持久化的内容包含两部分,索引元数据集群元数据也叫全局元数据,下面我们分别来介绍这两部分持久化的内容。

5.2.1 索引元数据(index metadata)

索引元数据维护的是各个索引独有的配置信息,持久化的内容主要包括:

  • in_sync_allocations:每个分片分配之后都有一个唯一的 allocationId,该列表是主分片用来维护被认为跟自己保持数据一致的副本列表。从这个列表中踢出的分片不会接收新的写入,需要走分片恢复流程将数据追齐之后才能再被放入该队列。
  • mappings:索引的 mapping 信息,定义各个字段的类型、属性。
  • settings:索引自己的配置信息。
  • state:索引状态,OPEN/CLOSED。
  • aliases:索引别名信息。
  • routing_num_shards:索引分片数量。
  • primary_terms:每一轮分片切主都会产生新的 primary term,用于保持主从分片之间的一致性。

5.2.2 全局元数据(global metadata)

        全局元数据持久化的内容主要是前面描述的 MetaData 对象中去除 indices 索引部分。包括动态配置、模板信息、选举信息等。主要包含三部分:

  • manifest-数字.st:该文件是一个磁盘元数据管理入口,主要管理元数据的版本信息。
  • global-数字.st:MetaData 中的全局元数据的主要信息就是持久化到这个文件,包括动态配置、模板信息等。
  • node-数字.st:当前节点的元数据信息,包括节点的 nodeId 和版本信息。

5.2.3 元数据文件分布

        在 7.6.0 版本之前,元数据直接存放在每个节点磁盘上,且在专有 master 节点上每个索引一个元数据目录。在 7.6.0 版本之后,ES 将元数据放到节点本地独立的 lucene 索引中保存。7.6.0 之前持久化的数据目录包含的内容:

代码语言:txt复制
├── indices
│   ├── 00IUbRWZSzGsN71k0TlPmA
│   │   └── _state
│   │   └── state-8.st
│   ├── 01RTGqCbRe-9PYO55j8zAQ
│   │   └── _state
│   │   └── state-10.st
│   ├── 02DSrGNzRzizuI42Yf6aJg
│   │   └── _state
│   │   └── state-17.st

├── node.lock
└── _state
├── global-4928.st
└── node-0.st

        上图中上半部分是每个索引的元数据,它们位于 indices 目录下,一个索引对应一个 uuid 目录,下面有一个 _state 文件夹,索引自己的元数据就放在这个 _state 下面。下半部分就是我们上面描述的全局元数据持久化的文件。

        7.6.0 之后 data/nodes/0 里面保存的是 segment 文件,元数据以本地 Lucene index 方式持久化,收敛文件数量。其中 node-0.st 是旧版升级后的兼容文件:

代码语言:txt复制
├── node.lock
└── _state
├── _3q.cfe
├── _3q.cfs
├── _3q.si
├── _3r.cfe
├── _3r.cfs
├── _3r.si
├── node-0.st
├── segments_6i
└── write.lock

元数据索引同时保存全局、索引元数据,用 type 字段区分,索引包含三个字段:

  • type:cluster 级别元数据 “global”;索引级别元数据 “index”。
  • index_uuid:索引元数据对应索引的 UUID。
  • data:metadata 元数据内容。

同时,在每次提交保存的时候,会存一份 commit user data,包括选举的 term,最新的版本、node id、node version 等信息:

代码语言:txt复制
*  ------------------------------ ----------------------------- ---------------------------------------------- 
* | "type" (string field)        | "index_uuid" (string field) | "data" (stored binary field in SMILE format) |
*  ------------------------------ ----------------------------- ---------------------------------------------- 
* | GLOBAL_TYPE_NAME == "global" | (omitted)                   | Global metadata                              |
* | INDEX_TYPE_NAME  == "index"  | Index UUID                  | Index metadata                               |
*  ------------------------------ ----------------------------- ---------------------------------------------- 
*
* Additionally each commit has the following user data:
*
*  --------------------------- ------------------------- ------------------------------------------------------------------------------- 
* |        Key symbol         |       Key literal       |                                     Value                                     |
*  --------------------------- ------------------------- ------------------------------------------------------------------------------- 
* | CURRENT_TERM_KEY          | "current_term"          | Node's "current" term (≥ last-accepted term and the terms of all sent joins)  |
* | LAST_ACCEPTED_VERSION_KEY | "last_accepted_version" | The cluster state version corresponding with the persisted metadata           |
* | NODE_ID_KEY               | "node_id"               | The (persistent) ID of the node that wrote this metadata                      |
* | NODE_VERSION_KEY          | "node_version"          | The (ID of the) version of the node that wrote this metadata                  |
*  --------------------------- ------------------------- ------------------------------------------------------------------------------- 

6. 元数据管理

        ES 的索引创建、删除、mapping 更新、template 更新、节点加入脱离等 DDL 操作都会涉及到元数据变更。前面我们从内存、持久化层面介绍了 ES 元数据的组成部分,接下来我们看看 ES 是如何对元数据进行管理的。

6.1 元数据初始化

        ES 节点的启动入口在 Node.java 的 start 函数中,里面会初始化各种 service。元数据管理相关的 service 也是在这个环节进行初始化。GatewayMetaState 对象负责元数据的存取,在节点启动过程中会根据节点的类型,确定元数据的存取方式。分为以下几种场景:

  • master 节点。基于 LucenePersistedState 对象同步落盘,每当节点收到新的元数据的时候会马上保存到磁盘。并更新缓存,读取的时候优先读取缓存对象,节点启动的时候缓存会从磁盘加载。
  • data 节点。包括前述 data 前缀的 role,基于 AsyncPersistedState 对象异步落盘,对应异步线程名称是 AsyncLucenePersistedState#updateTask。
  • 其它功能节点。非 master 属性且不保存数据的节点,例如 ingest、ml、transform 等。基于 InMemoryPersistedState 对象直接保存到内存不做持久化。无论是在内存中的还是持久化的元数据对象,它们都对外暴露一个 PersistedState 接口。提供保存最后接收的元数据( setLastAcceptedState),以及获取最后接收的元数据(getLastAcceptedState)。

GatewayMetaState.java 代码片段:

代码语言:txt复制
if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.canContainData(settings)) {
    ......
    if (DiscoveryNode.isMasterNode(settings)) {
        persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
    } else {
        persistedState = new AsyncPersistedState(
            settings,
            transportService.getThreadPool(),
            new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState)
        );
    }        
    ......
    this.persistedState.set(persistedState);
} else {
    ......
    persistedState.set(new InMemoryPersistedState(currentTerm, clusterState));
}

6.2 发布流程

        接下来我们以最典型的 DDL 创建索引操作为例,介绍整个元数据发布流程。ES 的 DDL 操作都是通过元数据变更推导模式实现的,例如创建索引,首先 master 会产生一版带新增索引的元数据,并将该新版元数据发布至各个节点,各个节点和自己上一个持久化的元数据版本进行比对,产生差异化的索引执行创建索引、分片的任务。

下图是一张宏观的索引创建元数据变更流程,方便大家从整体架构上有个初步的了解。

索引创建元数据变更流程索引创建元数据变更流程

        接下来分别从 master、data 节点维度分别看着两阶段的处理流程。下面是两个阶段的各个状态:

代码语言:txt复制
enum PublicationTargetState {
    NOT_STARTED,
    FAILED,
    SENT_PUBLISH_REQUEST, // 已发送 publish 请求
    WAITING_FOR_QUORUM,   // 等待大多数节点响应
    SENT_APPLY_COMMIT,    // 已发送 commit 请求
    APPLIED_COMMIT,       // 元数据应用完毕
}

6.2.1 Master publish 阶段

        用户发起 create index 请求,会先到达 rest 层,由 RestCreateIndexAction 解析请求,并产生 transport 层的请求转发给 master 处理。调用链:

RestCreateIndexAction -> TransportCreateIndexAction -> MetadataCreateIndexService

所有参数解析完毕之后,就进入提交集群元数据变更任务流程:

代码语言:txt复制
private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
    normalizeRequestSetting(request);
    //提交元数据变更任务,索引创建优先级为 URGENT
    submitUnbatchedTask(
        "create-index ["   request.index()   "], cause ["   request.cause()   "]",
        new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) {

            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
                // 分配分片产生新版本的元数据
                return applyCreateIndexRequest(currentState, request, false);
            }

            @Override
            public void onFailure(Exception e) {
                ......
            }
        }
    );
}

        代码中 applyCreateIndexRequest 主要负责索引创建过程中,分配分片产生新版本的元数据,里面涉及复杂的分片分配、均衡策略流程,腾讯云 ES 内核结合单个索引分片数、节点主分片数、节点总分片数、节点存储空间多个维度深度定制优化了均衡策略,彻底解决社区版各个维度负载不均的问题,这里不展开,以后再单独介绍。

        上述任务经过 MasterService.submitStateUpdateTask 包装之后提交给线程池执行:

代码语言:txt复制
public void submitTask(BatchedTask task, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
    tasksPerBatchingKey.compute(task.batchingKey, (k, existingTasks) -> {
        if (existingTasks == null) {
            existingTasks = Collections.synchronizedSet(new LinkedHashSet<>());
        } else {
            assert assertNoDuplicateTasks(task, existingTasks);
        }
        // 按 executor 汇总 task
        existingTasks.add(task);
        return existingTasks;
    });

    if (timeout != null) {
        // 指定超时时间,不指定默认元数据变更 30s 超时
        threadExecutor.execute(task, timeout, () -> onTimeoutInternal(task, timeout));
    } else {
        threadExecutor.execute(task);
    }
}

        元数据任务支持 batch 方式执行,相同类别的任务例如 template 变更任务可以批量变更,参考 MetadataIndexTemplateService.TEMPLATE_TASK_EXECUTOR。创建索引任务不能批量执行。元数据任务执行采用的是单线程多任务按优先级串行执行的模式,线程名称为 masterService#updateTask,上述 threadExecutor 的初始化过程:

代码语言:txt复制
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
    return EsExecutors.newSinglePrioritizing(
        // masterService#updateTask,我们经常在 jstack 堆栈中看到的 master 节点元数据变更线程
        nodeName   "/"   MASTER_UPDATE_THREAD_NAME,
        daemonThreadFactory(nodeName, MASTER_UPDATE_THREAD_NAME),
        threadPool.getThreadContext(),
        threadPool.scheduler(),
        ......
    );
}

元数据变更的优先级:

代码语言:txt复制
public enum Priority {
    IMMEDIATE((byte) 0), // 节点加入、脱离等
    URGENT((byte) 1), // 索引、template 创建等
    HIGH((byte) 2),
    NORMAL((byte) 3),
    LOW((byte) 4),
    LANGUID((byte) 5);
}

元数据变更任务产生了新的元数据之后,就会进入元数据 publish 阶段,调用栈:

MasterService.publishClusterStateUpdate() -> MasterService.publish() -> Coordinator.publish()

核心逻辑在 Coordinator 中处理,该类负责管理元数据的 publish、commit 流程。publish 函数的代码片段:

代码语言:txt复制
@Override
public void publish(
    ClusterStatePublicationEvent clusterStatePublicationEvent,
    ActionListener<Void> publishListener,
    AckListener ackListener
) {
    try {
        synchronized (mutex) {
           ......
            try {
                ......
                // 获取目标发布节点
                final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
                // leader checker 在数据节点上用于追踪 master,和 master 保持心跳
                leaderChecker.setCurrentNodes(publishNodes);
                // 设置当前 leader 需要追踪的 follower 节点,主要是用于心跳保持
                followersChecker.setCurrentNodes(publishNodes);
                // lag 探测器,如果某个节点超过指定时间默认 90s 没有 publish 成功则踢出
                lagDetector.setTrackedNodes(publishNodes);
                // 启动 publish 任务
                publication.start(followersChecker.getFaultyNodes());
            } finally {
                publicationContext.decRef();
            }
        }
    } catch (Exception e) {
       ......
    }
}

依次发送给各节点:

代码语言:txt复制
public void start(Set<DiscoveryNode> faultyNodes) {
    logger.trace("publishing {} to {}", publishRequest, publicationTargets);
    publicationTargets.forEach(PublicationTarget::sendPublishRequest);
}
void sendPublishRequest() {
    Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler());
}

        sendPublishRequest 内部会分全量和增量两种发送方式,在节点脱离之后重新加入或者有版本落后的情况下元数据会全量发送。至此 master 节点的发送流程就结束了,接下来就会在上面的 PublishResponseHandler 中等待多数节点响应后发起 commit 请求。

6.2.2 数据节点接收 publish

        实际处理 publish 的节点不止数据节点,还包括不带数据属性的所有节点,也包括当前 master 节点本身。只是我们为了方便描述,以数据节点处理 publish 逻辑为主。数据节点接收 publish 请求的入口在 PublicationTransportHandler.handleIncomingPublishRequest(),接收的 ClusterState 分为全量和增量两种,用一个 boolean 区分,代码片段:

代码语言:txt复制
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
    StreamInput in = request.bytes().streamInput();
    try {
        ......
        // 全量或增量标记
        if (in.readBoolean()) {
            final ClusterState incomingState;
            ......
            final PublishWithJoinResponse response = acceptState(incomingState);
            // 这里 lastSeenClusterState 的目的就是为了下次接收增量之后应用 diff
            lastSeenClusterState.set(incomingState);
            return response;
        } else {
            final ClusterState lastSeen = lastSeenClusterState.get();
            if (lastSeen == null) {
                throw new IncompatibleClusterStateVersionException("have no local cluster state");
            } else {
                // 基于 lastSeen 应用增量 diff 产生新的完整的 incomingState
                ClusterState incomingState;
                try {
                    final Diff<ClusterState> diff;
                    // Close stream early to release resources used by the de-compression as early as possible
                    try (StreamInput input = in) {
                        diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode());
                    }
                    incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
                } catch (Exception e) {
                    throw e;
                }
                ......
                final PublishWithJoinResponse response = acceptState(incomingState);
                // 本地变量 CAS 替换成全局
                lastSeenClusterState.compareAndSet(lastSeen, incomingState);
                return response;
            }
        }
    } finally {
        IOUtils.close(in);
    }
}

        序列化完毕后进入本地 accept 阶段。上述片段中 acceptState 函数内部先会做元数据的各种校验,比如 term、clusterUUID 等最终更新 lastAcceptedState 并持久化,在元数据未 commit 之前元数据的 metaData 部分的 clusterUUIDCommitted 属性为 false。元数据的标识是通过 clusterUUID 来区分的,判断一个集群的多个节点具备相同的元数据版本可根据此属性,并结合 clusterUUIDCommitted 属性确定元数据是已经 commit 的版本。处理接收请求代码片段:

代码语言:txt复制
public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
    final ClusterState clusterState = publishRequest.getAcceptedState();
    if (clusterState.term() != getCurrentTerm()) {
       ......
    }
    if (clusterState.term() == getLastAcceptedTerm() && clusterState.version() <= getLastAcceptedVersion()) {
       ......
    }
    // 更新 lastAcceptedState
    persistedState.setLastAcceptedState(clusterState);
    return new PublishResponse(clusterState.term(), clusterState.version());
}

6.2.3 Master 节点发起 commit

        在 master 节点发送 publish 请求给各个节点后,会在等待半数以上节点响应才会进入 commit 流程。代码片段:

代码语言:txt复制
public Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) {
    ......
    publishVotes.addVote(sourceNode);
    if (isPublishQuorum(publishVotes)) {
        return Optional.of(new ApplyCommitRequest(localNode, publishResponse.getTerm(), publishResponse.getVersion()));
    }
    return Optional.empty();
}

一旦达到多数节点响应后,master 发起 apply commit 请求:

代码语言:txt复制
@Override
protected void sendApplyCommit(
    DiscoveryNode destination,
    ApplyCommitRequest applyCommit,
    ActionListener<Empty> responseActionListener
) {
    transportService.sendRequest(
        destination,
        COMMIT_STATE_ACTION_NAME,
        applyCommit,
        COMMIT_STATE_REQUEST_OPTIONS,
        new ActionListenerResponseHandler<>(wrapWithMutex(responseActionListener), in -> Empty.INSTANCE, Names.CLUSTER_COORDINATION)
    );
}

ApplyCommitRequest 请求比较简单,仅包含 sourceNode、term、version 三个字段。

6.2.4 数据节点处理 commit

        数据节点处理 commit 请求的入口在 Coordinator.java。一方面标记接收的元数据为 commited 状态,另外进行元数据应用。代码片段:

代码语言:txt复制
private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener<Void> applyListener) {
    synchronized (mutex) {
        // 将我们之前接收到的 acceptedState 的元数据改为 commit 状态
        coordinationState.get().handleCommit(applyCommitRequest);
        ......
        if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
            // master node applies the committed state at the end of the publication process, not here.
            applyListener.onResponse(null);
        } else {
            // 元数据接收完毕,进入应用环节
            clusterApplier.onNewClusterState(applyCommitRequest.toString(), () -> applierState, applyListener.map(r -> {
                onClusterStateApplied();
                return r;
            }));
        }
    }
}

        数据节点 commit 元数据之后,元数据的发布流程就完毕了,之后数据节点进入异步应用环节。异步单线程处理,线程名称是 clusterApplierService#updateTask 我们在很多 jstack 中看到该线程名称即为数据节点的元数据应用线程。

        元数据应用流程核心逻辑在 ClusterApplierService.java 的 applyChanges 函数中,主要是处理一些列元数据变更附属的任务,例如创建、删除索引、template 维护等,另外 master 节点上还会回调一些元数据变更完成后关联的 listener。

代码语言:txt复制
private void applyChanges(ClusterState previousClusterState, ClusterState newClusterState, String source, Recorder stopWatch) {
    ......
    logger.debug("apply cluster state with version {}", newClusterState.version());
    // 进行元数据应用,会按优先级分为 low/normal/high 来应用,例如创建索引属于 high
    callClusterStateAppliers(clusterChangedEvent, stopWatch);

    logger.debug("set locally applied cluster state to version {}", newClusterState.version());
    // 更新本地最新 commited 的 clusterState
    state.set(newClusterState);

    // 这里一般是 master 节点发起元数据 commit 结束后,再回调相应的 listener
    callClusterStateListeners(clusterChangedEvent, stopWatch);
}

        我们主要以创建索引来介绍元数据变更主流程,上述 callClusterStateAppliers apply 元数据的环节会进到 IndicesClusterStateService.java applyClusterState 函数,该函数有一连串应用最新 state 的流程:

代码语言:txt复制
@Override
public synchronized void applyClusterState(final ClusterChangedEvent event) {
    // 最新 commit 的元数据
    final ClusterState state = event.state();
    ......
    // 删除索引、分片,清理磁盘分片目录
    deleteIndices(event); // also deletes shards of deleted indices

    // 删除索引、分片,只是清理内存对象,主要是针对 Close/Open 操作
    removeIndicesAndShards(event); // also removes shards of removed indices

    // 更新索引 settings、mapping 等
    updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache

    // 创建索引、分片
    createIndicesAndUpdateShards(state);
}

        createIndicesAndUpdateShards 主要处理索引创建任务,根据接收到的元数据,对比内存最新的索引列表,找出需要创建的索引列表进行创建,同时创建缺失的分片信息:

代码语言:txt复制
private void createIndicesAndUpdateShards(final ClusterState state) {
    DiscoveryNodes nodes = state.nodes();
    // 通过节点到分片的映射,获取属于该节点的索引分片信息
    RoutingNode localRoutingNode = state.getRoutingNodes().node(nodes.getLocalNodeId());
  
    // 对比接收到的元数据和本节点内存的索引列表,找出新增的需要创建的索引
    final Map<Index, List<ShardRouting>> indicesToCreate = new HashMap<>();
    for (ShardRouting shardRouting : localRoutingNode) {
        ShardId shardId = shardRouting.shardId();
        if (failedShardsCache.containsKey(shardId) == false) {
            final Index index = shardRouting.index();
            final var indexService = indicesService.indexService(index);
            if (indexService == null) {
                // 不存在的索引就是需要创建的
                indicesToCreate.computeIfAbsent(index, k -> new ArrayList<>()).add(shardRouting);
            } else {
                // 索引存在看看有没需要创建或更新的分片
                createOrUpdateShard(state, nodes, routingTable, shardRouting, indexService);
            }
        }
    }

    // 过滤出来的待创建索引列表,遍历依次创建
    for (Map.Entry<Index, List<ShardRouting>> entry : indicesToCreate.entrySet()) {
        final Index index = entry.getKey();
        final IndexMetadata indexMetadata = state.metadata().index(index);
        logger.debug("[{}] creating index", index);

        try {
            // 创建索引
            indicesService.createIndex(indexMetadata, buildInIndexListener, true);
        } catch (Exception e) {
            ......
        }
        // 创建索引对应的本地分片
        for (ShardRouting shardRouting : entry.getValue()) {
            createOrUpdateShard(state, nodes, routingTable, shardRouting, indexService);
        }
    }
}

        至此,一个完整的元数据变更流程就介绍完了。总体来看,master 经过两阶段提交元数据后,进入元数据应用流程,各个节点对比自己本地的信息和接收的元数据,根据差异处理相关流程。

7. 扩展性优化

        前面我们介绍了 ES 元数据管理模型,接下来我们结合元数据管理模型看看 ES 分布式架构存在的扩展性瓶颈及优化措施。

7.1 扩展性瓶颈

        社区版本建议控制整个集群分片数在 3 万以下,节点数不超过100,超过之后在创建、删除索引、维护 mapping、template 等元数据变更操作时可能出现较严重的卡顿。例如 ES 在写入触发创建的场景,大批量 bulk 请求在索引创建完毕之前会堆积内存,节点有被打垮的风险。其次,因为单集群支持的分片数、节点数有限,导致用户大规模数据场景下,需要建大量的小规格集群满足业务需求,从而导致集群碎片化资源严重,整体 TCO 偏高。从前面元数据整体的变更流程中我们总结出如下主要的瓶颈:

  • master 构建新元数据,例如节点分片分配、均衡策略时,会基于 RoutingTable 全量构建 RoutingNodes。
  • master 节点元数据序列化,全量比对新旧元数据,构建 diff 并序列化。
  • data 节点元数据 diff 推导,基于 RoutingTable 全量构建 RoutingNodes,多次全量遍历本节点分片和 diff 比对。
  • 单个任务多次元数据变更,例如创建一个索引,会先创建主分片再创建从分片,导致单任务多次元数据变更
  • 路由全节点分发,单次元数据变更全节点分发,GC、网络抖动等长尾节点影响变更。
  • 部分场景元数据同步落盘,HDD 场景性能影响严重。

       总结来说,分片到节点的映射(RoutingTable)和节点到分片的映射(RoutingNodes)两者之间的全量构建,以及新旧元数据全量对比、全节点分发、同步落盘等是影响扩展性的主要瓶颈。

7.2 优化措施

        结合前面分析的瓶颈点,优化措施主要包括映射增量维护、收敛元数据变更范围、重启性能优化等方向。

7.2.1 映射增量维护

        RoutingTable 和 RoutingNodes 全量构建的问题,在分片总数达到 5 万以上时,多处这种相互构建的性能瓶颈明显,我们可以采用两者增量维护的方式,避免全量相互构建:

映射增量维护映射增量维护

        元数据发送前序列化阶段也不需要全量对比,因为我们上面已经维护了一个 diff 的增量,可以省去一些全量对比的环节,直接将产生好的 diff 发送给数据节点即可。数据节点上的元数据推导也不需要再进行全量对比,直接拿增量的 diff 进行应用。对于部分持久化存在同步刷盘的情况改为异步刷盘,缓解 HDD 盘场景的性能瓶颈。整体的优化方向是流程增量、异步化:

元数据变更增量、异步化改造元数据变更增量、异步化改造

7.2.2 收敛变更范围

        对于元数据发布全节点的瓶颈,在节点数多了例如数百个之后,容易因个别节点的抖动受影响。我们可以控制元数据发布的范围,只发生在 master 节点之间,索引、分片的创建采用定向节点下发的方式避免请求扇出严重,而数据节点可以通过学习的方式动态构建元数据。

收敛元数据变更范围收敛元数据变更范围

7.2.3 重启性能优化

        在集群的总分片数和节点数达到一定规模后,重启恢复的性能会成为瓶颈,时间会比较长,也是我们需要主要优化的方向。因为重启会涉及到大量的分片分配、恢复,主要的优化思路包括:

  • 排序粗化:分片恢复有优先级,按优先级排序时,我们可以从分片维度优化到索引维度。上面就是整体的扩展性优化的主要方向,还有其它方面的优化,例如统计接口层面的优化,主要思路是构建缓存、低频的方式提升接口性能。
  • 批量 fetch:在恢复之前,master 会先获取所有数据节点上分片最新的状态,可以从单分片请求优化为按节点批量获取。
  • 遍历裁剪:在分片恢复过程中,每分配一批(并发恢复分片数控制)分片,都需要全量遍历所有分片,遍历过程中可以根据并发控制的分片数对遍历进行高效裁剪,去掉不需要分配的无用分片的遍历过程。

7.2.4 优化效果

        在经过一系列全方位扩展性瓶颈优化之后,我们将集群的分片数扩展至百万级,节点数扩展至数百上千级。于此同时,我们也将部分热点瓶颈优化反馈给了社区。附部分已合并 PR:

  • https://github.com/elastic/elasticsearch/pull/87723
  • https://github.com/elastic/elasticsearch/pull/64753
  • https://github.com/elastic/elasticsearch/pull/46520
  • https://github.com/elastic/elasticsearch/pull/65045
  • https://github.com/elastic/elasticsearch/pull/56870
  • https://github.com/elastic/elasticsearch/pull/65172
  • https://github.com/elastic/elasticsearch/pull/60564

8 总结

        本文通过介绍 ES 分布式架构,对比业界主流的分布式元数据管理模式,之后剖析了 ES 核心元数据管理模型。最后结合集群扩展性的瓶颈介绍了腾讯在 ES 扩展性方面所做的相关优化,现阶段重点解决了元数据的扩展性,节点的扩展性后续还需持续优化以支持更大规模,目前腾讯云 ES 内核已能满足绝大部分用户的扩展性需求。

参考文献

  • https://www.elastic.co/guide/en/elasticsearch/guide/index.html
  • https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
  • https://cassandra.apache.org/_/cassandra-basics.html
  • https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/bigtable-osdi06.pdf

0 人点赞