前面介绍了 Zookeeper 集群 ZAB 协议、配置中心、注册中心、数据与存储、会话与事务管理、分布式锁等相关的知识点,今天我将详细的为大家介绍 zookeeper 开源客户端相关知识,希望大家能够从中收获多多!如有帮助,请点在看、转发支持一波!!!
Zookeeper 开源客户端 ZkClient
ZkClient是Github上⼀个开源的zookeeper客户端,在Zookeeper原生API接口之上进行了包装,是⼀个更易用的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能。
接下来,还是从创建会话、创建节点、读取数据、更新数据、删除节点等方面来介绍如何使用zkClient 这个zookeeper客户端。
添加依赖
在pom.xml⽂件中添加如下内容。
代码语言:javascript复制<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
创建会话
使⽤ZkClient可以轻松的创建会话,连接到服务端。
代码语言:javascript复制package com.lagou.zkClient;
import org.I0Itec.zkclient.ZkClient;
public class CreateSession {
/*
借助zkClient完成会话创建
*/
public static void main(String[] args) {
/**
* 创建一个zkClient实例就可以完成连接,完成会话的创建
* serverString : 服务器连接地址
* 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
*/
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("会话被创建了...");
}
}
创建节点
ZkClient提供了递归创建节点的接口,即其帮助开发者先完成父节点的创建,再创建子节点。
代码语言:javascript复制package com.lagou.zkClient;
import org.I0Itec.zkclient.ZkClient;
public class CreateNote {
/*
借助zkClient完成会话创建
*/
public static void main(String[] args) {
/**
* 创建一个zkClient实例就可以完成连接,完成会话的创建
* serverString : 服务器连接地址
* 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
*/
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("会话被创建了...");
// 创建节点
/**
* createParents:是否要创建父节点,如果值为true,则就会递归创建节点
*/
zkClient.createPersistent("/lg-zkClient/lg-c1", true);
System.out.println("节点递归创建完成");
}
}
值得注意的是,在原生态接口中是无法创建成功的(⽗节点不存在),但是通过ZkClient通过设置createParents参数为true可以递归的先创建父节点,再创建子节点。
删除节点
ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点。
代码语言:javascript复制package com.lagou.zkClient;
import org.I0Itec.zkclient.ZkClient;
public class DeleteNote {
/*
借助zkClient完成会话创建
*/
public static void main(String[] args) {
/**
* 创建一个zkClient实例就可以完成连接,完成会话的创建
* serverString : 服务器连接地址
* 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
*/
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("会话被创建了...");
// 递归删除子节点
String path = "/lg-zkClient/lg-c1";
zkClient.createPersistent(path "/c11");
zkClient.deleteRecursive(path);
System.out.println("递归删除成功");
}
}
结果表明ZkClient可直接删除带子节点的⽗节点,因为其底层先删除其所有子节点,然后再删除父节点。
获取子节点
代码语言:javascript复制package com.lagou.zkClient;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
public class Get_NoteChildren {
/*
借助zkClient完成会话创建
*/
public static void main(String[] args) throws InterruptedException {
/**
* 创建一个zkClient实例就可以完成连接,完成会话的创建
* serverString : 服务器连接地址
* 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
*/
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("会话被创建了...");
// 获取子节点列表
List<String> children = zkClient.getChildren("/lg-zkClient");
System.out.println(children);
// 注册监听对象
/*
客户端可以对一个不存在的节点进行子节点变更的监听
只要该节点的子节点的列表发生变化,或者该节点本身被创建或则删除,都会触发监听
*/
zkClient.subscribeChildChanges("/lg-zkClient-get", new IZkChildListener() {
/**
* @param list : 变化后的子节点列表
*/
@Override
public void handleChildChange(String parentPath, List<String> list) throws Exception {
System.out.println(parentPath "的子节点列表发生了变化,变化后的子节点列表为" list);
}
});
// 测试
zkClient.createPersistent("/lg-zkClient-get");
Thread.sleep(1000);
zkClient.createPersistent("/lg-zkClient-get/c1");
Thread.sleep(1000);
}
}
结果表明:客户端可以对⼀个不存在的节点进行子节点变更的监听。⼀旦客户端对⼀个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端,该节点本身的创建或删除也会通知到客户端。
获取数据(节点是否存在、更新、删除)
代码语言:javascript复制package com.lagou.zkClient;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
public class Note_API {
/*
借助zkClient完成会话创建
*/
public static void main(String[] args) throws InterruptedException {
/**
* 创建一个zkClient实例就可以完成连接,完成会话的创建
* serverString : 服务器连接地址
* 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了
*/
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("会话被创建了...");
// 创建节点
/**
* createParents:是否要创建父节点,如果值为true,则就会递归创建节点
*/
zkClient.createPersistent("/lg-zkClient/lg-c1", true);
System.out.println("节点递归创建完成");
// 判断节点是否存在
String path = "/lg-zkClient-Ep";
boolean exists = zkClient.exists(path);
if (!exists) {
// 创建临时节点
zkClient.createEphemeral(path, "123");
}
// 读取节点内容
Object o = zkClient.readData(path);
System.out.println(o);
// 注册监听
zkClient.subscribeDataChanges(path, new IZkDataListener() {
/*
当节点数据内容发生变化时,执行的回调方法
s : path
o : 变化后的节点内容
*/
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println(s ":该节点内容被更新,更新后的内容:" o);
}
/*
当节点被删除时,会执行的回调方法
s : path
*/
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println(s ":该节点被删除");
}
});
// 更新节点内容
zkClient.writeData(path, "456");
Thread.sleep(2000);
// 删除节点
zkClient.delete(path);
Thread.sleep(2000);
}
}
Zookeeper 开源客户端 Curator
urator是Netflix公司开源的⼀套Zookeeper客户端框架,和ZKClient⼀样,Curator解决了很多Zookeeper客户端非常底层的细节开发⼯作,包括连接重连,反复注册Watcher和NodeExistsException异常等,是最流行的Zookeeper客户端之⼀。从编码风格上来讲,它提供了基于Fluent的编程风格支持
原生Zookeeper的不足
- 连接对象异步创建,需要开发人员自行编码等待。
- 连接没有会话超时自动重连机制。
- Watcher一次注册只生效一次。
- 不支持递归创建树形节点。
curator特点
- 设有Session超时重连机制。
- Watcher重复注册机制。
- 简化开发API。
- 遵循fluent风格API。
- 提供Zookeeper常用的场景封装实现。
添加依赖
在pom.xml文件中添加如下内容:
代码语言:javascript复制<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
创建会话
Curator的创建会话方式与原生的API和ZkClient的创建方式区别很⼤。Curator创建客户端是通过CuratorFrameworkFactory工厂类来实现的。具体如下:
使用CuratorFramework这个工厂类的两个静态方法来创建⼀个客户端
代码语言:javascript复制public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
其中参数RetryPolicy提供重试策略的接口,可以让⽤户实现⾃定义的重试策略,默认提供了以下实现, 分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)。
通过调用CuratorFramework中的start()方法来启动会话
代码语言:javascript复制RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy); client.start();
代码语言:javascript复制RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000,1000,retryPolicy);
client.start();
其实进⼀步查看源代码可以得知,其实这两种方法内部实现⼀样,只是对外包装成不同的方法。它们的底层都是通过第三个⽅法builder来实现的。
代码语言:javascript复制RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
.connectString("server1:2181,server2:2181,server3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.build();
client.start();
参数说明
代码语言:javascript复制connectString #zk的server地址,多个server之间使⽤英⽂逗号分隔开。
connectionTimeoutMs #连接超时时间, 如上是30s, 默认是 15s。
sessionTimeoutMs #会话超时时间,如上是50s,默认是 60s 。
retryPolicy #失败重试策略。
ExponentialBackoffRetry #构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)。
baseSleepTimeMs #初始的sleep时间,用于计算之后的每次重试的sleep时间 。
计算公式:当前sleep 时间 =baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount 1)))。
maxRetries #最大重试次数。
maxSleepMs #最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大时间是Integer.MAX_VALUE毫秒。
其他,查看org.apache.curator.RetryPolicy接⼝的实现类。
start() #完成会话的创建。
代码语言:javascript复制package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CreateSession {
// 创建会话
public static void main(String[] args) {
// 不使用fluent编程风格
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("8.142.8.105:2181", retryPolicy);
curatorFramework.start();
System.out.println("会话被建立了");
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("8.142.8.105:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2被创建了");
}
}
需要注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离。
创建节点
curator提供了⼀系列Fluent风格的接口,通过使用Fluent编程风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。
下面简单介绍⼀下常用的几个节点创建场景。
创建⼀个初始内容为空的节点
代码语言:javascript复制client.create().forPath(path);
Curator默认创建的是持久节点,内容为空。
创建⼀个包含内容的节点
代码语言:javascript复制client.create().forPath(path,"我是内容".getBytes());
Curator和ZkClient不同的是依旧采用Zookeeper原生API的⻛格,内容使用byte[]作为方法参数。
递归创建父节点,并选择节点类型
代码语言:javascript复制client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
creatingParentsIfNeeded这个接口非常有用,在使用ZooKeeper 的过程中,开发人员经常会碰到NoNodeException 异常,其中⼀个可能的原因就是试图对⼀个不存在的父节点创建子节点。因此,开发人员不得不在每次创建节点之前,都判断⼀下该父节点是否存在——这个处理通常比较麻烦。在使用Curator 之后,通过调用creatingParentsIfNeeded 接口,Curator 就能够自动地递归创建所有需要的⽗节点。
下面通过一个实际例子来演示如何在代码中使用这些API。
代码语言:javascript复制package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class CreateNote_curator {
// 创建会话
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("8.142.8.105:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2被创建了");
// 创建节点
String path = "/lg-curator/c1";
String s = client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
System.out.println("节点递归创建成功,该节点路径:" s);
}
}
删除节点
删除节点的方法也是基于Fluent方式来进行操作,不同类型的操作调用新增不同的方法调用即可。
删除一个子节点
代码语言:javascript复制client.delete().forPath(path);
删除节点并递归删除其子节点
代码语言:javascript复制client.delete().deletingChildrenIfNeeded().forPath(path);
指定版本进行删除
代码语言:javascript复制client.delete().withVersion(1).forPath(path);
如果此版本已经不存在,则删除异常,异常信息如下。
代码语言:javascript复制org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for
强制保证删除一个节点
代码语言:javascript复制 client.delete().guaranteed().forPath(path);
只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到⼀些网络异常的情况,此guaranteed的强制删除就会很有效果。
演示实例:
代码语言:javascript复制package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class DeleteNote_curator {
// 创建会话
public static void main(String[] args) throws Exception {
// 不使用fluent编程风格
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("8.142.8.105:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2被创建了");
// 删除节点
String path = "/lg-curator";
client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
System.out.println("删除成功,删除的节点:" path);
}
}
获取数据
获取节点数据内容API相当简单,同时Curator提供了传入一个Stat变量的方式来存储服务器端返回的最新的节点状态信息。
代码语言:javascript复制// 普通查询
client.getData().forPath(path);
// 包含状态查询
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
演示:
代码语言:javascript复制package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class GetNote_curator {
// 创建会话
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("8.142.8.105:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2被创建了");
// 创建节点
String path = "/lg-curator/c1";
String s = client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
System.out.println("节点递归创建成功,该节点路径:" s);
// 获取节点的数据内容以及状态信息
// 数据内容
byte[] bytes = client.getData().forPath(path);
System.out.println("获取到的节点数据内容:" new String(bytes));
// 状态信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
System.out.println("获取节点的状态信息:" stat);
}
}
更新数据
更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如 果version已经变更,则抛出异常。
代码语言:javascript复制// 普通更新
client.setData().forPath(path,"新内容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);
版本不一致异常信息:
代码语言:javascript复制org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for
案例演示:
代码语言:javascript复制package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class UpdateNote_curator {
// 创建会话
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("8.142.8.105:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2被创建了");
String path = "/lg-curator/c1";
// 获取节点的数据内容以及状态信息
// 数据内容
byte[] bytes = client.getData().forPath(path);
System.out.println("获取到的节点数据内容:" new String(bytes));
// 状态信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
System.out.println("获取节点的状态信息:" stat);
// 更新节点内容
int version = client.setData().withVersion(stat.getVersion()).forPath(path, "修改内容1".getBytes()).getVersion();
System.out.println("当前的最新版本是" version);
byte[] byte2 = client.getData().forPath(path);
System.out.println("修改后的节点数据内容:" new String(byte2));
}
}
参考文章:https://blog.csdn.net/weixin_52851967/article/ details/126269572 https://blog.csdn.net/weixin_52851967/ article/details/126263824
推荐阅读 点击标题可跳转
打脸了!微软竟然发布了自己的 Linux
IPv4 开始收费!或将是一场新的 IT 灾难。。。
第一大服务器厂商:收入骤降 100 亿
发现一款吊炸天的远程控制与监控工具,有点牛逼
一个比 ping 更强大、更牛逼的命令行工具!
外资IT连连败退!Citrix和Radware或将撤离中国
新来个技术总监:谁再用 rebase 提交合并开除