ElasticSearch 7.x之前选主流程

2021-08-18 18:08:51 浏览数 (1)

简介

节点是一个ES的实例,其本质就是一个java进程。一个机器上可以运行多个ES进程,但是生产环境一般建议一台机器上就运行一个ES实例。每一个节点在启动之后,会分配一个UID,保存在data目录下。

节点名称

解释

配置

Master Node

主节点:处理创建、删除索引|等请求;决定分片被分配到哪个Data Node;维护并且更新Cluster State

node.master: true

Master Eligible Node

Eligible [ˈelɪdʒəbl] 合格的,合适的。具备成为Master资格的节点

ndoe.master: true

Data Node

数据节点:节点启动后,默认就是数据节点。

node.data: false

Coordinating Node

协调节点:所有节点默认都是Coordinating Node。路由请求到正确的节点,例如创建索引的请求,需要路由到Master节点。

Ingest Node

预处理节点:从5.0引入的概念。通过定义一系列的processors处理器和pipeline管道,对数据进行某种转换、富化。

node.ingest: true

ES集群中的每个节点都会存储Cluster State,知道索引内各分片所在的节点位置,因此在整个集群中的任意节点都可以知道一条数据该往哪个节点分片上存储。反之也知道该去哪个分片读。所以,Elasticsearch 不需要将读写请求发送到Master节点,任何节点都可以作为数据读写的切入点对请求进行响应。这样进一步减轻了Master节点的网络压力,同时提高了集群的整体路由性能。

代码语言:javascript复制
参数minimum_master_nodes防止脑裂、防止数据丢失的极其重要的参数:
discovery.zen.minimum_master_nodes = (master_eligible_ nodes)/2 1

这个参数会用于至少以下多个重要时机的判断:

  • 触发选主:进入选举临时的Master之前,参选的节点数需要达到法定人数;
  • 投票确认-Master: 选出临时的Master之后,得票数需要达到法定人数,才确认选主成功;
  • gateway选举元信息:向有Master资格的节点发起请求,获取元数据,获取的响应数量必须达到法定人数,也就是参与元信息选举的节点数;
  • Master发布集群状态:成功向节点发布集群状态信息的数量要达到法定人数;
  • NodesFaultDetection事件中是否触发rejoin:当发现有节点连不上时,会执行removeNode。 接着审视此时的法定人数是否达标(discovery.zen.minimum_ master_ nodes),不达标就主动放弃Master 身份执行rejoin以避免脑裂;
  • Master扩容场景:目前有3个master_eligible_ nodes,可以配置法定数量为2。如果将master_eligible_nodes 扩容到4个,那么法定数量就要提高到3。此时需要先把discovery.zen.minimum_master_nodes 配置设置为3,再扩容Master节点。这个配置可以动态设置:
代码语言:javascript复制
PUT /_ cluster/settings
{
    persistent": {
         "discovery.zen.minimum_master_nodes": 3
    }
}
  • Master减容场景:缩容与扩容是完全相反的流程,需要先缩减Master节点,再把法定数降低;

注意:最新版本ES7已经移除minimum_master_nodes配置,让Elasticsearch自己选择可以形成仲裁的节点。

Bully算法

bully  [ˈbʊli] Leader选举的基本算法之一。 它假定所有节点都有一个惟一的ID,该ID对节点进行排序。 任何时候的当前Leader都是参与集群的最高id节点。 该算法的优点是易于实现,但是当拥有最大 id 的节点处于不稳定状态的场景下会有问题,例如 Master 负载过重而假死,集群拥有第二大id 的节点被选为新主,这时原来的 Master 恢复,再次被选为新主,然后又假死…

Elasticsearch 通过推迟选举直到当前的 Master 失效来解决上述问题,但是容易产生脑裂,再通过法定得票人数过半解决脑裂。

假死

master 可能因负载过重而处于不稳定的状态,可能无法响应某些节点的请求,但短时间内可以恢复正常,为了避免频繁的选举,ES 中使用了推迟选举的方法,直到 master 失效才进行选举。当节点收不到 master 的响应时会先请求其他节点,获取活跃 master 的列表,确定 master 挂掉后再发起选举。

脑裂

Elasticsearch 采用了设置 “法定得票人数过半” 解决,在选举过程中当节点得票达到 discovery.zen.minimum_master_nodes 的值时才能成为 master,这个值通常设定为(具有master资格节点数 / 2) 1

选主流程

触发选举条件:

  • 集群启动
  • Master失效:非Master节点运行的MasterFaultDetection检测到Master失效,执行rejoin操作,重新选主。即使一个节点认为Master失效也会进入选主流程

ZenDiscovery流程概述:

  1. 每个节点计算最小的已知节点ID, 并向该节点发送领导投票;
  2. 如果一个节点收到足够多的票数,并且该节点也为自己投票,那么它将扮演领导者的角色,开始发布集群状态;
  3. 所有节点都会参数选举,并参与投票,但是只有有资格成为master的节点的投票才有效;

ZenDiscovery流程概括:

  1. 选举临时Master
  2. 投票确认Master:如果本节点当选,等待确立Master;如果其他节点当选,等待加入Master
  3. 启动节点失效探测器

选举临时Master

  • "ping"所有节点,获取节点列表fullPingResponses, ping结果不包含本节点,把本节点单独添加到fullPingResponses中;
  • 构建两个列表:
    • activeMasters 列表:存储集群当前活跃Master列表。遍历第一步获取的所有节点,将每个节点所认为的当前Master节点加入activeMasters列表中(不包括本节点)。在遍历过程中,如果配置了discovery. zen. master_ election. ignore_ non_ master_pings: true (默认为 false),而节点又不具备Master资格,则跳过该节点。
    • masterCandidates 列表:存储master候选者列表。遍历第-步获取列表,去掉不具备Master资格的节点,添加到这个列表中。

这个过程是将集群当前已存在的Master加入activeMasters列表,正常情况下只有一个。如果集群已存在Master,则每个节点都记录了当前Master是哪个,考虑到异常情况下,可能各个节点看到的当前Master不同,这种场景可能会在节点之间网络延迟比较大的情况下出现,在启动进程之后,出现选主结果不一致的情况。在构建activeMasters列表过程中,如果节点不具备Master资格,则可以通过ignore_ non_ master_ pings选项忽略它认为的那个Master。(过滤掉没有)

  • 如果activeMasters 为空,则从masterCandidates中选举,结果可能选举成功,也可能选举失败。如果不为空,则从activeMasters中选择最合适的作为Master。
代码语言:javascript复制
//ZenDiscovery.findMaster 函数
private DiscoveryNode findMaster() {
    
    // 1. PING 所有节点,获取各节点保存的集群信息
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    
    // 2. 由于上面是获取的其他节点的信息,这里需要将本节点加上
    final DiscoveryNode localNode = transportService.getLocalNode();    
    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));    
    
    // 3. 若设置了 master_election_ignore_non_masters 则去掉没有 master 资格(node.master: false)的节点
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
    
    // 4. 将各节点认为的 master 加入 activeMasters 列表
    List<DiscoveryNode> activeMasters = new ArrayList<>();    
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // 避免未经其他节点检查就将本节点选为 master
        if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {           
            activeMasters.add(pingResponse.master());        
        }    
    }  
      
    // 5. 将 PING 到的具有 master 资格的节点加入 masterCandidates 列表作为候选节点
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();    
    for (ZenPing.PingResponse pingResponse : pingResponses) {        
        if (pingResponse.node().isMasterNode()) { // 只选取具有称为Master资格的节点           
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));        
        }    
    }
    
    if (activeMasters.isEmpty()) {
        // 6. 没有活跃的 master
        if (electMaster.hasEnoughCandidates(masterCandidates)) { // 判断候选节点是否符合法定节点数
            // 7. 拥有足够的候选节点,则进行选举
            final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);                   
            return winner.getNode();        
        } else {
            // 8. 无法选举,无法得到 master,返回 null
            return null;        
        }    
    } else {
        assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";        
        // 9. 有活跃的 master,从 activeMasters 中选择
        return electMaster.tieBreakActiveMasters(activeMasters);    
    }
}

补充方法:
// 判断候选节点是否符合法定节点数
public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
   if (candidates.isEmpty()) {
       return false;
   }
   if (minimumMasterNodes < 1) {
       return true;
   }
   assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
            "duplicates ahead: "   candidates;
   return candidates.size() >= minimumMasterNodes;
}

1.在没有活跃的 master 时使用,上面第 7 步执行 master 的选举,通过 MasterCandidate::compare 对候选节点进行比较
// ElectMasterService.MasterCandidate::compare
public static int compare(MasterCandidate c1, MasterCandidate c2) {
    // 优先选择集群状态版本最新的节点
    int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
    if (ret == 0) {
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}

// ElectMasterService::compareNodes
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
    // 集群状态版本相同时优先选择具有 master 资格的节点
    if (o1.isMasterNode() && !o2.isMasterNode()) {
        return -1;
    }
    if (!o1.isMasterNode() && o2.isMasterNode()) {
        return 1;
    }
    // 集群存在多个 master 节点, 选择 id 较小的当做 master
    return o1.getId().compareTo(o2.getId());
}

2.在有活跃的 master 时,上面第 9 步通过 ElectMasterService::tieBreakAcitveMasters 使用 ElectMasterService::compareNodes 的规则从 activeMasters 中选择

投票确认Master

选举出的临时Master有两种情况:该临时Master是本节点或非本节点

  • 如果临时Master是本节点:
    • 等待足够多的具备Master资格的节点加入本节点(投票达到法定人数),以完成选举;超时(默认为30秒, 可配置)后还没有满足数量的join请求, 则选举失败,需要进行新一轮选举;
    • 成功后发布新的ClusterState;
  • 如果其他节点被选为临时Master:
    • 不再接受其他节点的join请求;
    • 向临时Master发送join 请求(投票),并等待回复。超时时间默认为1分钟 (可配置) ,如果遇到异常,则默认重试3次(可配置);
    • 最终当选的Master会先发布集群状态,才确认其他节点的join请求,并且已经收到了集群状态。本步骤检查收到的集群状态中的Master 节点如果为空,或者当选的Master不是之前选择的节点,则重新选举;
代码语言:javascript复制
// ZenDiscovery.innerJoinCluster函数
private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    nodeJoinController.startElectionContext();
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster();
    }

    if (!joinThreadControl.joinThreadActive(currentThread)) {
        logger.trace("thread is no longer in currentJoinThread. Stopping.");
        return;
    }

    // 选出的临时 master 是本节点,则等待被选举为真正的 master
    if (transportService.getLocalNode().equals(masterNode)) {
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                new NodeJoinController.ElectionCallback() {
                    @Override
                    public void onElectedAsMaster(ClusterState state) {
                        // 成功被选举为 master
                        synchronized (stateMutex) {
                            joinThreadControl.markThreadAsDone(currentThread);
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        logger.trace("failed while waiting for nodes to join, rejoining", t);
                        // 等待超时,重新开始选举流程
                        synchronized (stateMutex) {
                            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        }
                    }
                }

        );
    } else {
        // process any incoming joins (they will fail because we are not the master)
        // 选出的临时 master 不是本节点,不再接收其他节点的 join 请求
        nodeJoinController.stopElectionContext(masterNode   " elected");

        // send join request
        // 向临时节点发送join请求(投票),被选举的临时master在确认成为master并发布新的集群状态后才会返回
        final boolean success = joinElectedMaster(masterNode);
        
        // 成功加入之前选择的临时 master 节点,则结束线程,否则重新选举
        synchronized (stateMutex) {
            if (success) {
                DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                if (currentMasterNode == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                } else if (currentMasterNode.equals(masterNode) == false) {
                    // update cluster state
                    joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                }

                joinThreadControl.markThreadAsDone(currentThread);
            } else {
                // failed to join. Try again...
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        }
    }
}


NodeJoinController::checkPendingJoinsAndElectIfNeeded在节点获得足够的得票时使节点成为 master,并发布新的集群状态

代码语言:javascript复制
private synchronized void checkPendingJoinsAndElectIfNeeded() {
    // 计算节点得票数
    final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
    if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
        ...
    } else {
        // 得票数足够,成为 master
        electionContext.closeAndBecomeMaster();
    }
}

public synchronized int getPendingMasterJoinsCount() {
    int pendingMasterJoins = 0;
    // 统计节点得票数,只计算拥有 master 资格节点的投票
    for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
        if (node.isMasterNode()) {
            pendingMasterJoins  ;
        }
    }
    return pendingMasterJoins;
}

节点失效探测

选主流程已执行完毕,Master身份已确认,非Master节点已加入集群。节点失效检测会监控节点是否离线,然后处理其中的异常。失效检测是选主流程之后不可或缺的步骤,不执行失效检测可能会产生脑裂(双主或多主)。

我们需要启动两种失效探测器:

  • 在Master节点,启动NodesFaultDetection, 简称NodesFD。定期探测加入集群的节点是否活跃。
  • 在非Master节点,启动MasterFaultDetection, 简称MasterFD。 定期探测Master节点是否活跃。

NodesFaultDetection和MasterFaultDetection都是通过定期(默认为1秒) 发送的ping请求探测节点是否正常的,当失败达到一定次数(默认为3次),或者收到来自底层连接模块的节点离线通知时,开始处理节点离开事件。

0 人点赞