代码语言:javascript复制
package com.redis.demo.zookeeper;
import java.io.Serializable;
public class User implements Serializable {
private Integer id;
private String name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
代码语言:javascript复制package com.redis.demo.zookeeper;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.io.UnsupportedEncodingException;
public class MyZkSerializer implements ZkSerializer {
String charset = "UTF-8";
@Override
public byte[] serialize(Object o) throws ZkMarshallingError {
// return new byte[0];
try {
return String.valueOf(o).getBytes(charset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
throw new ZkMarshallingError();
}
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes,charset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
throw new ZkMarshallingError();
}
}
}
代码语言:javascript复制package com.redis.demo.zookeeper;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ZooUtil {
public static void main(String[] args) {
List<String> ll = new ArrayList<String>();
ZkClient zkClient = new ZkClient("124.220.163.230:2181", 30000, 30000);//new SerializableSerializer()
//java.lang.String cannot be cast to com.redis.demo.zookeeper.User
//跟编码类型有关系
// zkClient.setZkSerializer(new MyZkSerializer());
zkClient.setZkSerializer(new SerializableSerializer());
/**
* 获取子目录下的数据
*/
List<String> l = zkClient.getChildren("/");
for (String s : l) {
System.out.println("获取子目录下的数据=" s);
}
/**
* 创建节点,无需重复创建
*/
// zkClient.createPersistent("/testJava1");
/**
* 创建子节点
*/
// zkClient.createPersistent("/testJava1/test", true);
/**
* 不允许重复创建
* org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /testJava1
*
*/
/**
* 创建并设置节点的值
*/
// zkClient.createPersistent("/testJava1", "aaaa");
/**
* 写数据,即更新数据,会update,不会append
*/
// zkClient.writeData("/testJava1", "hello");
/**
* 写一个对象,要序列化
* User implements Serializable
*/
// User user = new User();
// user.setId(1);
// user.setName("liudehua");
// zkClient.create("/testJava3", user, CreateMode.PERSISTENT);
/**
* 删除节点
*/
// zkClient.delete("/testJava1");
/**
* 递归删除节点和其子节点
*/
// zkClient.deleteRecursive("/testJava1");
/**
* 读取数据对象
*
* 测试类打印输出:
* 获取子目录下的数据=testJava2
* id=1
* name=liudehua
*/
Stat stat = new Stat();
User u = zkClient.readData("/testJava3", stat);
System.out.println("id=" u.getId());
System.out.println("name=" u.getName());
/**
* 读取简单类型数据
*/
// String s = zkClient.readData("/testJava1");
// System.out.println("读取简单类型数据=" s);
/**
* 判断节点是否存在
*/
boolean b = zkClient.exists("/testJava1");
System.out.println("判断节点是否存在=" b);
/**
* 监听节点的变化,节点增加,删除,减少
*/
zkClient.subscribeChildChanges("/testJava1", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("parentPath = " parentPath);
}
});
/**
* 监听节点数据的变化,子节点数据变化不会监听到
*/
zkClient.subscribeDataChanges("/testJava1", new IZkDataListener() {
//数据变化时触发
/**
* 模拟打印,需要间隔时间,在cli客户端没有触发??
* handleDataChange path = /testJava1,data = one
* handleDataChange path = /testJava1,data = two
* handleDataChange path = /testJava1,data = three
*
* @param dataPath
* @param data
* @throws Exception
*/
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("handleDataChange path = " dataPath ",data = " data);
}
/**
* 测试打印输出
* parentPath = /testJava1
* handleDataDeleted path = /testJava1
*
* @param dataPath
* @throws Exception
*/
//节点删除时触发
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("handleDataDeleted path = " dataPath);
}
});
zkClient.writeData("/testJava1", "one");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
zkClient.writeData("/testJava1", "two");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
zkClient.writeData("/testJava1", "three");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//KeeperErrorCode = Directory not empty for /testJava1
// zkClient.delete("/testJava1");
//需要递归删除
zkClient.deleteRecursive("/testJava1");
//测试监听的变化,通过线程来延时。
try {
Thread.sleep(1000*60*15);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试发现:在java api操作变更,能监听到变化。但是在ZkClient操作,idea控制台没有反应!! watch没有变化,在ZkClient操作。