前言
在上一篇文章ZooKeeper入门(二)中笔者讲解了分布式协调中间件ZooKeeper的常用命令并使用Curator客户端实现了一个简单的配置中心功能。本文的目的就是带领读者朋友们一起学习如何在SpringBoot项目中使用Curator客户端对ZooKeeper节点进行简单的增删改查并对节点设置Watcher监视器等实践,让大家掌握使用Curator客户端对ZooKeeper进行基础的操作。
升级Curator版本
因为与我们使用的3.7.1版本的ZooKeeper对应的Curator客户端已升级到5.3.0版本,而且具备了幂等操作API,因此笔者也对Curator的版本由之前的4.0版本升级到了5.3.0版本
代码语言:javascript复制 <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.3.0</version>
</dependency>
升级后的TreeCache
类已过时, 其官方Java API文档中提示我们已使用CuratorCache
类代替了TreeCache
类
因此,我们需要对之前项目中的ZooKeeperConfig
类进行修改,鉴于CuratorFramwork
类实例作为客户端工具
在对ZooKeeper
节点进行操作时需要经常用到,因此我们把他注册到Spring
的IOC容器使其成为一个bean
- 首先新建一个
ZooKeeperClientConfig
类,实例化CuratorFramwork
bean
package org.sang.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZooKeeperClientConfig {
/**
* 多参数构建ZooKeeper客户端连接
* @return client
*/
@Bean(name="zookeeperClient")
public CuratorFramework createWithOptions(){
// 连接串也可以从配置文件中取
String connectString = "119.29.117.19:2181,119.29.117.19:2182,119.29.117.19:2183";
ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.retryPolicy(backoffRetry)
.sessionTimeoutMs(30*60*1000) // 会话超时30分钟
.connectionTimeoutMs(30*1000) // 连接超时30s
.build();
client.start(); // 初始化后启动
return client;
}
}
ZooKeeperConfig
类中注入CuratorFramework
bean 并使用CuratorCache
类替换过时的TreeCache
类
@Component
public class ZooKeeperConfig {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConfig.class);
@Resource
private CuratorFramework zkClient;
private Properties configProperties = new Properties();
public String getProperty(String key){
return configProperties.getProperty(key);
}
// 初始化
@PostConstruct
public void init() throws Exception {
List<String> configNames = zkClient.getChildren().forPath("/config");
for(String key: configNames){
// 获取每个路径下的值(即配置值)
byte[] value = zkClient.getData().forPath("/config/" key);
configProperties.put(key, new String(value, "UTF-8"));
}
//保证实时性,利用zk的watch机制
CuratorCache curatorCache = CuratorCache.build(zkClient, "/config");
curatorCache.start();
// 创建监听器
curatorCache.listenable().addListener((type, oldData, data) -> {
// oldData为修改前的数据;data为将要修改的新数据,类型均为ChildData
switch (type){
case NODE_CHANGED:
// 获取变更节点的路径名
String configName = data.getPath().replace("/config/", "");
// 监听到zk的zNode发生了数据变更
logger.info(configName "的值发生了更新, 更新后的值为:" new String(data.getData()));
// 获取变更的值
String configValue = new String(data.getData());
configProperties.put(configName, configValue);
break;
default:
break;
}
});
}
}
ZooKeeper节点基础CRUD操作
新建ZooKeeperService服务类,在该类中完成对ZooKeeper节点的操作
代码语言:javascript复制package org.sang.service;
import com.alibaba.fastjson.JSON;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
@Service
public class ZooKeeperService {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperService.class);
// 注入ZkClient bean
@Resource
private CuratorFramework curatorFramework;
/**
* 创建永久节点
* @param path
* @param data
* @throws Exception
*/
public void createNode(String path, String data) throws Exception{
curatorFramework.create().forPath(path, data.getBytes(StandardCharsets.UTF_8));
}
/**
* 创建临时节点
* @param path
* @param data
* @throws Exception
*/
public void createEphemeralNode(String path, String data) throws Exception {
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes(StandardCharsets.UTF_8));
}
/**
* 创建临时有序节点
* @param path
* @param data
* @throws Exception
*/
public void crateEphemeralSequentialNode(String path, String data) throws Exception {
curatorFramework.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
}
/**
* 往节点种设置数据
* @param path
* @param data
* @throws Exception
*/
public void setData(String path, String data) throws Exception{
curatorFramework.setData().forPath(path, data.getBytes(StandardCharsets.UTF_8));
}
/**
* 异步修改数据
* @param path
* @param data
* @throws Exception
*/
public void setDataAsync(String path, String data) throws Exception{
// 添加回调监听器, set数据成功后会对节点进行监听
CuratorListener listener = (client, event) -> {
Stat stat = event.getStat();
logger.info("stat=" JSON.toJSONString(stat));
CuratorEventType eventType = event.getType();
logger.info("eventType=" eventType.name());
};
curatorFramework.getCuratorListenable().addListener(listener);
curatorFramework.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
}
/**
* 删除节点
* @param path
* @throws Exception
*/
public void deleteData(String path) throws Exception{
curatorFramework.delete().forPath(path);
}
/**
* 安全删除节点
* @param path
* @throws Exception
*/
public void guaranteedDeleteData(String path) throws Exception {
curatorFramework.delete().guaranteed().forPath(path);
}
/**
* 获取子节点下的全部子节点路径集合
* @param path 指定节点路径
* @return List<String> 子节点路径集合
* @throws Exception
*/
public List<String> watchedGetChildren(String path) throws Exception {
List<String> children = curatorFramework.getChildren().watched().forPath(path);
return children;
}
/**
* 获取节点数据
* @param path 节点路径
* @param fullClassName 数据转换对象全类名
* @return Object
* @throws Exception
*/
public Object getDataByPath(String path, String fullClassName) throws Exception {
String jsonStr = new String(curatorFramework.getData().forPath(path), StandardCharsets.UTF_8);
Class clazz = Class.forName(fullClassName);
return JSON.parseObject(jsonStr, clazz);
}
}
- 新建
ZookeepeController
类, 通过接口操作ZookeeperService
类
package org.sang.controller;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.sang.config.ZooKeeperConfig;
import org.sang.pojo.RespBean;
import org.sang.service.ZooKeeperService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/zookeeper")
public class ZooKeeperController {
@Resource
private ZooKeeperService zooKeeperService;
@Resource
private ZooKeeperConfig zooKeeperConfig;
private final static Logger logger = LoggerFactory.getLogger(ZooKeeperController.class);
/**
* 获取配置变量接口
*/
@GetMapping("/getConfigValueByKey")
public RespBean<String> getConfigValueByKey(@RequestParam("configKey") String configKey){
logger.info("configKey={}", configKey);
String configValue = zooKeeperConfig.getProperty(configKey);
RespBean<String> respBean = RespBean.success(configValue);
return respBean;
}
/**
* 创建持久节点接口
*/
@PostMapping(value = "/create/persistent")
public RespBean<String> createPersistentNode(@RequestBody Map<String, Object> postData) {
RespBean respBean;
try {
checkPostData(postData);
String path = (String) postData.get("path");
Object data = postData.get("data");
zooKeeperService.createNode(path, JSON.toJSONString(data));
respBean = RespBean.success("create node " path " success");
} catch (Exception e) {
respBean = RespBean.error("create node failed");
logger.error("create node error", e);
}
return respBean;
}
/**
* 创建临时节点接口
*/
@RequestMapping("/create/ephemeral")
public RespBean<String> createTempNode(@RequestBody Map<String, Object> postData) {
RespBean<String> respBean;
try {
checkPostData(postData);
String path = (String) postData.get("path");
Object data = postData.get("data");
zooKeeperService.createEphemeralNode(path, JSON.toJSONString(data));
respBean = RespBean.success("create ephemeral node " path " success");
} catch (Exception e) {
respBean = RespBean.error("create ephemeral node failed");
logger.error("create ephemeral node error", e);
}
return respBean;
}
/**
* 创建临时有序节点接口
*/
@PostMapping("/ephemeral/sequence")
public RespBean<String> createEphemeralSequenceNode(@RequestBody Map<String, Object> postData){
RespBean respBean;
try{
checkPostData(postData);
String path = (String) postData.get("path");
Object data = postData.get("data");
zooKeeperService.crateEphemeralSequentialNode(path, JSON.toJSONString(data));
respBean = RespBean.success("create ephemeral sequence node " postData.get("path") " success");
} catch (Exception e) {
respBean = RespBean.error("create ephemeral node failed");
logger.error("create ephemeral sequence node error", e);
}
return respBean;
}
/**
* 根据节点路径获取节点中的数据接口
*/
@PostMapping("getDataByPath")
public RespBean<Object> getDataByPath(@RequestBody Map<String, String> paramMap){
RespBean respBean;
try {
String path = paramMap.get("path");
String fullClassName = paramMap.get("fullClassName");
Object data = zooKeeperService.getDataByPath(path, fullClassName);
respBean = RespBean.success(data);
} catch (Exception e) {
respBean = RespBean.error("get data failed caused by " e.getMessage());
logger.error("get data error", e);
}
return respBean;
}
/**
* 同步修改节点数据接口
*/
@PostMapping("/setData/sync")
public RespBean<String> setData(@RequestBody Map<String, Object> paramMap){
checkPostData(paramMap);
RespBean<String> respBean;
try {
zooKeeperService.setData((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
respBean = RespBean.success("set data success");
} catch (Exception e) {
logger.error("set data failed", e);
respBean = RespBean.error("set data failed, caused by " e.getMessage());
}
return respBean;
}
@PostMapping("/setData/async")
public RespBean<String> asyncSetData(@RequestBody Map<String, Object> paramMap){
checkPostData(paramMap);
RespBean<String> respBean;
try {
zooKeeperService.setDataAsync((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
respBean = RespBean.success("async set data success");
} catch (Exception e) {
logger.error("async set data failed", e);
respBean = RespBean.error("async set data failed, caused by " e.getMessage());
}
return respBean;
}
/**
* 获取被监听的子节点路径集合接口
*/
@GetMapping("/getWatchedChildren")
public RespBean<List<String>> getWatchedChildren(@RequestParam("path") String path){
RespBean<List<String>> respBean;
try {
List<String> watchedChildren = zooKeeperService.watchedGetChildren(path);
respBean = RespBean.success(watchedChildren);
} catch (Exception e) {
logger.error("getWatchedChildren error", e);
respBean = RespBean.error("getWatchedChildren failed, caused by " e.getMessage());
}
return respBean;
}
/**
* 删除节点接口
*/
@DeleteMapping("/deleteByPath")
public RespBean<String> deleteDataByPath(@RequestParam("path") String path){
logger.info("delete ZNode " path);
RespBean<String> respBean;
try {
zooKeeperService.deleteData(path);
respBean = RespBean.success("delete ZNode success");
} catch (Exception e) {
logger.error("delete ZNode of " path "failed", e);
respBean = RespBean.error("delete ZNode failed, caused by " e.getMessage());
}
return respBean;
}
/**
* 安全删除节点接口
*/
@DeleteMapping("/guaranteedDeleteData")
public RespBean<String> guaranteedDeleteData(@RequestParam("path") String path){
logger.info("guaranteed delete ZNode " path);
RespBean<String> respBean;
try {
zooKeeperService.guaranteedDeleteData(path);
respBean = RespBean.success("guaranteed delete data success");
} catch (Exception e) {
logger.error("guaranteed delete data failed", e);
respBean = RespBean.error("guaranteed delete data failed, caused by " e.getMessage());
}
return respBean;
}
/**
* 校验POST请求入参数据
*/
private void checkPostData(Map<String, Object> postData){
String path = (String) postData.get("path");
if(StringUtils.isEmpty(path)){
throw new IllegalArgumentException("path cannot be null");
}
Object data = postData.get("data");
if(data==null || "".equals(data)){
throw new IllegalArgumentException("data cannot be null");
}
}
}
测试CRUD基础操作
首先我们在项目的SpringSecurity
配置文件WebSecurityConfig.java
中对操作zookeeper的接口放开认证要求
SpringSecurity#configure(HttpSecurity http)
方法
http.authorizeRequests()
.antMatchers("/user/reg").anonymous()
.antMatchers("/zookeeper/**").anonymous()
参考笔者之前发布的文章Zookeeepr入门(一)启动ZooKeeper集群服务,然后启动本地的Redis和MySql服务后
再在IDEA中启动blogserver服务,服务启动成功之后就可以通过在postman中调用接口进行验证了
创建持久节点
在postman中调用创建持久节点接口
代码语言:javascript复制POST http://localhost:8081/blog/zookeeper/create/persistent
{
"path": "/test",
"data": {
"serviceName": "zooKeeperService",
"serverAddress": "192.110.119.201:2181,192.110.119.202:2181,192.110.119.203:2181",
"contextPath": "/zooKeeper",
"apiList": []
}
}
接口返回信息:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "create node /test success"
}
然后更换入参调用同一个接口创建子节点/test/oderService
POST http://localhost:8081/blog/zookeeper/create/persistent
{
"path": "/test/oderService",
"data": {
"serviceName": "orderService",
"serverAddress": "192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080",
"contextPath": "/orderService",
"apiList": [
{
"apiId": 1,
"apiPath": "/order/create",
"apiName": "创建订单接口API",
"requestType": "POST",
"argumentTypes": "java.lang.String, java.lang.Double",
"returnType": "org.sang.pojo.RespBean"
},
{
"apiId": 2,
"apiPath": "/order/get",
"apiName": "查询订单接口API",
"requestType": "GET",
"argumentTypes": "java.lang.Long",
"returnType": "org.sang.pojo.RespBean"
}
]
}
}
注意:为了创建路径为/test/orderService的持久节点成功,必须先创建
/test节点,否则在没有父节点的情况下直接创建
/test`的子节点zookeeper客户端会会报错
此时我们在Linux客户端连接ZooKeeper
服务, 通过ls /test
命令可查看到/test
路径下的所有子节点
[zk: localhost:2181(CONNECTED) 19] ls /test
[orderService]
然后再通过get /test/orderService
命令可以直接查看该节点下的数据
[zk: localhost:2181(CONNECTED) 21] get /test/orderService
{"serviceName":"orderService","serverAddress":"192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080","contextPath":"/orderService","apiList":[{"apiId":1,"apiPath":"/order/create","apiName":"创建订单接口API","requestType":"POST","argumentTypes":"java.lang.String, java.lang.Double","returnType":"org.sang.pojo.RespBean"},{"apiId":2,"apiPath":"/order/get","apiName":"查询订单接口API","requestType":"GET","argumentTypes":"java.lang.Long","returnType":"org.sang.pojo.RespBean"}]}
- 查看节点数据
为了将/test
节点及其子节点中存放的数据在取数据时能反序列化为一个对象,我们新建了一两个实体类
ServiceInfo.java
和ApiInfo.java
public class ServiceInfo implements Serializable {
// 服务名称
private String serviceName;
// 服务地址,IP 端口号,多个使用逗号分隔
private String serverAddress;
// 上下文
private String contextPath;
// 服务中的api列表
private List<ApiInfo> apiList;
// 省略setter和getter方法
}
代码语言:javascript复制public class ApiInfo implements Serializable {
// APIID
private Long apiId;
// API 路径
private String apiPath;
// API名称
private String apiName;
// 请求类型:GET|POST|PUT|DELETE
private String requestType;
// 参数类型,参数全类名, 多个以逗号分隔
private String argumentTypes;
// 返回值类型
private String returnType;
// 省略setter和getter方法
}
在postman中调用查询节点数据接口
代码语言:javascript复制 POST http://localhost:8081/blog/zookeeper/getDataByPath
{
"path": "/test",
"fullClassName": "org.sang.pojo.ServiceInfo"
}
// 第一参数为节点路径,第二个参数为实体类全类名
接口返回信息:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": {
"serviceName": "zooKeeperService",
"serverAddress": "192.110.119.201:2181,192.110.119.202:2181,192.110.119.203:2181",
"contextPath": "/zooKeeper",
"apiList": []
}
}
再次调用相同接口获取/test/orderService
节点中的数据
POST http://localhost:8081/blog/zookeeper/getDataByPath
{
"path": "/test/orderService",
"fullClassName": "org.sang.pojo.ServiceInfo"
}
接口返回结果:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": {
"serviceName": "orderService",
"serverAddress": "192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080",
"contextPath": "/orderService",
"apiList": [
{
"apiId": 1,
"apiPath": "/order/create",
"apiName": "创建订单接口API",
"requestType": "POST",
"argumentTypes": "java.lang.String, java.lang.Double",
"returnType": "org.sang.pojo.RespBean"
},
{
"apiId": 2,
"apiPath": "/order/get",
"apiName": "查询订单接口API",
"requestType": "GET",
"argumentTypes": "java.lang.Long",
"returnType": "org.sang.pojo.RespBean"
}
]
}
}
创建临时节点
调用创建临时节点接口
代码语言:javascript复制POST http://localhost:8081/blog/zookeeper/create/ephemeral
{
"path": "/test/ephemralOne",
"data": "ephemeral node1"
}
接口返回信息:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "create ephemeral node /test/ephemralOne success"
}
然后在连接ZooKeeper服务的Linux客户端中执行命令ls /test
查看临时节点
[zk: localhost:2181(CONNECTED) 22] ls /test
[ephemralOne, orderService]
我们发现/test
节点下多了一个子节点ephemralOne
然后重启blogserver服务后我们再次执行ls /test
命令会发现ephemralOne
子节点消失不见了,证明了它是 一个临时节点,在会话关闭后就会消失。
创建临时有序节点
调用创建临时有序节点接口
代码语言:javascript复制POST http://localhost:8081/blog/zookeeper/ephemeral/sequence
{
"path": "/test/ephemeralSequence",
"data": "ephemeralSequence"
}
连续调用以上接口三次
每次调用都会返回以下响应信息:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "create ephemeral sequence node /test/ephemeralSequence success"
}
再次执行ls /test
命令查看/test
节点下的子节点
[zk: localhost:2181(CONNECTED) 23] ls /test
[ephemeralSequence0000000004, ephemeralSequence0000000005, ephemeralSequence0000000006, orderService]
可以看到创建的临时有序节点在指定的路径名ephemeralSequence
后面都带有一个十位长度的数字字符串
在获取节点路径时需要将后面的数字字符也带上
代码语言:javascript复制 [zk: localhost:2181(CONNECTED) 24] get /test/ephemeralSequence0000000004
"ephemeralSequence"
重置节点数据
调用同步设置数据接口
代码语言:javascript复制POST
{
"path": "/test",
"data": {
"serverAddress": "localhost:2181,localhost:2182,localhost:2183",
"contextPath": "/cloudService",
"serviceName": "registerService",
"apiList":[]
}
}
返回结果:
代码语言:javascript复制 {
"status": 200,
"msg": "success",
"data": "set data success"
}
然后通过客户端zkCli
命令可以查看到test
节点数据发送了变化
[zk: localhost:2181(CONNECTED) 0] get /test
{"serverAddress":"localhost:2181,localhost:2182,localhost:2183","contextPath":"/cloudService","serviceName":"registerService","apiList":[]}
删除节点
调用删除节点接口
代码语言:javascript复制DELETE http://localhost:8081/blog/zookeeper/deleteByPath?path=/test/ephemralOne
返回结果
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "delete ZNode success"
}
注意DeleteBuilder#forPath
方法只能删除没有子节点的节点,不能用来删除有子节点的节点
另一种删除方式curatorFramework.delete().guaranteed().forPath(path)
也只能删除子节点,这种方式是显示地表示可删除节点位子节点
ZooKeeper中的回调、监听器和Watcher
ZooKeeper中间件之所以能作为一个分布式协调器的一个重要原因就在于它的Watch机制, 当节点创建、修改、删除以及重连和连接失效时都能通过watch机制得到通知
通过回调监听
这种方式是通过设置BackgroundCallback回调函数监听节点
BackgroundCallback
是一个接口
public interface BackgroundCallback {
void processResult(CuratorFramework zkClient, CuratorEvent event) throws Exception;
}
构造一个BackgroundCallback
实例需要实现processResult
抽象方法
第一个参数是CuratorFramework
类型的zkClient, 通过这个客户端可以在回调种继续操作ZNode节点, 添加监视器等
第二个参数是CuratorEvent
类型的事件对象, CuratorEvent
也是一个接口,它的实现类是CuratorEventImpl
public interface CuratorEvent {
CuratorEventType getType();
int getResultCode();
String getPath();
Object getContext();
Stat getStat();
byte[] getData();
String getName();
List<String> getChildren();
List<ACL> getACLList();
List<CuratorTransactionResult> getOpResults();
WatchedEvent getWatchedEvent();
}
通过CuratorEvent#getType
方法可以获得事件类型CuratorEventType
,是一个枚举类型可以看到有下面这些事件类型
public enum CuratorEventType {
CREATE, // 创建节点
DELETE, // 删除节点
EXISTS, // 存在节点
GET_DATA, // 获取节点数据
SET_DATA, // 重置节点数据
CHILDREN, // 子节点
SYNC, // 同步
GET_ACL, // 获取权限
SET_ACL, // 设置权限
TRANSACTION, // 事务
GET_CONFIG, // 获取zookeeper配置信息
RECONFIG, // 重新配置zookeeper
WATCHED, // 监听
REMOVE_WATCHES, // 移除监听
CLOSING, 会话关闭
ADD_WATCH; // 添加watch
private CuratorEventType() {
}
}
通过添加监听器监听
监听器为CuratorListener
, 它是一个接口,有一个抽象方法
public interface CuratorListener {
void eventReceived(CuratorFramework zkClient, CuratorEvent event) throws Exception;
}
可以看到CuratorListener
与BackgroundCallback
两个接口具有相同类型的入参,可以说二者实现监视节点的效果和底层原理都是一样的
给节点添加Watcher
watcher是一个接口
代码语言:javascript复制@Public
public interface Watcher {
void process(WatchedEvent watchedEvent);
}
要构造一个Watcher实例需要实现process
抽象方法,只有一个WatchedEvent
类型的构造参数
@Public
public class WatchedEvent {
private final KeeperState keeperState;
private final EventType eventType;
private String path;
// 三个参数的构造函数
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
this.keeperState = keeperState;
this.eventType = eventType;
this.path = path;
}
// 一个参数的构造方法
public WatchedEvent(WatcherEvent eventMessage) {
this.keeperState = KeeperState.fromInt(eventMessage.getState());
this.eventType = EventType.fromInt(eventMessage.getType());
this.path = eventMessage.getPath();
}
public KeeperState getState() {
return this.keeperState;
}
public EventType getType() {
return this.eventType;
}
public String getPath() {
return this.path;
}
public String toString() {
return "WatchedEvent state:" this.keeperState " type:" this.eventType " path:" this.path;
}
public WatcherEvent getWrapper() {
return new WatcherEvent(this.eventType.getIntValue(), this.keeperState.getIntValue(), this.path);
}
}
通过WatchedEvent#type
方法可以获得事件类型参数EventType
对象
EventType
也是一个枚举类, 囊括了以下几种ZNode事件
None(-1), // 无事件
NodeCreated(1), // 创建节点
NodeDeleted(2), // 节点被删除
NodeDataChanged(3), // 节点发送改变
NodeChildrenChanged(4), // 子节点发生改变
DataWatchRemoved(5), // 数据监视器被删除
ChildWatchRemoved(6), // 子节点监视器被删除
PersistentWatchRemoved(7); // 持久化监视器被删除
Watcher
内部具有EventType
和KeeperState
两个枚举类
BackgroundCallback与CuratorListener的用法
在ZooKeeeprService
类中定义一个全局变量callback
, 这个回调可以在异步创建节点、异步修改节点数据以及异步删除节点时对节点进行监听处理
// 定义回调函数,对节点进行监听
private BackgroundCallback callback = ((zkClient, curatorEvent) -> {
logger.info("event data=" ((curatorEvent.getData()==null)?"null data": new String(curatorEvent.getData())));
switch (curatorEvent.getType()){
case SET_DATA:
// 只会触发SET_DATA事件
logger.info("node data changed");
// 回调做其他事情
break;
case CREATE:
logger.info("node created");
break;
case CHILDREN:
logger.info("children");
break;
case DELETE:
logger.info("node deleted");
break;
default:
logger.info("eventType=" curatorEvent.getName());
break;
}
});
// 定义监听器,对节点进行监听
private CuratorListener listener = (client, event) -> {
Stat stat = event.getStat();
logger.info("stat=" JSON.toJSONString(stat));
CuratorEventType eventType = event.getType();
logger.info("eventType=" eventType.name());
};
这里我们只在回调里打印日志
然后在异步创建节点、异步修改节点数据及异步删除节点方法中使用
代码语言:javascript复制 /**
* 异步创建节点
* @param path
* @param data
* @throws Exception
*/
public void asyncCreateNode(String path, String data) throws Exception {
curatorFramework.create().inBackground(callback).forPath(path, data.getBytes(StandardCharsets.UTF_8));
}
/**
* 异步修改节点数据
* @param path
* @param data
* @throws Exception
*/
public void setDataAsyncWithCallback(String path, String data) throws Exception {
curatorFramework.setData().inBackground(callback).forPath(path, data.getBytes(StandardCharsets.UTF_8));
}
/**
* 使用监听器异步修改数据
* @param path
* @param data
* @throws Exception
*/
public void setDataAsyncWithListener(String path, String data) throws Exception {
// 通过监听器修节点改数据
curatorFramework.getCuratorListenable().addListener(listener);
curatorFramework.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
}
/**
* 异步删除数据
* @param path
* @throws Exception
*/
public void asyncDeleteData(String path) throws Exception {
curatorFramework.delete().inBackground(callback).forPath(path);
}
然后在ZooKeeperController
类中加上对应的控制器方法
@PostMapping("/create/async")
public RespBean<String> asyncCreateNode(@RequestBody Map<String, Object> postData){
RespBean respBean;
try {
checkPostData(postData);
String path = (String) postData.get("path");
Object data = postData.get("data");
zooKeeperService.asyncCreateNode(path, JSON.toJSONString(data));
respBean = RespBean.success(" async create node " path " success");
} catch (Exception e){
respBean = RespBean.error("async create node failed");
logger.error("async create node error", e);
}
return respBean;
}
@PostMapping("/setData/async/callback")
public RespBean<String> setDataAsyncWithCallback(@RequestBody Map<String, Object> paramMap){
checkPostData(paramMap);
RespBean<String> respBean;
try {
zooKeeperService.setDataAsyncWithCallback((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
respBean = RespBean.success("async set data with callback success");
} catch (Exception e) {
logger.error("set data with callback failed", e);
respBean = RespBean.error("async set data with callback failed, caused by " e.getMessage());
}
return respBean;
}
@DeleteMapping("/deleteByPath/async")
public RespBean<String> asyncDeleteByPath(@RequestParam("path") String path){
logger.info(" async delete ZNode " path);
RespBean<String> respBean;
try {
zooKeeperService.asyncDeleteData(path);
respBean = RespBean.success("delete ZNode success");
} catch (Exception e) {
logger.error("async delete ZNode of " path "failed", e);
respBean = RespBean.error("async delete ZNode failed, caused by " e.getMessage());
}
return respBean;
}
然后重启服务开始测试效果
测试异步添加节点
代码语言:javascript复制POST http://localhost:8081/blog/zookeeper/create/async
{
"path": "/test/person",
"data": "personInfo collection"
}
返回信息:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": " async create node /test/person success"
}
可以看到控制台中打印出了节点创建的信息,这是我们在回调函数中根据事件类型判断打印的日志
代码语言:javascript复制2022-08-21 13:44:02.629 INFO 20120 --- [ain-EventThread] org.sang.service.ZooKeeperService : event data=null data
2022-08-21 13:44:02.630 INFO 20120 --- [ain-EventThread] org.sang.service.ZooKeeperService : node created
测试使用回调异步修改节点数据接口
代码语言:javascript复制POST http://localhost:8081/blog/zookeeper/setData/async/callback
{
"path": "/test/person",
"data": "person infomation collection"
}
返回结果:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "async set data with callback success"
}
控制台打印如下日志:
代码语言:javascript复制2022-08-21 22:55:02.011 INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService : event data=null data
2022-08-21 22:55:02.012 INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService : node data changed
测试使用监听器异步修改节点数据
代码语言:javascript复制POST http://localhost:8081/blog/zookeeper/setData/async/listener
{
"path": "/test/person/2",
"data": {
"name": "李四",
"age": 24,
"height": 173.2,
"salary": 15800.85
}
}
返回信息:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "async set data with callback success"
}
控制台打印日志:
代码语言:javascript复制2022-08-21 23:03:54.778 INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService : stat=null
2022-08-21 23:03:54.779 INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService : eventType=SET_DATA
测试异步删除节点
代码语言:javascript复制DELETE http://localhost:8081/blog/zookeeper/deleteByPath/async?path=/test/person/2
返回结果:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "delete ZNode success"
}
控制台打印出日志:
代码语言:javascript复制2022-08-21 23:09:06.306 INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService : event data=null data2022-08-21 23:09:06.306 INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService : node deleted
Watcher的用法
在ZooKeeperService
类里定义个全局的CuratorWatcher
, 并定义一个为节点添加这个watcher
的方法
private CuratorWatcher watcher = watchedEvent -> {
String eventName = watchedEvent.getType().name();
// 监听的节点路径
String watchedPath = watchedEvent.getPath();
logger.info("watchedPath={}", watchedPath);
switch (eventName){
case "NodeCreated":
logger.info("node created, add Lock success");
break;
case "NodeDeleted":
logger.info("node deleted, release lock success");
break;
case "NodeDataChanged":
logger.info("node data changed");
break;
case "NodeChildrenChanged":
logger.info("node children changed");
break;
case "DataWatchRemoved":
logger.info("data watcher removed");
break;
case "ChildWatchRemoved":
logger.info("child watcher removed");
break;
case "PersistentWatchRemoved":
logger.info("persistent watcher removed");
/**
* 给节点添加watcher
* @param path
* @throws Exception
*/
public void addWatchByPath(String path) throws Exception {
curatorFramework.getData().usingWatcher(watcher).forPath(path);
}break;
default:
logger.info("none event");
break;
}
};
/**
* 给节点添加watcher
* @param path
* @throws Exception
*/
public void addWatchByPath(String path) throws Exception {
curatorFramework.getData().usingWatcher(watcher).forPath(path);
}
然后再在ZooKeeperController
类中添加对应的控制器方法
@PostMapping("/addWatcherByPath")
public RespBean<String> addWatcherByPath(@RequestParam("path") String path){
RespBean<String> respBean;
try {
zooKeeperService.addWatchByPath(path);
respBean = RespBean.success("add watcher success");
} catch (Exception e) {
logger.info("add watcher failed", e);
respBean = RespBean.error("add watcher failed, caused by " e.getMessage());
}
return respBean;
}
重启应用后对/test/person/1
这个节点进行监听
POST http://localhost:8081/blog/zookeeper/addWatcherByPath?path=/test/person/1
返回信息:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "add watcher success"
}
然后调用同步setData接口对这个节点数据进行修改
代码语言:javascript复制POST http://localhost:8081/blog/zookeeper/setData/sync
{
"path": "/test/person/1",
"data": {
"name": "王五",
"age": 32,
"height": 173.6,
"salary": 16800.58
}
}
返回信息:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "set data success"
}
控制台打印出日志:
代码语言:javascript复制2022-08-21 23:29:19.349 INFO 3732 --- [ain-EventThread] org.sang.service.ZooKeeperService : watchedPath=/test/person/1
2022-08-21 23:29:19.349 INFO 3732 --- [ain-EventThread] org.sang.service.ZooKeeperService : node data changed
注意: Watcher还会监听一次,后面继续对节点进行操作就不会进入CuratorWatcher#process
方法,如果需要继续监视节点的变化,则需要重新对节点添加Watcher
我们来测试一下效果,继续修改/test/person/1
节点
POST http://localhost:8081/blog/zookeeper/setData/sync
{
"path": "/test/person/1",
"data": {
"name": "赵六",
"age": 33,
"height": 174.6,
"salary": 18800.58
}
}
返回信息:
代码语言:javascript复制{
"status": 200,
"msg": "success",
"data": "set data success"
}
但是控制台却看不到我们定义的watcher
的process
方法中打印的日志,说明被调用了一次的watcher
已经失效
不过要给节点添加持久类型的Watcher可通过下面这种链式调用方式实现
代码语言:javascript复制CuratorFramework.watchers()
.add()
.withMode(AddWatchMode.PERSISTENT)
.usingWatcher(watcher)
.forPath(path)
小结
本文主要详细讲解了使用CuratorFramework
客户端在SpringBoot项目中对ZooKeeper节点实现增删改查以及对ZooKeeper节点添加BackgroundCallback
回调、CuratorListener
监听器和Watcher
监视器等操作,既能实现对节点的异步操作,也能监听节点的变化。从而让我们并根据ZooKeeper节点事件类型作出响应的业务逻辑处理.
关于使用CuratorFramework
客户端以非事务的方式操作ZooKeeper节点就介绍到这里,想要更深入的学习CuratorFramework
的用法可通过阅读该类及其方法相关类的源码进一步掌握。下一篇文章,笔者将继续介绍
使用CuratorFramework
客户端在一个事务中完成多个操作,并介绍使用ZooKeeper实现分布式事务锁。