Zookeeper入门(三)—使用CuratorFramework操作节点并添加监视器

2022-11-22 13:37:43 浏览数 (1)

前言

在上一篇文章ZooKeeper入门(二)中笔者讲解了分布式协调中间件ZooKeeper的常用命令并使用Curator客户端实现了一个简单的配置中心功能。本文的目的就是带领读者朋友们一起学习如何在SpringBoot项目中使用Curator客户端对ZooKeeper节点进行简单的增删改查并对节点设置Watcher监视器等实践,让大家掌握使用Curator客户端对ZooKeeper进行基础的操作。

升级Curator版本

因为与我们使用的3.7.1版本的ZooKeeper对应的Curator客户端已升级到5.3.0版本,而且具备了幂等操作API,因此笔者也对Curator的版本由之前的4.0版本升级到了5.3.0版本

代码语言:javascript复制
  <dependency>
		<groupId>org.apache.curator</groupId>
		<artifactId>curator-framework</artifactId>
		<version>5.3.0</version>
  </dependency>
  <dependency>
		<groupId>org.apache.curator</groupId>
		<artifactId>curator-recipes</artifactId>
		<version>5.3.0</version>
	</dependency>

升级后的TreeCache类已过时, 其官方Java API文档中提示我们已使用CuratorCache类代替了TreeCache

因此,我们需要对之前项目中的ZooKeeperConfig类进行修改,鉴于CuratorFramwork类实例作为客户端工具

在对ZooKeeper节点进行操作时需要经常用到,因此我们把他注册到Spring 的IOC容器使其成为一个bean

  • 首先新建一个ZooKeeperClientConfig类,实例化CuratorFramwork bean
代码语言:javascript复制
package org.sang.config;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZooKeeperClientConfig {

    /**
     * 多参数构建ZooKeeper客户端连接
     * @return client
     */
    @Bean(name="zookeeperClient")
    public CuratorFramework createWithOptions(){
        // 连接串也可以从配置文件中取
        String connectString = "119.29.117.19:2181,119.29.117.19:2182,119.29.117.19:2183";
        ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(backoffRetry)
                .sessionTimeoutMs(30*60*1000)   // 会话超时30分钟
                .connectionTimeoutMs(30*1000)   // 连接超时30s
                .build();
        client.start(); // 初始化后启动
        return client;
    }

}
  1. ZooKeeperConfig类中注入CuratorFramework bean 并使用CuratorCache类替换过时的TreeCache
代码语言:javascript复制
@Component
public class ZooKeeperConfig {

    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConfig.class);
    @Resource
    private CuratorFramework zkClient;

    private Properties configProperties = new Properties();


    public String getProperty(String key){
        return configProperties.getProperty(key);
    }

    // 初始化
    @PostConstruct
    public void init() throws Exception {
        List<String> configNames = zkClient.getChildren().forPath("/config");
        for(String key: configNames){
            // 获取每个路径下的值(即配置值)
            byte[] value = zkClient.getData().forPath("/config/" key);
            configProperties.put(key, new String(value, "UTF-8"));
        }
        //保证实时性,利用zk的watch机制
        CuratorCache curatorCache = CuratorCache.build(zkClient, "/config");
        curatorCache.start();
        // 创建监听器
        curatorCache.listenable().addListener((type, oldData, data) -> {
            // oldData为修改前的数据;data为将要修改的新数据,类型均为ChildData
            switch (type){
                case NODE_CHANGED:
                    // 获取变更节点的路径名
                    String configName = data.getPath().replace("/config/", "");
                    // 监听到zk的zNode发生了数据变更
                    logger.info(configName   "的值发生了更新, 更新后的值为:"   new String(data.getData()));
                    // 获取变更的值
                    String configValue = new String(data.getData());
                    configProperties.put(configName, configValue);
                    break;
                default:
                    break;
            }
        });
    }

}

ZooKeeper节点基础CRUD操作

新建ZooKeeperService服务类,在该类中完成对ZooKeeper节点的操作

代码语言:javascript复制
package org.sang.service;

import com.alibaba.fastjson.JSON;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;

@Service
public class ZooKeeperService {

    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperService.class);
    
    // 注入ZkClient bean
    @Resource
    private CuratorFramework curatorFramework;

    /**
     * 创建永久节点
     * @param path
     * @param data
     * @throws Exception
     */
    public void createNode(String path, String data) throws Exception{
        curatorFramework.create().forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 创建临时节点
     * @param path
     * @param data
     * @throws Exception
     */
    public void createEphemeralNode(String path, String data) throws Exception {
        curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 创建临时有序节点
     * @param path
     * @param data
     * @throws Exception
     */
    public void crateEphemeralSequentialNode(String path, String data) throws Exception {
       curatorFramework.create()
               .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
               .forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 往节点种设置数据
     * @param path
     * @param data
     * @throws Exception
     */
    public void setData(String path, String data) throws Exception{
         curatorFramework.setData().forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }
    
    /**
     * 异步修改数据
     * @param path
     * @param data
     * @throws Exception
     */
    public void setDataAsync(String path, String data) throws Exception{
        // 添加回调监听器, set数据成功后会对节点进行监听
        CuratorListener listener = (client, event) -> {
            Stat stat = event.getStat();
            logger.info("stat="   JSON.toJSONString(stat));
            CuratorEventType eventType = event.getType();
            logger.info("eventType=" eventType.name());
        };
        curatorFramework.getCuratorListenable().addListener(listener);
        curatorFramework.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }


    /**
     * 删除节点
     * @param path
     * @throws Exception
     */
    public void deleteData(String path) throws Exception{
        curatorFramework.delete().forPath(path);
    }

    /**
     * 安全删除节点
     * @param path
     * @throws Exception
     */
    public void guaranteedDeleteData(String path) throws Exception {
        curatorFramework.delete().guaranteed().forPath(path);
    }
    
     /**
     * 获取子节点下的全部子节点路径集合
     * @param path 指定节点路径
     * @return List<String> 子节点路径集合
     * @throws Exception
     */
    public List<String> watchedGetChildren(String path) throws Exception {
        List<String> children = curatorFramework.getChildren().watched().forPath(path);
        return children;
    }

  
   /**
     * 获取节点数据
     * @param path 节点路径
     * @param fullClassName 数据转换对象全类名
     * @return Object
     * @throws Exception
     */
    public Object getDataByPath(String path, String fullClassName) throws Exception {
        String jsonStr = new String(curatorFramework.getData().forPath(path), StandardCharsets.UTF_8);
        Class clazz = Class.forName(fullClassName);
        return JSON.parseObject(jsonStr, clazz);
    }

}

  1. 新建ZookeepeController类, 通过接口操作ZookeeperService
代码语言:javascript复制
package org.sang.controller;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.sang.config.ZooKeeperConfig;
import org.sang.pojo.RespBean;
import org.sang.service.ZooKeeperService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/zookeeper")
public class ZooKeeperController {

    @Resource
    private ZooKeeperService zooKeeperService;
    @Resource
    private ZooKeeperConfig zooKeeperConfig;

    private final static Logger logger = LoggerFactory.getLogger(ZooKeeperController.class);
    
    /**
    * 获取配置变量接口
    */
    @GetMapping("/getConfigValueByKey")
    public RespBean<String> getConfigValueByKey(@RequestParam("configKey") String configKey){
        logger.info("configKey={}", configKey);
        String configValue = zooKeeperConfig.getProperty(configKey);
        RespBean<String> respBean = RespBean.success(configValue);
        return respBean;
    }
    
    /**
    * 创建持久节点接口
    */
    @PostMapping(value = "/create/persistent")
    public RespBean<String> createPersistentNode(@RequestBody Map<String, Object> postData) {
        RespBean respBean;
        try {
            checkPostData(postData);
            String path = (String) postData.get("path");
            Object data = postData.get("data");
            zooKeeperService.createNode(path, JSON.toJSONString(data));
            respBean = RespBean.success("create node "   path   " success");
        } catch (Exception e) {
            respBean = RespBean.error("create node failed");
            logger.error("create node error", e);
        }
        return respBean;
    }
    
    /**
    * 创建临时节点接口
    */
    @RequestMapping("/create/ephemeral")
    public  RespBean<String> createTempNode(@RequestBody Map<String, Object> postData) {
        RespBean<String> respBean;
        try {
                checkPostData(postData);
                String path = (String) postData.get("path");
                Object data = postData.get("data");
                zooKeeperService.createEphemeralNode(path, JSON.toJSONString(data));
                respBean = RespBean.success("create ephemeral node "   path   " success");
        } catch (Exception e) {
                respBean = RespBean.error("create ephemeral node failed");
                logger.error("create ephemeral node error", e);
        }
        return respBean;
    }
    
    /**
    * 创建临时有序节点接口
    */
    @PostMapping("/ephemeral/sequence")
    public RespBean<String> createEphemeralSequenceNode(@RequestBody Map<String, Object> postData){
        RespBean respBean;
        try{
            checkPostData(postData);
            String path = (String) postData.get("path");
            Object data = postData.get("data");
            zooKeeperService.crateEphemeralSequentialNode(path, JSON.toJSONString(data));
            respBean = RespBean.success("create ephemeral sequence node "   postData.get("path")   " success");
        } catch (Exception e) {
            respBean = RespBean.error("create ephemeral node failed");
            logger.error("create ephemeral sequence node error", e);
        }
        return respBean;
    }
    
    /**
    * 根据节点路径获取节点中的数据接口
    */
    @PostMapping("getDataByPath")
    public RespBean<Object> getDataByPath(@RequestBody Map<String, String> paramMap){
        RespBean respBean;
        try {
            String path = paramMap.get("path");
            String fullClassName = paramMap.get("fullClassName");
            Object data = zooKeeperService.getDataByPath(path, fullClassName);
            respBean = RespBean.success(data);
        } catch (Exception e) {
            respBean = RespBean.error("get data failed caused by "   e.getMessage());
            logger.error("get data error", e);
        }
        return respBean;
    }
    
    /**
    * 同步修改节点数据接口
    */
    @PostMapping("/setData/sync")
    public RespBean<String> setData(@RequestBody Map<String, Object> paramMap){
        checkPostData(paramMap);
        RespBean<String> respBean;
        try {
            zooKeeperService.setData((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
            respBean = RespBean.success("set data success");
        } catch (Exception e) {
            logger.error("set data failed", e);
            respBean = RespBean.error("set data failed, caused by "   e.getMessage());
        }
        return respBean;
    }

    @PostMapping("/setData/async")
    public RespBean<String> asyncSetData(@RequestBody Map<String, Object> paramMap){
        checkPostData(paramMap);
        RespBean<String> respBean;
        try {
            zooKeeperService.setDataAsync((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
            respBean = RespBean.success("async set data success");
        } catch (Exception e) {
            logger.error("async set data failed", e);
            respBean = RespBean.error("async set data failed, caused by "   e.getMessage());
        }
        return respBean;
    }

    /**
    * 获取被监听的子节点路径集合接口
    */
    @GetMapping("/getWatchedChildren")
    public RespBean<List<String>> getWatchedChildren(@RequestParam("path") String path){
        RespBean<List<String>> respBean;
        try {
            List<String> watchedChildren = zooKeeperService.watchedGetChildren(path);
            respBean = RespBean.success(watchedChildren);
        } catch (Exception e) {
            logger.error("getWatchedChildren error", e);
            respBean = RespBean.error("getWatchedChildren failed, caused by "   e.getMessage());
        }
        return respBean;
    }
    
    /**
    * 删除节点接口
    */
    @DeleteMapping("/deleteByPath")
    public RespBean<String> deleteDataByPath(@RequestParam("path") String path){
        logger.info("delete ZNode "   path);
        RespBean<String> respBean;
        try {
            zooKeeperService.deleteData(path);
            respBean = RespBean.success("delete ZNode success");
        } catch (Exception e) {
            logger.error("delete ZNode of "   path   "failed", e);
            respBean = RespBean.error("delete ZNode failed, caused by "   e.getMessage());
        }
        return respBean;
    }
    
    /**
    * 安全删除节点接口
    */
    @DeleteMapping("/guaranteedDeleteData")
    public RespBean<String> guaranteedDeleteData(@RequestParam("path") String path){
        logger.info("guaranteed delete ZNode "   path);
        RespBean<String> respBean;
        try {
            zooKeeperService.guaranteedDeleteData(path);
            respBean = RespBean.success("guaranteed delete data success");
        } catch (Exception e) {
            logger.error("guaranteed delete data failed", e);
            respBean = RespBean.error("guaranteed delete data failed, caused by "   e.getMessage());
        }
        return respBean;
    }

    /**
    * 校验POST请求入参数据
    */
    private void checkPostData(Map<String, Object> postData){
        String path = (String) postData.get("path");
        if(StringUtils.isEmpty(path)){
            throw new IllegalArgumentException("path cannot be null");
        }
        Object data = postData.get("data");
        if(data==null || "".equals(data)){
            throw new IllegalArgumentException("data cannot be null");
        }
    }

}

测试CRUD基础操作

首先我们在项目的SpringSecurity配置文件WebSecurityConfig.java中对操作zookeeper的接口放开认证要求

SpringSecurity#configure(HttpSecurity http)方法

代码语言:javascript复制
 http.authorizeRequests()
                .antMatchers("/user/reg").anonymous()
                .antMatchers("/zookeeper/**").anonymous()

参考笔者之前发布的文章Zookeeepr入门(一)启动ZooKeeper集群服务,然后启动本地的Redis和MySql服务后

再在IDEA中启动blogserver服务,服务启动成功之后就可以通过在postman中调用接口进行验证了

创建持久节点

在postman中调用创建持久节点接口

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/create/persistent
{
    "path": "/test",
	"data": {
		"serviceName": "zooKeeperService",
		"serverAddress": "192.110.119.201:2181,192.110.119.202:2181,192.110.119.203:2181",
        "contextPath": "/zooKeeper",
        "apiList": []
	}
}

接口返回信息:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "create node /test success"
}

然后更换入参调用同一个接口创建子节点/test/oderService

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/create/persistent
{
    "path": "/test/oderService",
		"data": {
		"serviceName": "orderService",
		"serverAddress": "192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080",
        "contextPath": "/orderService",
            "apiList": [
                {
                    "apiId": 1,
                    "apiPath": "/order/create",
                    "apiName": "创建订单接口API",
                    "requestType": "POST",
                    "argumentTypes": "java.lang.String, java.lang.Double",
                     "returnType": "org.sang.pojo.RespBean"
                },
                {
                    "apiId": 2,
                    "apiPath": "/order/get",
                    "apiName": "查询订单接口API",
                    "requestType": "GET",
                    "argumentTypes": "java.lang.Long",
                     "returnType": "org.sang.pojo.RespBean"
                }
            ]
	}
}

注意:为了创建路径为/test/orderService的持久节点成功,必须先创建/test节点,否则在没有父节点的情况下直接创建/test`的子节点zookeeper客户端会会报错

此时我们在Linux客户端连接ZooKeeper服务, 通过ls /test命令可查看到/test路径下的所有子节点

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 19] ls /test
[orderService]

然后再通过get /test/orderService命令可以直接查看该节点下的数据

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 21] get /test/orderService
{"serviceName":"orderService","serverAddress":"192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080","contextPath":"/orderService","apiList":[{"apiId":1,"apiPath":"/order/create","apiName":"创建订单接口API","requestType":"POST","argumentTypes":"java.lang.String, java.lang.Double","returnType":"org.sang.pojo.RespBean"},{"apiId":2,"apiPath":"/order/get","apiName":"查询订单接口API","requestType":"GET","argumentTypes":"java.lang.Long","returnType":"org.sang.pojo.RespBean"}]}
  1. 查看节点数据

为了将/test节点及其子节点中存放的数据在取数据时能反序列化为一个对象,我们新建了一两个实体类

ServiceInfo.javaApiInfo.java

代码语言:javascript复制
public class ServiceInfo implements Serializable {
    // 服务名称
    private String serviceName;
    // 服务地址,IP 端口号,多个使用逗号分隔
    private String serverAddress;
    // 上下文
    private String contextPath;
    // 服务中的api列表
    private List<ApiInfo> apiList;
    // 省略setter和getter方法
}
代码语言:javascript复制
public class ApiInfo implements Serializable {
    // APIID
    private Long apiId;
    // API 路径
    private String apiPath;
    // API名称
    private String apiName;
    // 请求类型:GET|POST|PUT|DELETE
    private String requestType;
    // 参数类型,参数全类名, 多个以逗号分隔
    private String argumentTypes;
    // 返回值类型
    private String returnType;
     // 省略setter和getter方法
}

在postman中调用查询节点数据接口

代码语言:javascript复制
  POST http://localhost:8081/blog/zookeeper/getDataByPath
  {
  	"path": "/test",
  	"fullClassName": "org.sang.pojo.ServiceInfo"
  }
  // 第一参数为节点路径,第二个参数为实体类全类名

接口返回信息:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": {
        "serviceName": "zooKeeperService",
        "serverAddress": "192.110.119.201:2181,192.110.119.202:2181,192.110.119.203:2181",
        "contextPath": "/zooKeeper",
        "apiList": []
    }
}

再次调用相同接口获取/test/orderService节点中的数据

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/getDataByPath
{
	"path": "/test/orderService",
	"fullClassName": "org.sang.pojo.ServiceInfo"
}

接口返回结果:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": {
        "serviceName": "orderService",
        "serverAddress": "192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080",
        "contextPath": "/orderService",
        "apiList": [
            {
                "apiId": 1,
                "apiPath": "/order/create",
                "apiName": "创建订单接口API",
                "requestType": "POST",
                "argumentTypes": "java.lang.String, java.lang.Double",
                "returnType": "org.sang.pojo.RespBean"
            },
            {
                "apiId": 2,
                "apiPath": "/order/get",
                "apiName": "查询订单接口API",
                "requestType": "GET",
                "argumentTypes": "java.lang.Long",
                "returnType": "org.sang.pojo.RespBean"
            }
        ]
    }
}

创建临时节点

调用创建临时节点接口

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/create/ephemeral
{
    "path": "/test/ephemralOne",
	"data": "ephemeral node1"
}

接口返回信息:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "create ephemeral node /test/ephemralOne success"
}

然后在连接ZooKeeper服务的Linux客户端中执行命令ls /test 查看临时节点

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 22] ls /test
[ephemralOne, orderService]

我们发现/test节点下多了一个子节点ephemralOne

然后重启blogserver服务后我们再次执行ls /test命令会发现ephemralOne子节点消失不见了,证明了它是 一个临时节点,在会话关闭后就会消失。

创建临时有序节点

调用创建临时有序节点接口

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/ephemeral/sequence
{
	"path": "/test/ephemeralSequence",
	"data": "ephemeralSequence"
}

连续调用以上接口三次

每次调用都会返回以下响应信息:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "create ephemeral sequence node /test/ephemeralSequence success"
}

再次执行ls /test命令查看/test节点下的子节点

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 23] ls /test
[ephemeralSequence0000000004, ephemeralSequence0000000005, ephemeralSequence0000000006, orderService]

可以看到创建的临时有序节点在指定的路径名ephemeralSequence后面都带有一个十位长度的数字字符串

在获取节点路径时需要将后面的数字字符也带上

代码语言:javascript复制
  [zk: localhost:2181(CONNECTED) 24] get /test/ephemeralSequence0000000004
  "ephemeralSequence"

重置节点数据

调用同步设置数据接口

代码语言:javascript复制
POST 
{
	"path": "/test",
	"data": {
		"serverAddress": "localhost:2181,localhost:2182,localhost:2183",
		"contextPath": "/cloudService",
		"serviceName": "registerService",
		"apiList":[]
	}
}

返回结果:

代码语言:javascript复制
  {
      "status": 200,
      "msg": "success",
      "data": "set data success"
  }

然后通过客户端zkCli命令可以查看到test节点数据发送了变化

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 0] get /test
{"serverAddress":"localhost:2181,localhost:2182,localhost:2183","contextPath":"/cloudService","serviceName":"registerService","apiList":[]}

删除节点

调用删除节点接口

代码语言:javascript复制
DELETE http://localhost:8081/blog/zookeeper/deleteByPath?path=/test/ephemralOne

返回结果

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "delete ZNode success"
}

注意DeleteBuilder#forPath方法只能删除没有子节点的节点,不能用来删除有子节点的节点

另一种删除方式curatorFramework.delete().guaranteed().forPath(path) 也只能删除子节点,这种方式是显示地表示可删除节点位子节点

ZooKeeper中的回调、监听器和Watcher

ZooKeeper中间件之所以能作为一个分布式协调器的一个重要原因就在于它的Watch机制, 当节点创建、修改、删除以及重连和连接失效时都能通过watch机制得到通知

通过回调监听

这种方式是通过设置BackgroundCallback回调函数监听节点

BackgroundCallback是一个接口

代码语言:javascript复制
public interface BackgroundCallback {
    void processResult(CuratorFramework zkClient, CuratorEvent event) throws Exception;
}

构造一个BackgroundCallback实例需要实现processResult抽象方法

第一个参数是CuratorFramework类型的zkClient, 通过这个客户端可以在回调种继续操作ZNode节点, 添加监视器等

第二个参数是CuratorEvent类型的事件对象, CuratorEvent也是一个接口,它的实现类是CuratorEventImpl

代码语言:javascript复制
public interface CuratorEvent {
    CuratorEventType getType();

    int getResultCode();

    String getPath();

    Object getContext();

    Stat getStat();

    byte[] getData();

    String getName();

    List<String> getChildren();

    List<ACL> getACLList();

    List<CuratorTransactionResult> getOpResults();

    WatchedEvent getWatchedEvent();
}

通过CuratorEvent#getType方法可以获得事件类型CuratorEventType,是一个枚举类型可以看到有下面这些事件类型

代码语言:javascript复制
public enum CuratorEventType {
    CREATE,  // 创建节点
    DELETE,  // 删除节点
    EXISTS,  // 存在节点
    GET_DATA,  // 获取节点数据
    SET_DATA,  // 重置节点数据
    CHILDREN,   // 子节点
    SYNC,   // 同步
    GET_ACL,  // 获取权限
    SET_ACL,  // 设置权限
    TRANSACTION,  // 事务
    GET_CONFIG, // 获取zookeeper配置信息
    RECONFIG,   // 重新配置zookeeper
    WATCHED,  // 监听
    REMOVE_WATCHES,  // 移除监听
    CLOSING,  会话关闭
    ADD_WATCH;  // 添加watch
    private CuratorEventType() {
    }
}

通过添加监听器监听

监听器为CuratorListener, 它是一个接口,有一个抽象方法

代码语言:javascript复制
 public interface CuratorListener {
      void eventReceived(CuratorFramework zkClient, CuratorEvent event) throws Exception;
   }

可以看到CuratorListenerBackgroundCallback两个接口具有相同类型的入参,可以说二者实现监视节点的效果和底层原理都是一样的

给节点添加Watcher

watcher是一个接口

代码语言:javascript复制
@Public
public interface Watcher {
    void process(WatchedEvent watchedEvent);
}

要构造一个Watcher实例需要实现process抽象方法,只有一个WatchedEvent类型的构造参数

代码语言:javascript复制
@Public
public class WatchedEvent {
    private final KeeperState keeperState;
    private final EventType eventType;
    private String path;
    // 三个参数的构造函数
    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
        this.keeperState = keeperState;
        this.eventType = eventType;
        this.path = path;
    }
    // 一个参数的构造方法
    public WatchedEvent(WatcherEvent eventMessage) {
        this.keeperState = KeeperState.fromInt(eventMessage.getState());
        this.eventType = EventType.fromInt(eventMessage.getType());
        this.path = eventMessage.getPath();
    }
    
    public KeeperState getState() {
        return this.keeperState;
    }

    public EventType getType() {
        return this.eventType;
    }

    public String getPath() {
        return this.path;
    }

    public String toString() {
        return "WatchedEvent state:"   this.keeperState   " type:"   this.eventType   " path:"   this.path;
    }

    public WatcherEvent getWrapper() {
        return new WatcherEvent(this.eventType.getIntValue(), this.keeperState.getIntValue(), this.path);
    }
}

通过WatchedEvent#type方法可以获得事件类型参数EventType对象

EventType也是一个枚举类, 囊括了以下几种ZNode事件

代码语言:javascript复制
None(-1),  // 无事件
      NodeCreated(1),  // 创建节点
      NodeDeleted(2),  // 节点被删除
      NodeDataChanged(3),  // 节点发送改变
      NodeChildrenChanged(4), // 子节点发生改变
      DataWatchRemoved(5),  // 数据监视器被删除
      ChildWatchRemoved(6), // 子节点监视器被删除
      PersistentWatchRemoved(7);  // 持久化监视器被删除

Watcher内部具有EventTypeKeeperState两个枚举类

BackgroundCallback与CuratorListener的用法

ZooKeeeprService类中定义一个全局变量callback, 这个回调可以在异步创建节点、异步修改节点数据以及异步删除节点时对节点进行监听处理

代码语言:javascript复制
 // 定义回调函数,对节点进行监听
    private BackgroundCallback callback = ((zkClient, curatorEvent) -> {
        logger.info("event data="  ((curatorEvent.getData()==null)?"null data": new String(curatorEvent.getData())));
        switch (curatorEvent.getType()){
            case SET_DATA:
                // 只会触发SET_DATA事件
                logger.info("node data changed");
                // 回调做其他事情
                break;
            case CREATE:
                logger.info("node created");
                break;
            case CHILDREN:
                logger.info("children");
                break;
            case DELETE:
                logger.info("node deleted");
                break;
            default:
                logger.info("eventType=" curatorEvent.getName());
                break;
        }
    });
// 定义监听器,对节点进行监听
    private CuratorListener listener = (client, event) -> {
        Stat stat = event.getStat();
        logger.info("stat="   JSON.toJSONString(stat));
        CuratorEventType eventType = event.getType();
        logger.info("eventType=" eventType.name());
    };

这里我们只在回调里打印日志

然后在异步创建节点、异步修改节点数据及异步删除节点方法中使用

代码语言:javascript复制
 /**
     * 异步创建节点
     * @param path
     * @param data
     * @throws Exception
     */
    public void asyncCreateNode(String path, String data) throws Exception {
        curatorFramework.create().inBackground(callback).forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

/**
     * 异步修改节点数据
     * @param path
     * @param data
     * @throws Exception
     */
    public void setDataAsyncWithCallback(String path, String data) throws Exception {
        curatorFramework.setData().inBackground(callback).forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 使用监听器异步修改数据
     * @param path
     * @param data
     * @throws Exception
     */
    public void setDataAsyncWithListener(String path, String data) throws Exception {
        // 通过监听器修节点改数据
        curatorFramework.getCuratorListenable().addListener(listener);
        curatorFramework.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }
/**
     * 异步删除数据
     * @param path
     * @throws Exception
     */
    public void asyncDeleteData(String path) throws Exception {
        curatorFramework.delete().inBackground(callback).forPath(path);
    }

然后在ZooKeeperController类中加上对应的控制器方法

代码语言:javascript复制
@PostMapping("/create/async")
    public RespBean<String> asyncCreateNode(@RequestBody Map<String, Object> postData){
        RespBean respBean;
        try {
            checkPostData(postData);
            String path = (String) postData.get("path");
            Object data = postData.get("data");
            zooKeeperService.asyncCreateNode(path, JSON.toJSONString(data));
            respBean = RespBean.success(" async create node "   path   " success");
        } catch (Exception e){
            respBean = RespBean.error("async create node failed");
            logger.error("async create node error", e);
        }
        return respBean;
    }

     @PostMapping("/setData/async/callback")
    public RespBean<String> setDataAsyncWithCallback(@RequestBody Map<String, Object> paramMap){
        checkPostData(paramMap);
        RespBean<String> respBean;
        try {
            zooKeeperService.setDataAsyncWithCallback((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
            respBean = RespBean.success("async set data with callback success");
        } catch (Exception e) {
            logger.error("set data with callback failed", e);
            respBean = RespBean.error("async set data with callback failed, caused by "   e.getMessage());
        }
        return respBean;
    }

@DeleteMapping("/deleteByPath/async")
    public RespBean<String> asyncDeleteByPath(@RequestParam("path") String path){
        logger.info(" async delete ZNode "   path);
        RespBean<String> respBean;
        try {
            zooKeeperService.asyncDeleteData(path);
            respBean = RespBean.success("delete ZNode success");
        } catch (Exception e) {
            logger.error("async delete ZNode of "   path   "failed", e);
            respBean = RespBean.error("async delete ZNode failed, caused by "   e.getMessage());
        }
        return respBean;
    }

然后重启服务开始测试效果

测试异步添加节点

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/create/async
{
	"path": "/test/person",
	"data": "personInfo collection"
}

返回信息:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": " async create node /test/person success"
}

可以看到控制台中打印出了节点创建的信息,这是我们在回调函数中根据事件类型判断打印的日志

代码语言:javascript复制
2022-08-21 13:44:02.629  INFO 20120 --- [ain-EventThread] org.sang.service.ZooKeeperService        : event data=null data
2022-08-21 13:44:02.630  INFO 20120 --- [ain-EventThread] org.sang.service.ZooKeeperService        : node created

测试使用回调异步修改节点数据接口

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/setData/async/callback
{
	"path": "/test/person",
	"data": "person infomation collection"
}

返回结果:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "async set data with callback success"
}

控制台打印如下日志:

代码语言:javascript复制
2022-08-21 22:55:02.011  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : event data=null data
2022-08-21 22:55:02.012  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : node data changed

测试使用监听器异步修改节点数据

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/setData/async/listener
{
	"path": "/test/person/2",
	"data": {
		"name": "李四",
		"age": 24,
		"height": 173.2,
		"salary": 15800.85
	}
}

返回信息:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "async set data with callback success"
}

控制台打印日志:

代码语言:javascript复制
2022-08-21 23:03:54.778  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : stat=null
2022-08-21 23:03:54.779  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : eventType=SET_DATA

测试异步删除节点

代码语言:javascript复制
DELETE http://localhost:8081/blog/zookeeper/deleteByPath/async?path=/test/person/2

返回结果:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "delete ZNode success"
}

控制台打印出日志:

代码语言:javascript复制
2022-08-21 23:09:06.306  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : event data=null data2022-08-21 23:09:06.306  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : node deleted

Watcher的用法

ZooKeeperService类里定义个全局的CuratorWatcher, 并定义一个为节点添加这个watcher的方法

代码语言:javascript复制
private CuratorWatcher watcher = watchedEvent -> {
        String eventName = watchedEvent.getType().name();
        // 监听的节点路径
        String watchedPath = watchedEvent.getPath();
        logger.info("watchedPath={}", watchedPath);
        switch (eventName){
            case "NodeCreated":
                logger.info("node created, add Lock success");
                break;
            case "NodeDeleted":
                logger.info("node deleted, release lock success");
                break;
            case "NodeDataChanged":
                logger.info("node data changed");
                break;
            case "NodeChildrenChanged":
                logger.info("node children changed");
                break;
            case "DataWatchRemoved":
                logger.info("data watcher removed");
                break;
            case "ChildWatchRemoved":
                logger.info("child watcher removed");
                break;
            case "PersistentWatchRemoved":
                logger.info("persistent watcher removed");
   
                
     /**
     * 给节点添加watcher
     * @param path
     * @throws Exception
     */
    public void addWatchByPath(String path) throws Exception {
        curatorFramework.getData().usingWatcher(watcher).forPath(path);
    }break;
            default:
                logger.info("none event");
                break;
        }
    };

/**
     * 给节点添加watcher
     * @param path
     * @throws Exception
     */
    public void addWatchByPath(String path) throws Exception {
        curatorFramework.getData().usingWatcher(watcher).forPath(path);
    }

然后再在ZooKeeperController类中添加对应的控制器方法

代码语言:javascript复制
@PostMapping("/addWatcherByPath")
public RespBean<String> addWatcherByPath(@RequestParam("path") String path){
        RespBean<String> respBean;
        try {
            zooKeeperService.addWatchByPath(path);
            respBean = RespBean.success("add watcher success");
        } catch (Exception e) {
            logger.info("add watcher failed", e);
            respBean = RespBean.error("add watcher failed, caused by "   e.getMessage());
        }
        return respBean;
 }

重启应用后对/test/person/1这个节点进行监听

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/addWatcherByPath?path=/test/person/1

返回信息:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "add watcher success"
}

然后调用同步setData接口对这个节点数据进行修改

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/setData/sync
{
	"path": "/test/person/1",
	"data": {
		"name": "王五",
		"age": 32,
		"height": 173.6,
		"salary": 16800.58
	}
}

返回信息:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "set data success"
}

控制台打印出日志:

代码语言:javascript复制
2022-08-21 23:29:19.349  INFO 3732 --- [ain-EventThread] org.sang.service.ZooKeeperService        : watchedPath=/test/person/1
2022-08-21 23:29:19.349  INFO 3732 --- [ain-EventThread] org.sang.service.ZooKeeperService        : node data changed

注意: Watcher还会监听一次,后面继续对节点进行操作就不会进入CuratorWatcher#process方法,如果需要继续监视节点的变化,则需要重新对节点添加Watcher

我们来测试一下效果,继续修改/test/person/1节点

代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/setData/sync
{
	"path": "/test/person/1",
	"data": {
		"name": "赵六",
		"age": 33,
		"height": 174.6,
		"salary": 18800.58
	}
}

返回信息:

代码语言:javascript复制
{
    "status": 200,
    "msg": "success",
    "data": "set data success"
}

但是控制台却看不到我们定义的watcherprocess方法中打印的日志,说明被调用了一次的watcher已经失效

不过要给节点添加持久类型的Watcher可通过下面这种链式调用方式实现

代码语言:javascript复制
CuratorFramework.watchers()
    .add()
    .withMode(AddWatchMode.PERSISTENT)
    .usingWatcher(watcher)
    .forPath(path)

小结

本文主要详细讲解了使用CuratorFramework客户端在SpringBoot项目中对ZooKeeper节点实现增删改查以及对ZooKeeper节点添加BackgroundCallback回调、CuratorListener监听器和Watcher监视器等操作,既能实现对节点的异步操作,也能监听节点的变化。从而让我们并根据ZooKeeper节点事件类型作出响应的业务逻辑处理.

关于使用CuratorFramework客户端以非事务的方式操作ZooKeeper节点就介绍到这里,想要更深入的学习CuratorFramework的用法可通过阅读该类及其方法相关类的源码进一步掌握。下一篇文章,笔者将继续介绍

使用CuratorFramework客户端在一个事务中完成多个操作,并介绍使用ZooKeeper实现分布式事务锁。

0 人点赞