利用Zookeeper实现集群选举

2023-10-16 15:13:19 浏览数 (2)

 什么是Zookeeper

分布式开源协调系统,数据模型简单,可以实现同步,配置管理,分组管理,分命名空间管理等。

技术本质

一个原子消息传递系统,它使所有服务器保持同步

FLP(3个科学家名字命名) 理论角度,Zookeeper是 SF的 ,使用超时机制来保证Livessnes

扩展FLP:

基于消息的异步通信系统,即使只有一个进程失败,也没有一个确定性算法,保证存活节点达到一致

确定性算法: 给定一个输入,一定会产生相同的输出

基于消息的异步通信系统 : 没有统一时钟,不能时间同步,不能使用超时,不能探测失败,消息可以任意延迟,消息可以乱序

扩展FLP不可能三角

Safety 系统中非故障节点达成了一致和合法的共识

Liveness 系统中非故障节点在有效时间能够达成共识

Fault Tolerance 协议必须在节点故障的时候同样有效

CAP理论角度 zookeeper 是CP的:

当leader节点down,剩余节点则会发起选举,选举期间导致短暂不可用。保证了强一致性而无法保证高可用性

数据模型

Watches监控节点变化

Data Access,Znode数据原子读写

Ephemeral Nodes临时节点,生命周期,与创建节点的进程一样

Sequence Nodes 创建的节点有序

client开发步骤

设计 path

选择znode类型

设计znode存储的内容

设计Watch  client 关注什么事件,事件发生后如何处理

集群选举算法 - 最小节点获胜

算法说明

当 Leader 节点挂掉的时候,持有最小编号 znode 的集群节点成为新的 Leader

设计path

集群公用父节点,例如:/examples/leader

设计Znode类型

所有服务,需要在父节点下创建znode, 由于持有最小编号 znode是leader,选择用ephemeral_sequential 类型 znode

设计Znode存储的内容

一般服务部署在多台机子上,这里我们可以按需定制,一般会存储ip

设计Watch

节点启动或者重连后,在 parent 目录下创建自己的 ephemeral_sequntial znode

如果自己的 znode 编号是最小的,则成为Leader,否则 watch parent 目录;

当 parent 目录有节点删除的时候,首先判断其是否是 Leader 节点,然后再看其编号是否正好比自己小1,如果是则自己成为 Leader,如果不是继续 watch。

Curator Framework实现

Curator Framework简化zookeeper,原始api的工具

内部实现了: LeaderSelector 最小节点获胜

pom如下:

代码语言:javascript复制
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.1.0</version>
</dependency>

实例代码:

代码语言:javascript复制
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.*;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 从Curator Framework 的sample中fork
 */
public class LeaderSelectClient extends LeaderSelectorListenerAdapter
 implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectClient( 
               CuratorFramework client,
               String path,
               String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        // 重新选举时自动入队
        leaderSelector.autoRequeue();
    }
    public void start() throws IOException {
        // 调用这个方法开启后台选举
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }
    @Override
    public void takeLeadership(CuratorFramework client)
     throws Exception {
        // 如果,不想开启下次选举,这个方法应该一致 不返回
        final int waitSeconds = (int) (5 * Math.random())   1;
        long sessionId = client.getZookeeperClient()
                          .getZooKeeper().getSessionId();
        System.out.println(name   " is now the leader. Waiting "
              waitSeconds   " seconds...");
        System.out.println(name   " has been leader " 
           leaderCount.getAndIncrement() 
           " time(s) before.sessionId:"  sessionId);
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
            while(true);
        } catch (InterruptedException e) {
            System.err.println(name   " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name   " relinquishing leadership.n");
        }
    }
}

测试代码:

代码语言:javascript复制
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;

public class LeaderSelectorExample {
    private static final int CLIENT_QTY = 10;

    private static final String PATH = "/examples/leader";

    public static void main( String[] args ) throws Exception {

        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectClient> examples = Lists.newArrayList();

        try {
            for ( int i = 0; i < CLIENT_QTY;   i ) {
                CuratorFramework client = CuratorFrameworkFactory.newClient( 
                          "127.0.01:2181",
                           new ExponentialBackoffRetry( 1000, 3 ) 
                );
                LeaderSelectClient example = 
                           new LeaderSelectClient(
                               client,
                               PATH,
                               "Client #"   i );
                clients.add( client );
                examples.add( example );
                client.start();
                example.start();
            }

            System.out.println( "Press enter/return to quitn" );
            new BufferedReader( new InputStreamReader( System.in ) )
                .readLine();
        } finally {
            System.out.println( "Shutting down..." );
            examples.forEach( CloseableUtils::closeQuietly );
            clients.forEach( CloseableUtils::closeQuietly );
        }
    }
}

运行结果

sessionId: 72059991158751452

最小节点是:

_c_cdbf26bd-2b3f-4c6e-9386-a9b258c5f0c9-lock-0000000234,

代码语言:javascript复制
stat /examples/leader/_c_cdbf26bd-2b3f-4c6e-9386-a9b258c5f0c9-lock-0000000234

ephemeralOwner对应十进制为: 72059991158751452 与之前的sessionId一致

zonode 存储的是IP

get  /examples/leader/_c_cdbf26bd-2b3f-4c6e-9386-a9b258c5f0c9-lock-0000000234

查看到的是 ip

0 人点赞