进阶分布式系统架构系列(十四):Zookeeper 开源客户端工具

2023-09-09 09:59:05 浏览数 (1)

前面介绍了 Zookeeper 集群 ZAB 协议配置中心注册中心数据与存储会话与事务管理分布式锁等相关的知识点,今天我将详细的为大家介绍 zookeeper 开源客户端相关知识,希望大家能够从中收获多多!如有帮助,请点在看、转发支持一波!!!

Zookeeper 开源客户端 ZkClient

ZkClient是Github上⼀个开源的zookeeper客户端,在Zookeeper原生API接口之上进行了包装,是⼀个更易用的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能。

接下来,还是从创建会话、创建节点、读取数据、更新数据、删除节点等方面来介绍如何使用zkClient 这个zookeeper客户端。

添加依赖

在pom.xml⽂件中添加如下内容。

代码语言:javascript复制
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.2</version>
</dependency>
创建会话

使⽤ZkClient可以轻松的创建会话,连接到服务端。

代码语言:javascript复制
package com.lagou.zkClient;
 
import org.I0Itec.zkclient.ZkClient;
 
public class CreateSession {
    /*
        借助zkClient完成会话创建
     */
    public static void main(String[] args) {
        /**
         * 创建一个zkClient实例就可以完成连接,完成会话的创建
         * serverString : 服务器连接地址
         * 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
         */
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        System.out.println("会话被创建了...");
    }
}
创建节点

ZkClient提供了递归创建节点的接口,即其帮助开发者先完成父节点的创建,再创建子节点。

代码语言:javascript复制
package com.lagou.zkClient;
 
import org.I0Itec.zkclient.ZkClient;
 
public class CreateNote {
    /*
        借助zkClient完成会话创建
     */
    public static void main(String[] args) {
        /**
         * 创建一个zkClient实例就可以完成连接,完成会话的创建
         * serverString : 服务器连接地址
         * 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
         */
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        System.out.println("会话被创建了...");
 
        // 创建节点
        /**
         *  createParents:是否要创建父节点,如果值为true,则就会递归创建节点
         */
        zkClient.createPersistent("/lg-zkClient/lg-c1", true);
        System.out.println("节点递归创建完成");
    }
}

值得注意的是,在原生态接口中是无法创建成功的(⽗节点不存在),但是通过ZkClient通过设置createParents参数为true可以递归的先创建父节点,再创建子节点。

删除节点

ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点。

代码语言:javascript复制
package com.lagou.zkClient;
 
import org.I0Itec.zkclient.ZkClient;
 
public class DeleteNote {
    /*
        借助zkClient完成会话创建
     */
    public static void main(String[] args) {
        /**
         * 创建一个zkClient实例就可以完成连接,完成会话的创建
         * serverString : 服务器连接地址
         * 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
         */
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        System.out.println("会话被创建了...");
 
        // 递归删除子节点
        String path = "/lg-zkClient/lg-c1";
        zkClient.createPersistent(path "/c11");
        zkClient.deleteRecursive(path);
        System.out.println("递归删除成功");
    }
}

结果表明ZkClient可直接删除带子节点的⽗节点,因为其底层先删除其所有子节点,然后再删除父节点。

获取子节点
代码语言:javascript复制
package com.lagou.zkClient;
 
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
 
import java.util.List;
 
public class Get_NoteChildren {
    /*
        借助zkClient完成会话创建
     */
    public static void main(String[] args) throws InterruptedException {
        /**
         * 创建一个zkClient实例就可以完成连接,完成会话的创建
         * serverString : 服务器连接地址
         * 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
         */
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        System.out.println("会话被创建了...");
 
        // 获取子节点列表
        List<String> children = zkClient.getChildren("/lg-zkClient");
        System.out.println(children);
 
        // 注册监听对象
        /*
            客户端可以对一个不存在的节点进行子节点变更的监听
            只要该节点的子节点的列表发生变化,或者该节点本身被创建或则删除,都会触发监听
         */
        zkClient.subscribeChildChanges("/lg-zkClient-get", new IZkChildListener() {
            /**
             * @param list : 变化后的子节点列表
             */
            @Override
            public void handleChildChange(String parentPath, List<String> list) throws Exception {
                System.out.println(parentPath   "的子节点列表发生了变化,变化后的子节点列表为"   list);
            }
        });
 
        // 测试
        zkClient.createPersistent("/lg-zkClient-get");
        Thread.sleep(1000);
 
        zkClient.createPersistent("/lg-zkClient-get/c1");
        Thread.sleep(1000);
    }
}

结果表明:客户端可以对⼀个不存在的节点进行子节点变更的监听。⼀旦客户端对⼀个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端,该节点本身的创建或删除也会通知到客户端。

获取数据(节点是否存在、更新、删除)
代码语言:javascript复制
package com.lagou.zkClient;
 
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
 
public class Note_API {
    /*
        借助zkClient完成会话创建
     */
    public static void main(String[] args) throws InterruptedException {
        /**
         * 创建一个zkClient实例就可以完成连接,完成会话的创建
         * serverString : 服务器连接地址
         * 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
         */
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        System.out.println("会话被创建了...");
 
        // 创建节点
        /**
         *  createParents:是否要创建父节点,如果值为true,则就会递归创建节点
         */
        zkClient.createPersistent("/lg-zkClient/lg-c1", true);
        System.out.println("节点递归创建完成");
 
        // 判断节点是否存在
        String path = "/lg-zkClient-Ep";
        boolean exists = zkClient.exists(path);
        if (!exists) {
            // 创建临时节点
            zkClient.createEphemeral(path, "123");
        }
 
        // 读取节点内容
        Object o = zkClient.readData(path);
        System.out.println(o);
 
        // 注册监听
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            /*
                当节点数据内容发生变化时,执行的回调方法
                s : path
                o : 变化后的节点内容
             */
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println(s   ":该节点内容被更新,更新后的内容:"   o);
            }
 
            /*
                当节点被删除时,会执行的回调方法
                s : path
             */
            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println(s   ":该节点被删除");
            }
        });
 
        // 更新节点内容
        zkClient.writeData(path, "456");
        Thread.sleep(2000);
 
        // 删除节点
        zkClient.delete(path);
        Thread.sleep(2000);
    }
}

Zookeeper 开源客户端 Curator

urator是Netflix公司开源的⼀套Zookeeper客户端框架,和ZKClient⼀样,Curator解决了很多Zookeeper客户端非常底层的细节开发⼯作,包括连接重连,反复注册Watcher和NodeExistsException异常等,是最流行的Zookeeper客户端之⼀。从编码风格上来讲,它提供了基于Fluent的编程风格支持

原生Zookeeper的不足
  • 连接对象异步创建,需要开发人员自行编码等待。
  • 连接没有会话超时自动重连机制。
  • Watcher一次注册只生效一次。
  • 不支持递归创建树形节点。
curator特点
  • 设有Session超时重连机制。
  • Watcher重复注册机制。
  • 简化开发API。
  • 遵循fluent风格API。
  • 提供Zookeeper常用的场景封装实现。
添加依赖

在pom.xml文件中添加如下内容:

代码语言:javascript复制
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
创建会话

Curator的创建会话方式与原生的API和ZkClient的创建方式区别很⼤。Curator创建客户端是通过CuratorFrameworkFactory工厂类来实现的。具体如下:

使用CuratorFramework这个工厂类的两个静态方法来创建⼀个客户端
代码语言:javascript复制
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
 
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

其中参数RetryPolicy提供重试策略的接口,可以让⽤户实现⾃定义的重试策略,默认提供了以下实现, 分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)。

通过调用CuratorFramework中的start()方法来启动会话
代码语言:javascript复制
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy); client.start();
代码语言:javascript复制
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000,1000,retryPolicy);
client.start();

其实进⼀步查看源代码可以得知,其实这两种方法内部实现⼀样,只是对外包装成不同的方法。它们的底层都是通过第三个⽅法builder来实现的。

代码语言:javascript复制
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
            .connectString("server1:2181,server2:2181,server3:2181")
            .sessionTimeoutMs(50000)
            .connectionTimeoutMs(30000)
            .retryPolicy(retryPolicy)
            .build();
client.start();
参数说明
代码语言:javascript复制
connectString  #zk的server地址,多个server之间使⽤英⽂逗号分隔开。
connectionTimeoutMs  #连接超时时间, 如上是30s, 默认是 15s。
sessionTimeoutMs  #会话超时时间,如上是50s,默认是 60s 。
retryPolicy  #失败重试策略。
ExponentialBackoffRetry  #构造器含有三个参数 ExponentialBackoffRetry(int  baseSleepTimeMs, int maxRetries, int maxSleepMs)。    
baseSleepTimeMs  #初始的sleep时间,用于计算之后的每次重试的sleep时间 。 
      计算公式:当前sleep 时间 =baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount 1)))。
      maxRetries  #最大重试次数。
      maxSleepMs  #最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大时间是Integer.MAX_VALUE毫秒。
      其他,查看org.apache.curator.RetryPolicy接⼝的实现类。
start() #完成会话的创建。
代码语言:javascript复制
package com.lagou.curator;
 
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
 
public class CreateSession {
    // 创建会话
    public static void main(String[] args) {
        // 不使用fluent编程风格
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("8.142.8.105:2181", retryPolicy);
        curatorFramework.start();
        System.out.println("会话被建立了");
 
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
 
        client.start();
        System.out.println("会话2被创建了");
    }
}

需要注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离。

创建节点

curator提供了⼀系列Fluent风格的接口,通过使用Fluent编程风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。

下面简单介绍⼀下常用的几个节点创建场景。

创建⼀个初始内容为空的节点
代码语言:javascript复制
client.create().forPath(path);

Curator默认创建的是持久节点,内容为空。

创建⼀个包含内容的节点
代码语言:javascript复制
client.create().forPath(path,"我是内容".getBytes());

Curator和ZkClient不同的是依旧采用Zookeeper原生API的⻛格,内容使用byte[]作为方法参数。

递归创建父节点,并选择节点类型
代码语言:javascript复制
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);

creatingParentsIfNeeded这个接口非常有用,在使用ZooKeeper 的过程中,开发人员经常会碰到NoNodeException 异常,其中⼀个可能的原因就是试图对⼀个不存在的父节点创建子节点。因此,开发人员不得不在每次创建节点之前,都判断⼀下该父节点是否存在——这个处理通常比较麻烦。在使用Curator 之后,通过调用creatingParentsIfNeeded 接口,Curator 就能够自动地递归创建所有需要的⽗节点。

下面通过一个实际例子来演示如何在代码中使用这些API。

代码语言:javascript复制
package com.lagou.curator;
 
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
 
public class CreateNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
 
        client.start();
        System.out.println("会话2被创建了");
 
        // 创建节点
        String path = "/lg-curator/c1";
        String s = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
        System.out.println("节点递归创建成功,该节点路径:"   s);
    }
}
删除节点

删除节点的方法也是基于Fluent方式来进行操作,不同类型的操作调用新增不同的方法调用即可。

删除一个子节点
代码语言:javascript复制
client.delete().forPath(path);
删除节点并递归删除其子节点
代码语言:javascript复制
client.delete().deletingChildrenIfNeeded().forPath(path);
指定版本进行删除
代码语言:javascript复制
client.delete().withVersion(1).forPath(path);

如果此版本已经不存在,则删除异常,异常信息如下。

代码语言:javascript复制
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for
强制保证删除一个节点
代码语言:javascript复制
 client.delete().guaranteed().forPath(path);

只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到⼀些网络异常的情况,此guaranteed的强制删除就会很有效果。

演示实例:

代码语言:javascript复制
package com.lagou.curator;
 
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
 
public class DeleteNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        // 不使用fluent编程风格
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
 
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
 
        client.start();
        System.out.println("会话2被创建了");
 
        // 删除节点
        String path = "/lg-curator";
        client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
 
        System.out.println("删除成功,删除的节点:"   path);
    }
}
获取数据

获取节点数据内容API相当简单,同时Curator提供了传入一个Stat变量的方式来存储服务器端返回的最新的节点状态信息。

代码语言:javascript复制
// 普通查询
client.getData().forPath(path);
// 包含状态查询
Stat stat = new Stat(); 
client.getData().storingStatIn(stat).forPath(path);

演示:

代码语言:javascript复制
package com.lagou.curator;
 
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
 
public class GetNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
 
        client.start();
        System.out.println("会话2被创建了");
 
        // 创建节点
        String path = "/lg-curator/c1";
        String s = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
        System.out.println("节点递归创建成功,该节点路径:"   s);
 
        // 获取节点的数据内容以及状态信息
 
        // 数据内容
        byte[] bytes = client.getData().forPath(path);
        System.out.println("获取到的节点数据内容:"   new String(bytes));
 
        // 状态信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
 
        System.out.println("获取节点的状态信息:"   stat);
    }
}
更新数据

更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如 果version已经变更,则抛出异常。

代码语言:javascript复制
// 普通更新
client.setData().forPath(path,"新内容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);

版本不一致异常信息:

代码语言:javascript复制
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =  BadVersion for

案例演示:

代码语言:javascript复制
package com.lagou.curator;
 
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
 
public class UpdateNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
 
        client.start();
        System.out.println("会话2被创建了");
 
        String path = "/lg-curator/c1";
 
        // 获取节点的数据内容以及状态信息
 
        // 数据内容
        byte[] bytes = client.getData().forPath(path);
        System.out.println("获取到的节点数据内容:"   new String(bytes));
 
        // 状态信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
 
        System.out.println("获取节点的状态信息:"   stat);
 
        // 更新节点内容
        int version = client.setData().withVersion(stat.getVersion()).forPath(path, "修改内容1".getBytes()).getVersion();
        System.out.println("当前的最新版本是" version);
        byte[] byte2 = client.getData().forPath(path);
        System.out.println("修改后的节点数据内容:"   new String(byte2));
    }
}

参考文章:https://blog.csdn.net/weixin_52851967/article/ details/126269572 https://blog.csdn.net/weixin_52851967/ article/details/126263824

推荐阅读 点击标题可跳转

打脸了!微软竟然发布了自己的 Linux

IPv4 开始收费!或将是一场新的 IT 灾难。。。

第一大服务器厂商:收入骤降 100 亿

发现一款吊炸天的远程控制与监控工具,有点牛逼

一个比 ping 更强大、更牛逼的命令行工具!

外资IT连连败退!Citrix和Radware或将撤离中国

新来个技术总监:谁再用 rebase 提交合并开除

0 人点赞