ZooKeeper入门(二):ZooKeeper常用命令介绍及使用Curator客户端实现分布式配置中心

2022-09-21 07:18:11 浏览数 (1)

前言

在上一篇文章中ZooKeeper入门一给大家介绍了分布式协调中间件ZooKeeper的下载安装以及集群的搭建,那么本篇文章我们就来继续介绍一下ZooKeeper的一些需要补充的重要概念、客户端的常用命令以及业界操作ZooKeeper的高度封装的客户端CuratorFramework,并使用它实现一个自定义的分布式配置中心。下面进入正文。

Stat与Watcher

Stat

ZooKeeper命名空间中的每个znode都有一个与之关联的stat结构,类似于Unix/Linux文件系统中文件的stat结构,znode的stat结构的字段和各自的含义如下:

  • cZxid: 创建znode的事务ID
  • mZxid: 最后修改znode的事务ID
  • pZxid: 最后修改添加或删除子节点的事务ID
  • ctime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode创建时间
  • mtime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode最后修改时间
  • dataVersion: 表示对该znode数据所做的更改次数
  • cVersion: 表示对改znode的子节点进行的更改次数
  • aclVersion: 表示对znode的ACL(访问权限列表)进行更改的次数
  • emphemeralOwner: 如果znode是临时节点,则表示znode所有者的session ID; 如果改节点不是临时节点,则该值为0
  • dataLength: znode数据的长度
  • numChildren: 表示znode子节点的数量

Zxid

类似于RDBMS中的事务ID,用于标识一次更新操作的Proposal ID(提议ID)。为了保证顺序性,该zxid必须单调递增。因此ZooKeeper使用一个64位的数来标识,高32位是leader的epoch(Leader对应的年号), 从1开始,每次选出新的Leader, epoch加1, 低32位为该epoch的序号, 每次epoch变化都将低32位的序号重置,这样就保证了Zxid的全局递增性。

Watch

一个zk的节点可以被监控,包括这个目录中存储的数据的修改,一旦变化可以通知设置监控的客户端,这个特性是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的集中管理、集群管理和分布式锁等等。

watch机制的官方说明:一个Watch事件是一个一次性的触发器,当被设置了watch的数据发生改变的时候,则服务器将这个改变发送给设置了watch的客户端,以便通知他们。

可以注册watcher的方法:getData、exists、getChildren

可以触发watcher的方法:create、delete、setData。连接断开的情况下出发的watcher会丢失。

一个Watcher实例是一个回调函数,被回调一次就被移除了,如果还需要关注数据的变化,需要再次注册watcher

New一个ZooKeeper时注册的watcher叫default watcher, 它不是一次性的,只对client的连接状态变化作出反应

exists和setData设置数据监听, 而getChildren设置子节点监听

ZkCli常用命令

为了更好的使用zookeeper发挥分布式协调器的作用,就不得不学习它的命令行接口

进入zookeeper的安装目录,执行如下命令中的任意一条命令连接一个客户端

代码语言:javascript复制
# 使用本地默认2181端口连接
./bin/zkCli.sh 
# 连接远程zk服务,连接超时时间设置为3s
./bin/zkCli.sh -timeout 3000 -server <remoteIP>:2181
# 使用-waitforconnection选项连接远程zk服务
./bin/zkCli.sh -waitforconnection -timeout 3000 -server <remoteIP>:2181
# 使用自定义客户端配置属性文件连接远程zk服务
bin/zkCli.sh -client-configuration /path/to/client.properties

在连接的zk客户端控制台中执行 help 命令,就可以看到zk客户端控制台打印出了一系列的zk的命令行用法示例

代码语言:javascript复制
ZooKeeper -server host:port cmd args
	addauth scheme auth
	close
	config [-c] [-w] [-s]
	connect host:port
	create [-s] [-e] [-c] [-t ttl] path [data] [acl]
	delete [-v version] path
	deleteall path
	delquota [-n|-b|-N|-B] path
	get [-s] [-w] path
	getAcl [-s] path
	getAllChildrenNumber path
	getEphemerals path
	history
	listquota path
	ls [-s] [-w] [-R] path
	printwatches on|off
	quit
	reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
	redo cmdno
	removewatches path [-c|-d|-a] [-l]
	set [-s] [-v version] path data
	setAcl [-s] [-v version] [-R] path acl
	setquota -n|-b|-N|-B val path
	stat [-w] path
	sync path
	version

创建节点(znode)

用给定的路径创建一个节点,flag参数用于指定创建的节点是临时的还是顺序的。默认情况下,所有的节点都是持久的。当会话过期或客户端断开连接,临时节点(flag: -e)将被自动删除

顺序节点保证节点路径将是唯一的,ZooKeeper集合将向节点路径填充10位序列号。例如节点路径 /myApp 将转换位 /myApp/0000000001 , 下一个序号将是 /myApp/0000000002 。如果没有指定flag,则节点将被认为是持久的。

语法:

create /path data

示例:

create /firstNode first

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 24] create /firstNode first
Created /firstNode

要创建顺序节点,请添加flag: -s

语法:

create -s /path data

示例:

create -s /firstNode second

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 26] create -s /firstNode second
Created /firstNode0000000007

要创建临时节点,请添加flag: -e

语法:

create -e /path data

示例:

create -e /firstNode-ephemral ephemeral

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 27] create -e /firstNode-ephemral  ephemeral 
Created /firstNode-ephemral

记住客户端断开连接时,临时节点将被删除,你可以通过退出zkCli, 然后重新连接zkCli来查看效果

获取数据

通过get命令获取指定路径节点下的数据

语法:

get /path

示例:

get /firstNode

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 28] get /firstNode
first

要访问顺序节点,必须输入znode的完整的路径

示例:

get /firstNode0000000007

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 30] get /firstNode0000000007
second

设置数据

通过set命令设置指定znode的数据, 完成设置数据后,你可以通过get命令来检查数据

语法:

set /path data

示例:

set /firstNode first_update

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 31] set /firstNode first_update

通过执行 get /firstNode命令查看效果

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 32] get /firstNode
first_update

创建子节点

创建子节点类似于创建新的znode,唯一的区别是子节点的znode具有父路径

语法:

create /paren/path/subNode data

示例:

create /firstNode/child1 child1Data

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 1] create /firstNode/child1 child1Data
Created /firstNode/child1
[zk: localhost:2181(CONNECTED) 2] create /firstNode/child2 child2Data
Created /firstNode/child2

列出子节点:

此命令用于列出和显示znode的子项:

用法:

ls /path

示例:

ls /firstNode

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 3] ls /firstNode
[child1, child2]

检查状态

状态描述指定znode的元数据,它包含时间戳、版本号、数据长度和子znode等细项

语法:

stat /path

示例:

stat /firstNode

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 4] stat /firstNode
cZxid = 0x10000003c
ctime = Mon Jul 18 01:01:18 CST 2022
mZxid = 0x10000003f
mtime = Mon Jul 18 01:22:45 CST 2022
pZxid = 0x100000045
cversion = 2
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 12
numChildren = 2

删除znode

删除没有子节点的znode使用 delete /path 命令;

删除指定znode节点并递归删除其子节点使用deleteall /path命令

用法:

代码语言:javascript复制
delete /firstNode/child1
deleteall /firstNode

输出:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 5] delete /firstNode/child1
[zk: localhost:2181(CONNECTED) 6] deleteall /firstNode
[zk: localhost:2181(CONNECTED) 7] ls /firstNode
Node does not exist: /firstNode

ACL

ZK作为分布式应用架构中的重要中间件,通常会在上面以节点的方式存储一些关键信息。默认情况下,所有应用都可以读写任何节点,在复杂应用中这不太安全 ,ZK通过ACL机制来解决访问权限问题

  • ZooKeeper的权限控制是基于每个znode节点的,需要对每个znode设置权限
  • 每个znode支持设置多种权限控制方案和多个权限
  • 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点

ACL权限控制使用 schema: id:permission 来标识, 注意涵盖3个方面

  • 权限模式(Schema): 鉴权的策略
  • 授权对象(ID)
  • 权限:Permission

Schema

  • world:只有一个用户anyone, 代表所有人
  • ip: 使用ip地址认证
  • auth: 使用已添加认证的用户认证
  • digest: 使用"用户名: 密码"方式认证

id

授权对象是指权限赋予的用户或者实体,例如ip地址或者机器。授权模式schema与授权对象id之间的关系:

  • world: 只有一个Id, 即anyone
  • ip: 通常是一个ip地址或者一个ip地址段,比如192.168.0.110或192.168.0.1/24
  • auth: 用户名
  • digest: 自定义,通常是“username: BASE64(SHA-1(username:password))"

权限

  • CREATE: 简写为c, 可以创建子节点
  • DELETE: 简写成d, 可以删除子节点(仅下一级), 注意不是本节点
  • READ:简写成r, 可以读取节点数据并显示子节点列表
  • WRITE: 简写成w, 可以设置节点数据
  • ADMIN: 简写成a, 可以设置节点访问控制列表

查看ACL

用法:

getAcl /path

示例:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 12] getAcl /firstNode
'world,'anyone
: cdrwa

可见一个znode节点默认是wold模式,任意用户都具备节点的所有操作权限

设置ACL

设置节点对所有用户都有写和管理权限

用法:

setAcl /path world:anyone wa

示例:

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 17] setAcl /persistent_node world:anyone:wa
[zk: localhost:2181(CONNECTED) 18] get /persistent_node
Insufficient permission : /persistent_node

当我们对persistent_node节点设置的权限列表没有READ权限时,执行get /persistent_node命令就会提示权限不足

digest授权模式

先添加用户:

addauth digest zhangsan:123456

再设置权限,这个节点只有zhangsan 这个用户对这个节点拥有所有权限

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 22] addauth digest zhangsan:123456
[zk: localhost:2181(CONNECTED) 23] setAcl /persistent_node auth:zhangsan:123456:rdwca

通过执行getAcl /persistent_node指令可以获取到该节点的ACL

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 24] getAcl /persistent_node
'diest,'zhangsan:jA/7JI9gsuLp0ZQn5J5dcnDQkHA=
: cdrwa
'digest,'zhangsan[zk:QalD ltUgPWxu8Pd gTLjlZ6aoY=
: cdrwa

Curator客户端

Curator客户端简介

Curator 框架是一个大幅度简化使用ZooKeeper的高水平ZooKeeper客户端API框架,它在ZooKeeper的基础上增加了很多新的特性,同时能够处理复杂的管理ZooKeeper集群连接和重试操作

Curator框架由以下几个子模块组成,它们的Maven坐标都已发布到Maven中央仓库,这些jar包的groupId、artifactId已经功能描述如下:

Curator的一些特性如下:

  • 自动连接管理:
    • 在使用ZooKeeper的时候存在一些潜在的错误场景需要ZooKeeper客户端重建连接或者重试操作,而Curator能够自动并透明地处理了这些错误场景
    • 在必要的时候监视NodeDataChanged事件并调用updateServerList() 方法
    • 通过Curator recipes自动移除监视器
  • 干净的API
    • 简化原生ZooKeeper的方法和事件使用API
    • 提供更简便的用法和流畅的接口
  • 丰富的方法实现
    • Leader选举
    • 共享锁
    • 路径缓存和监视器
    • 分布式队列
    • 分布式优先队列

创建一个CuratorFramework实例

CuratorFramework`类是curator框架中与ZooKeeper通信的客户端类

CuratorFramework实例可以通过提供了两个重载的静态newClient方法和一个静态builder方法创建,三个方法如下:

代码语言:javascript复制
/**
* 提供两个构造参数的创建CuratorFramework实例方法
* @param connectString 连接zookeeper服务的连接串
* @param retryPolicy 重试策略
*/
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
        return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
    }
/**
* 提供四个构造参数的创建CuratorFramework实例方法
* @param connectString 连接zookeeper服务的连接串
* @param sessionTimeoutMs会话超时时间(单位:ms)
* @param connectionTimeoutMs 连接超时时间 (单位:ms)
*/
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
        return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();
    }

/**
* 构建器方法,该方法返回一个CuratorFramework实例的构建器
* 通过静态内部类Builder的build方法创建CuratorFramework实例
*/
public  static CuratorFrameworkFactory.Builder builder() {
        return new CuratorFrameworkFactory.Builder();
    }
public  static  class Builder {
    public CuratorFramework build() {
            return new CuratorFrameworkImpl(this);
        }
}

以上CuratorFrameworkFactory#newClient工厂方法提供了一种简化创建CuratorFramework实例的方式,通过Builder可以修改CuratorFramework实例的所有参数。一旦你有了一个CuratorFramework实例,你必须调用其start()方法用来启动客户端实例,同时在应用结束的时候调用CuratorFramework类的close方法。

CuratorFramework API

CuratorFramework类使用流畅风格的接口,建议操作的时候使用返回一个CuratorFramework实例的构建者Builder, 用法如下:

代码语言:javascript复制
// 创建持久节点
client.create().forPath("/head", new byte[0]);
// 删除节点
client.delete().inBackground().forPath("/head");
// 创建临时有序节点
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
// 设置监听器
client.getData().watched().inBackground().forPath("/test");

CuratorFramework类的常用方法

以上方法描述中的方法解释:

  • withMode(CreateMode mode): 指定创建的节点类型
  • withTtl(long ttl): 指定临时节点失效时间
  • withVersion(int version): 指定版本号
  • inBackground: 异步处理错误,有6个重载方法,分别是:
    • inBackground(): 无参数方法,返回错误路径和字节数据对象ErrorListenerPathAndBytesable<String>
    • inBackground(BackgroundCallback callback, Object context): 入参带回调函数和上下文的异步处理错误,返回对象也是错误路径和字节数据
    • inBackground(Object context): 入参只有上下文的异步处理错误
    • inBackground(BackgroundCallback callback, Executor executor): 入参带回调函数和线程池的方法
    • inBackground(BackgroundCallback callback, Object context, Executor executor): 入参带回调函数、上下文和线程池的方法
    • inBackground(BackgroundCallback callback): 入参只带一个回调函数的方法
  • watch() : 添加监视器;
  • usingWatcher: 添加监视器有两个重载方法
    • usingWatcher(Watcher watcher): 指定Watcher类型监视器;
    • usingWatcher(CuratorWatcher watcher): 指定CuratorWatcher类型监视器
  • storingStatIn(Stat stat): 设置节点状态结构
  • withAcl(ListaclList): 设置访问权限列表
  • forPath: 对指定路径进行操作,有两个重载方法
    • forPath(String path): 操作指定路径,返回节点的Stat信息
    • forPath(String path, byte[] date), 对指定路径的节点设置数据, 返回节点的Stat信息
  • joining(Listserver): 设置zookeeper服务集群中的节点
  • leaving(Listserver): 移除zookeeper服务集群中的节点
  • withNewMembers(Listservers): 添加新的zookeeper服务节点
  • forEnsemble(): 组织成配置信息,返回byte[]类型数据,解码成字符串类型数据才是zookeeper服务集群的配置信息

Notifications

后台操作和监视的通知(notification)通过CuratorListener接口发布。您可以使用CuratorFramework实例的getCuratorListenable()的返回结果List<CuratorListener>实例的add(CuratorListener listener) 方法向CuratorFramework实例注册侦听器。监听器需要实现了下面这个抽象方法:

代码语言:javascript复制
 void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;

CuratorEvent

CuratorEvent对象是一个超设置的POJO,它可以保存各种类型的后台通知和触发的监视器。CuratorEvent的有用字段依赖于通过getType()方法获得的事件类型

CuratorEvent的事件类型和对应的触发方法如下:

命名空间

因为ZooKeeper集群是一个共享环境,所以观察名称空间约定非常重要,以便使用给定集群的各种应用程序不会使用冲突的ZK路径。

CuratorFramework有一个“名称空间”的概念。在创建CuratorFramework实例时设置名称空间(通过Builder)。CuratorFramework将在调用它的一个api时将名称空间前置到所有路径,如下所示:

代码语言:javascript复制
  CuratorFramework    client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
 ...
client.create().forPath("/test", data);
// znode 实际会写到 "/MyApp/test" 路径下

实现分布式配置中心

我们利用zookeeper的高度客户端封装框架CuratorFramwork及ZooKeeper的监听器机制实现一个分布式配置中心

引入Jar包依赖

代码语言:javascript复制
 <dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-framework</artifactId>
			<version>4.0.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>4.0.0</version>
		</dependency> 

curator的最新版本为5.3.0, 但是在5.3.0版本中TreeCache类已经过时,这里我们使用的4.0.0版本连接3.7版本的ZooKeeper已经足够了

配置文件中添加zookeeper集群服务连接串

application.properties

代码语言:javascript复制
  zookeeper.connect.url=119.29.117.19:2181,119.29.117.19:2182,119.29.117.19:2183

项目中新建ZooKeeperConfig类

代码语言:javascript复制
  package org.sang.config;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.RetryOneTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Properties;

@Component
public  class ZooKeeperConfig {

    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConfig.class);
    @Value("${zookeeper.connect.url}")
    private String zooKeeperConnectUrl;

    private Properties configProperties = new Properties();

    public String getProperty(String key){
        return configProperties.getProperty(key);
    }

    // 项目启动后自动拉取ZooKeeper中路径为"/config"的节点下的子节点配置项
    @PostConstruct
    public void init() throws Exception {
        // 连接zookeeper
        CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zooKeeperConnectUrl, new RetryOneTime(1000));
        zkClient.start();
        List<String> configNames = zkClient.getChildren().forPath("/config");
        for(String key: configNames){
            // 获取每个路径下的值(即配置值)
            byte[] value = zkClient.getData().forPath("/config/" key);
            configProperties.put(key, new String(value, "UTF-8"));
        }
        //保证实时性,利用zk的watch机制
        TreeCache treeCache = new TreeCache(zkClient, "/config");
        treeCache.start();
        // 创建监听器
        treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
            switch (treeCacheEvent.getType()){
                case NODE_UPDATED:
                    // 获取变更节点的路径名
                    String configName = treeCacheEvent.getData().getPath().replace("/config/", "");
                    // 监听到zk的zNode发生了数据变更
                    logger.info(configName   "的值发生了更新, 更新后的值为:"   new String(treeCacheEvent.getData().getData(), "UTF-8"));
                    // 获取变更的值
                    String configValue = new String(treeCacheEvent.getData().getData(), "UTF-8");
                    configProperties.put(configName, configValue);
                    break;
                default:
                    break;
            }
        });
    }

}

这里使用TreeCache#getListenable方法返回对象添加监听器,是因为CuratorFramework#getListenable方法返回对象添加监听器要更简单,无需显示地调用watchesusingWatcher方法。事件类型也只需要判断是否为节点变更类型即可。

效果测试

我们首先启动zookeeper服务集群,然后执行命令行zkCli.sh -server localhost:2181连接ZooKeeper服务

先在客户端执行创建znode节点的命令:

代码语言:javascript复制
  [zk: localhost:2181(CONNECTED) 1] create /config/key1 valu1
Created /config/key1
[zk: localhost:2181(CONNECTED) 2] create /config/key2 valu2
Created /config/key2
[zk: localhost:2181(CONNECTED) 3] create /config/key3 value3
Created /config/key3

然后可以在spring-boot项目的控制台看到三行与之对应的配置变量发生变化的日志:

代码语言:javascript复制
  2022-07-21 16:30:54.450  INFO 8856 --- [tor-TreeCache-0] org.sang.config.ZooKeeperConfig          : key1的值发生了更新, 更新后的值为:eurekaService
2022-07-21 16:32:59.019  INFO 8856 --- [tor-TreeCache-0] org.sang.config.ZooKeeperConfig          : key2的值发生了更新, 更新后的值为:consulService
2022-07-21 16:34:38.300  INFO 8856 --- [tor-TreeCache-0] org.sang.config.ZooKeeperConfig          : key3的值发生了更新, 更新后的值为:nacosService

这样我们需要的配置变量就可以通过调用ZooKeeperConfig#getProperty(String key)方法获取,前端展示的配置变量值可以由后端通过websocket推送给前端同步更新

小结

本文重点介绍了ZooKeeper的znode stat数据、监视器、常用命令以及Zookeeper的客户端高度封装框架CuratorFramwork及其常用API,并演示了使用CuratorFramwork API 实现一个分布式注册中心,利用ZooKeeper的监视器机制实现客户端更新配置变量时同步修改配置中心配置键对应的值。下篇文章笔者将带领大家继续学习使用CuratorFramwork框架实现一些增删改查和异步操作,乃至实现分布式事务锁功能。

原创不易,还请看到这里的小伙伴们点个赞和【在看】,笔者不胜感激!

参考阅读

【1】https://curator.apache.org/curator-framework/index.html

【2】 https://blog.csdn.net/chenzihao36/article/details

/106750057

---END---

0 人点赞