前言
在上一篇文章中ZooKeeper入门一给大家介绍了分布式协调中间件ZooKeeper的下载安装以及集群的搭建,那么本篇文章我们就来继续介绍一下ZooKeeper的一些需要补充的重要概念、客户端的常用命令以及业界操作ZooKeeper的高度封装的客户端CuratorFramework,并使用它实现一个自定义的分布式配置中心。下面进入正文。
Stat与Watcher
Stat
ZooKeeper命名空间中的每个znode都有一个与之关联的stat结构,类似于Unix/Linux文件系统中文件的stat结构,znode的stat结构的字段和各自的含义如下:
- cZxid: 创建znode的事务ID
- mZxid: 最后修改znode的事务ID
- pZxid: 最后修改添加或删除子节点的事务ID
- ctime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode创建时间
- mtime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode最后修改时间
- dataVersion: 表示对该znode数据所做的更改次数
- cVersion: 表示对改znode的子节点进行的更改次数
- aclVersion: 表示对znode的ACL(访问权限列表)进行更改的次数
- emphemeralOwner: 如果znode是临时节点,则表示znode所有者的session ID; 如果改节点不是临时节点,则该值为0
- dataLength: znode数据的长度
- numChildren: 表示znode子节点的数量
Zxid
类似于RDBMS中的事务ID,用于标识一次更新操作的Proposal ID(提议ID)。为了保证顺序性,该zxid必须单调递增。因此ZooKeeper使用一个64位的数来标识,高32位是leader的epoch(Leader对应的年号), 从1开始,每次选出新的Leader, epoch加1, 低32位为该epoch的序号, 每次epoch变化都将低32位的序号重置,这样就保证了Zxid的全局递增性。
Watch
一个zk的节点可以被监控,包括这个目录中存储的数据的修改,一旦变化可以通知设置监控的客户端,这个特性是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的集中管理、集群管理和分布式锁等等。
watch机制的官方说明:一个Watch事件是一个一次性的触发器,当被设置了watch的数据发生改变的时候,则服务器将这个改变发送给设置了watch的客户端,以便通知他们。
可以注册watcher的方法:getData、exists、getChildren
可以触发watcher的方法:create、delete、setData。连接断开的情况下出发的watcher会丢失。
一个Watcher实例是一个回调函数,被回调一次就被移除了,如果还需要关注数据的变化,需要再次注册watcher
New一个ZooKeeper时注册的watcher叫default watcher, 它不是一次性的,只对client的连接状态变化作出反应
exists和setData设置数据监听, 而getChildren设置子节点监听
ZkCli常用命令
为了更好的使用zookeeper发挥分布式协调器的作用,就不得不学习它的命令行接口
进入zookeeper的安装目录,执行如下命令中的任意一条命令连接一个客户端
代码语言:javascript复制# 使用本地默认2181端口连接
./bin/zkCli.sh
# 连接远程zk服务,连接超时时间设置为3s
./bin/zkCli.sh -timeout 3000 -server <remoteIP>:2181
# 使用-waitforconnection选项连接远程zk服务
./bin/zkCli.sh -waitforconnection -timeout 3000 -server <remoteIP>:2181
# 使用自定义客户端配置属性文件连接远程zk服务
bin/zkCli.sh -client-configuration /path/to/client.properties
在连接的zk客户端控制台中执行 help 命令,就可以看到zk客户端控制台打印出了一系列的zk的命令行用法示例
代码语言:javascript复制ZooKeeper -server host:port cmd args
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path
delquota [-n|-b|-N|-B] path
get [-s] [-w] path
getAcl [-s] path
getAllChildrenNumber path
getEphemerals path
history
listquota path
ls [-s] [-w] [-R] path
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b|-N|-B val path
stat [-w] path
sync path
version
创建节点(znode)
用给定的路径创建一个节点,flag参数用于指定创建的节点是临时的还是顺序的。默认情况下,所有的节点都是持久的。当会话过期或客户端断开连接,临时节点(flag: -e)将被自动删除
顺序节点保证节点路径将是唯一的,ZooKeeper集合将向节点路径填充10位序列号。例如节点路径 /myApp 将转换位 /myApp/0000000001 , 下一个序号将是 /myApp/0000000002 。如果没有指定flag,则节点将被认为是持久的。
语法:
create /path data
示例:
create /firstNode first
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 24] create /firstNode first
Created /firstNode
要创建顺序节点,请添加flag: -s
语法:
create -s /path data
示例:
create -s /firstNode second
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 26] create -s /firstNode second
Created /firstNode0000000007
要创建临时节点,请添加flag: -e
语法:
create -e /path data
示例:
create -e /firstNode-ephemral ephemeral
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 27] create -e /firstNode-ephemral ephemeral
Created /firstNode-ephemral
记住客户端断开连接时,临时节点将被删除,你可以通过退出zkCli, 然后重新连接zkCli来查看效果
获取数据
通过get命令获取指定路径节点下的数据
语法:
get /path
示例:
get /firstNode
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 28] get /firstNode
first
要访问顺序节点,必须输入znode的完整的路径
示例:
get /firstNode0000000007
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 30] get /firstNode0000000007
second
设置数据
通过set命令设置指定znode的数据, 完成设置数据后,你可以通过get命令来检查数据
语法:
set /path data
示例:
set /firstNode first_update
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 31] set /firstNode first_update
通过执行 get /firstNode
命令查看效果
[zk: localhost:2181(CONNECTED) 32] get /firstNode
first_update
创建子节点
创建子节点类似于创建新的znode,唯一的区别是子节点的znode具有父路径
语法:
create /paren/path/subNode data
示例:
create /firstNode/child1 child1Data
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 1] create /firstNode/child1 child1Data
Created /firstNode/child1
[zk: localhost:2181(CONNECTED) 2] create /firstNode/child2 child2Data
Created /firstNode/child2
列出子节点:
此命令用于列出和显示znode的子项:
用法:
ls /path
示例:
ls /firstNode
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 3] ls /firstNode
[child1, child2]
检查状态
状态描述指定znode的元数据,它包含时间戳、版本号、数据长度和子znode等细项
语法:
stat /path
示例:
stat /firstNode
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 4] stat /firstNode
cZxid = 0x10000003c
ctime = Mon Jul 18 01:01:18 CST 2022
mZxid = 0x10000003f
mtime = Mon Jul 18 01:22:45 CST 2022
pZxid = 0x100000045
cversion = 2
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 12
numChildren = 2
删除znode
删除没有子节点的znode使用 delete /path
命令;
删除指定znode节点并递归删除其子节点使用deleteall /path
命令
用法:
代码语言:javascript复制delete /firstNode/child1
deleteall /firstNode
输出:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 5] delete /firstNode/child1
[zk: localhost:2181(CONNECTED) 6] deleteall /firstNode
[zk: localhost:2181(CONNECTED) 7] ls /firstNode
Node does not exist: /firstNode
ACL
ZK作为分布式应用架构中的重要中间件,通常会在上面以节点的方式存储一些关键信息。默认情况下,所有应用都可以读写任何节点,在复杂应用中这不太安全 ,ZK通过ACL机制来解决访问权限问题
- ZooKeeper的权限控制是基于每个znode节点的,需要对每个znode设置权限
- 每个znode支持设置多种权限控制方案和多个权限
- 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点
ACL权限控制使用 schema: id:permission 来标识, 注意涵盖3个方面
- 权限模式(Schema): 鉴权的策略
- 授权对象(ID)
- 权限:Permission
Schema
- world:只有一个用户anyone, 代表所有人
- ip: 使用ip地址认证
- auth: 使用已添加认证的用户认证
- digest: 使用"用户名: 密码"方式认证
id
授权对象是指权限赋予的用户或者实体,例如ip地址或者机器。授权模式schema与授权对象id之间的关系:
- world: 只有一个Id, 即anyone
- ip: 通常是一个ip地址或者一个ip地址段,比如192.168.0.110或192.168.0.1/24
- auth: 用户名
- digest: 自定义,通常是“username: BASE64(SHA-1(username:password))"
权限
- CREATE: 简写为c, 可以创建子节点
- DELETE: 简写成d, 可以删除子节点(仅下一级), 注意不是本节点
- READ:简写成r, 可以读取节点数据并显示子节点列表
- WRITE: 简写成w, 可以设置节点数据
- ADMIN: 简写成a, 可以设置节点访问控制列表
查看ACL
用法:
getAcl /path
示例:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 12] getAcl /firstNode
'world,'anyone
: cdrwa
可见一个znode节点默认是wold模式,任意用户都具备节点的所有操作权限
设置ACL
设置节点对所有用户都有写和管理权限
用法:
setAcl /path world:anyone wa
示例:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 17] setAcl /persistent_node world:anyone:wa
[zk: localhost:2181(CONNECTED) 18] get /persistent_node
Insufficient permission : /persistent_node
当我们对persistent_node节点设置的权限列表没有READ权限时,执行get /persistent_node
命令就会提示权限不足
digest授权模式
先添加用户:
addauth digest zhangsan:123456
再设置权限,这个节点只有zhangsan 这个用户对这个节点拥有所有权限
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 22] addauth digest zhangsan:123456
[zk: localhost:2181(CONNECTED) 23] setAcl /persistent_node auth:zhangsan:123456:rdwca
通过执行getAcl /persistent_node
指令可以获取到该节点的ACL
[zk: localhost:2181(CONNECTED) 24] getAcl /persistent_node
'diest,'zhangsan:jA/7JI9gsuLp0ZQn5J5dcnDQkHA=
: cdrwa
'digest,'zhangsan[zk:QalD ltUgPWxu8Pd gTLjlZ6aoY=
: cdrwa
Curator客户端
Curator客户端简介
Curator 框架是一个大幅度简化使用ZooKeeper的高水平ZooKeeper客户端API框架,它在ZooKeeper的基础上增加了很多新的特性,同时能够处理复杂的管理ZooKeeper集群连接和重试操作
Curator框架由以下几个子模块组成,它们的Maven坐标都已发布到Maven中央仓库,这些jar包的groupId、artifactId已经功能描述如下:
Curator的一些特性如下:
- 自动连接管理:
- 在使用ZooKeeper的时候存在一些潜在的错误场景需要ZooKeeper客户端重建连接或者重试操作,而Curator能够自动并透明地处理了这些错误场景
- 在必要的时候监视
NodeDataChanged
事件并调用updateServerList()
方法 - 通过Curator recipes自动移除监视器
- 干净的API
- 简化原生ZooKeeper的方法和事件使用API
- 提供更简便的用法和流畅的接口
- 丰富的方法实现
- Leader选举
- 共享锁
- 路径缓存和监视器
- 分布式队列
- 分布式优先队列
创建一个CuratorFramework实例
CuratorFramework`类是curator框架中与ZooKeeper通信的客户端类
CuratorFramework
实例可以通过提供了两个重载的静态newClient
方法和一个静态builder
方法创建,三个方法如下:
/**
* 提供两个构造参数的创建CuratorFramework实例方法
* @param connectString 连接zookeeper服务的连接串
* @param retryPolicy 重试策略
*/
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
}
/**
* 提供四个构造参数的创建CuratorFramework实例方法
* @param connectString 连接zookeeper服务的连接串
* @param sessionTimeoutMs会话超时时间(单位:ms)
* @param connectionTimeoutMs 连接超时时间 (单位:ms)
*/
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();
}
/**
* 构建器方法,该方法返回一个CuratorFramework实例的构建器
* 通过静态内部类Builder的build方法创建CuratorFramework实例
*/
public static CuratorFrameworkFactory.Builder builder() {
return new CuratorFrameworkFactory.Builder();
}
public static class Builder {
public CuratorFramework build() {
return new CuratorFrameworkImpl(this);
}
}
以上CuratorFrameworkFactory#newClient
工厂方法提供了一种简化创建CuratorFramework
实例的方式,通过Builder
可以修改CuratorFramework
实例的所有参数。一旦你有了一个CuratorFramework
实例,你必须调用其start()
方法用来启动客户端实例,同时在应用结束的时候调用CuratorFramework
类的close方法。
CuratorFramework API
CuratorFramework
类使用流畅风格的接口,建议操作的时候使用返回一个CuratorFramework
实例的构建者Builder
, 用法如下:
// 创建持久节点
client.create().forPath("/head", new byte[0]);
// 删除节点
client.delete().inBackground().forPath("/head");
// 创建临时有序节点
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
// 设置监听器
client.getData().watched().inBackground().forPath("/test");
CuratorFramework类的常用方法
以上方法描述中的方法解释:
- withMode(CreateMode mode): 指定创建的节点类型
- withTtl(long ttl): 指定临时节点失效时间
- withVersion(int version): 指定版本号
- inBackground: 异步处理错误,有6个重载方法,分别是:
- inBackground(): 无参数方法,返回错误路径和字节数据对象
ErrorListenerPathAndBytesable<String>
- inBackground(BackgroundCallback callback, Object context): 入参带回调函数和上下文的异步处理错误,返回对象也是错误路径和字节数据
- inBackground(Object context): 入参只有上下文的异步处理错误
- inBackground(BackgroundCallback callback, Executor executor): 入参带回调函数和线程池的方法
- inBackground(BackgroundCallback callback, Object context, Executor executor): 入参带回调函数、上下文和线程池的方法
- inBackground(BackgroundCallback callback): 入参只带一个回调函数的方法
- inBackground(): 无参数方法,返回错误路径和字节数据对象
- watch() : 添加监视器;
- usingWatcher: 添加监视器有两个重载方法
- usingWatcher(Watcher watcher): 指定Watcher类型监视器;
- usingWatcher(CuratorWatcher watcher): 指定CuratorWatcher类型监视器
- storingStatIn(Stat stat): 设置节点状态结构
- withAcl(ListaclList): 设置访问权限列表
- forPath: 对指定路径进行操作,有两个重载方法
- forPath(String path): 操作指定路径,返回节点的Stat信息
- forPath(String path, byte[] date), 对指定路径的节点设置数据, 返回节点的Stat信息
- joining(Listserver): 设置zookeeper服务集群中的节点
- leaving(Listserver): 移除zookeeper服务集群中的节点
- withNewMembers(Listservers): 添加新的zookeeper服务节点
- forEnsemble(): 组织成配置信息,返回byte[]类型数据,解码成字符串类型数据才是zookeeper服务集群的配置信息
Notifications
后台操作和监视的通知(notification)通过CuratorListener
接口发布。您可以使用CuratorFramework
实例的getCuratorListenable()
的返回结果List<CuratorListener>
实例的add(CuratorListener listener)
方法向CuratorFramework
实例注册侦听器。监听器需要实现了下面这个抽象方法:
void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
CuratorEvent
CuratorEvent对象是一个超设置的POJO,它可以保存各种类型的后台通知和触发的监视器。CuratorEvent的有用字段依赖于通过getType()方法获得的事件类型
CuratorEvent
的事件类型和对应的触发方法如下:
命名空间
因为ZooKeeper集群是一个共享环境,所以观察名称空间约定非常重要,以便使用给定集群的各种应用程序不会使用冲突的ZK路径。
CuratorFramework
有一个“名称空间”的概念。在创建CuratorFramework
实例时设置名称空间(通过Builder)。CuratorFramework
将在调用它的一个api时将名称空间前置到所有路径,如下所示:
CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
...
client.create().forPath("/test", data);
// znode 实际会写到 "/MyApp/test" 路径下
实现分布式配置中心
我们利用zookeeper的高度客户端封装框架CuratorFramwork及ZooKeeper的监听器机制实现一个分布式配置中心
引入Jar包依赖
代码语言:javascript复制 <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
curator的最新版本为5.3.0, 但是在5.3.0版本中TreeCache
类已经过时,这里我们使用的4.0.0版本连接3.7版本的ZooKeeper已经足够了
配置文件中添加zookeeper集群服务连接串
application.properties
zookeeper.connect.url=119.29.117.19:2181,119.29.117.19:2182,119.29.117.19:2183
项目中新建ZooKeeperConfig类
代码语言:javascript复制 package org.sang.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.RetryOneTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Properties;
@Component
public class ZooKeeperConfig {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConfig.class);
@Value("${zookeeper.connect.url}")
private String zooKeeperConnectUrl;
private Properties configProperties = new Properties();
public String getProperty(String key){
return configProperties.getProperty(key);
}
// 项目启动后自动拉取ZooKeeper中路径为"/config"的节点下的子节点配置项
@PostConstruct
public void init() throws Exception {
// 连接zookeeper
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zooKeeperConnectUrl, new RetryOneTime(1000));
zkClient.start();
List<String> configNames = zkClient.getChildren().forPath("/config");
for(String key: configNames){
// 获取每个路径下的值(即配置值)
byte[] value = zkClient.getData().forPath("/config/" key);
configProperties.put(key, new String(value, "UTF-8"));
}
//保证实时性,利用zk的watch机制
TreeCache treeCache = new TreeCache(zkClient, "/config");
treeCache.start();
// 创建监听器
treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
switch (treeCacheEvent.getType()){
case NODE_UPDATED:
// 获取变更节点的路径名
String configName = treeCacheEvent.getData().getPath().replace("/config/", "");
// 监听到zk的zNode发生了数据变更
logger.info(configName "的值发生了更新, 更新后的值为:" new String(treeCacheEvent.getData().getData(), "UTF-8"));
// 获取变更的值
String configValue = new String(treeCacheEvent.getData().getData(), "UTF-8");
configProperties.put(configName, configValue);
break;
default:
break;
}
});
}
}
这里使用TreeCache#getListenable
方法返回对象添加监听器,是因为CuratorFramework#getListenable
方法返回对象添加监听器要更简单,无需显示地调用watches
或usingWatcher
方法。事件类型也只需要判断是否为节点变更类型即可。
效果测试
我们首先启动zookeeper服务集群,然后执行命令行zkCli.sh -server localhost:2181
连接ZooKeeper
服务
先在客户端执行创建znode节点的命令:
代码语言:javascript复制 [zk: localhost:2181(CONNECTED) 1] create /config/key1 valu1
Created /config/key1
[zk: localhost:2181(CONNECTED) 2] create /config/key2 valu2
Created /config/key2
[zk: localhost:2181(CONNECTED) 3] create /config/key3 value3
Created /config/key3
然后可以在spring-boot项目的控制台看到三行与之对应的配置变量发生变化的日志:
代码语言:javascript复制 2022-07-21 16:30:54.450 INFO 8856 --- [tor-TreeCache-0] org.sang.config.ZooKeeperConfig : key1的值发生了更新, 更新后的值为:eurekaService
2022-07-21 16:32:59.019 INFO 8856 --- [tor-TreeCache-0] org.sang.config.ZooKeeperConfig : key2的值发生了更新, 更新后的值为:consulService
2022-07-21 16:34:38.300 INFO 8856 --- [tor-TreeCache-0] org.sang.config.ZooKeeperConfig : key3的值发生了更新, 更新后的值为:nacosService
这样我们需要的配置变量就可以通过调用ZooKeeperConfig#getProperty(String key)
方法获取,前端展示的配置变量值可以由后端通过websocket推送给前端同步更新
小结
本文重点介绍了ZooKeeper的znode stat数据、监视器、常用命令以及Zookeeper的客户端高度封装框架CuratorFramwork
及其常用API,并演示了使用CuratorFramwork
API 实现一个分布式注册中心,利用ZooKeeper的监视器机制实现客户端更新配置变量时同步修改配置中心配置键对应的值。下篇文章笔者将带领大家继续学习使用CuratorFramwork框架实现一些增删改查和异步操作,乃至实现分布式事务锁功能。
原创不易,还请看到这里的小伙伴们点个赞和【在看】,笔者不胜感激!
参考阅读
【1】https://curator.apache.org/curator-framework/index.html
【2】 https://blog.csdn.net/chenzihao36/article/details
/106750057
---END---