关于Zookeeper框架Curator中的主从选举框架LeaderLatch的坑

2021-04-12 15:23:00 浏览数 (2)

关于Zookeeper框架Curator中的主从选举框架LeaderLatch的坑

根据网上给的LeaderLatch的示例代码写的业务代码,这里面用到的serverId是从配置中读取的(每个新部署的实例自动生成,之后一直不变)

代码语言:javascript复制
@PostConstruct
public void setUp() throws Exception {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    client = CuratorFrameworkFactory.builder()
            .connectString(zkConnectString)
            .retryPolicy(retryPolicy)
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(3000)
            .namespace(Constants.GLOBAL_NAME_SPACE)
            .build();
    client.start();

    leaderLatch = new LeaderLatch(client, QUORUM_PATH, serverId);
    leaderLatch.addListener(new LeaderLatchListener() {
        @Override
        public void isLeader() {
            log.info("Currently run as leader");
        }

        @Override
        public void notLeader() {
            log.info("Currently run as slave");
        }
    });
    leaderLatch.start();
}

这段代码达到的预期效果应该是当前实例在运行为leader的时候,日志打印Currently run as leader;当丢失leader的时候,日志打印Currently run as leader。 多实例运行时,刚开始选主是没问题的,只有一个为leader。但是丢失主的实例不能切换Slave方式运行;我这里测试丢失主的方式有两种,一是断开实例与zk之间的连接,二是删除zk上面的该实例锁住的数据(例如leaderlatch路径为/test,那么每个实例会在/test这个路径下生成一个临时节点,将这个临时节点手动删除)。 而且这么写代码,很难做主从切换缓冲时间来防止主从来回切换。

所以,将代码中的listener去掉,修改为主动轮询监听:

代码语言:javascript复制
@PostConstruct
public void setUp() throws Exception {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    client = CuratorFrameworkFactory.builder()
            .connectString(zkConnectString)
            .retryPolicy(retryPolicy)
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(3000)
            .namespace(Constants.GLOBAL_NAME_SPACE)
            .build();
    client.start();

    leaderLatch = new LeaderLatch(client, QUORUM_PATH, serverId);
    leaderLatch.start();
}
@Scheduled(fixedDelay = 3000)
public void checkLeader() throws Exception {
    //首先利用serverId检查自己是否还存在于leaderlatch选举结果集中
    //考虑网络阻塞,zk数据异常丢失等情况
    boolean isExist = false;
    Collection participants = leaderLatch.getParticipants();
    for (Participant participant : participants) {
        if (serverId.equals(participant.getId())) {
            isExist = true;
            break;
        }
    }
    //如果不存在,则重新加入选举
    if (!isExist) {
        log.info("Current server does not exist on zk, reset leaderlatch");
        leaderLatch.close();
        leaderLatch = new LeaderLatch(client, QUORUM_PATH, serverId);
        leaderLatch.start();
        log.info("Successfully reset leaderlatch");
    }

    //查看当前leader是否是自己
    //注意,不能用leaderLatch.hasLeadership()因为有zk数据丢失的不确定性
    //利用serverId对比确认是否主为自己
    Participant leader = leaderLatch.getLeader();
    boolean hashLeaderShip = serverId.equals(leader.getId());

    if (log.isInfoEnabled()) {
        log.info("Current Participant: {}", JSON.toJSONString(participants));
        log.info("Current Leader: {}", leader);
    }

    //主从切换缓冲
    if(hashLeaderShip) {
        isLeaderCount  ;
        isSlaveCount = 0;
    } else {
        isLeaderCount = 0;
        isSlaveCount   ;
    }

    if (isLeaderCount > 3 && !isLeader) {
        log.info("Currently run as leader");
    }

    if (isSlaveCount > 3 && isLeader) {
        log.info("Currently run as slave");
    }
}

0 人点赞