1. 数据存储
事务日志 快照日志 运行时日志 bin/zookeeper.out
2 基于 Java API 初探 zookeeper 的使用
2.1 zookeeper 增删改查
代码语言:javascript复制import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* @author: xiepanpan
* @Date: 2020/5/30
* @Description: zookeeper 连接
*/
public class ConnectionDemo {
public static void main(String[] args) throws Exception {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper =
new ZooKeeper("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183", 4000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//如果收到了服务端的响应事件,连接成功
if (Event.KeeperState.SyncConnected==watchedEvent.getState()) {
countDownLatch.countDown();
}
}
});
countDownLatch.await();
//CONNECTED
System.out.println(zooKeeper.getState());
//创建节点 第三个参数是权限 OPEN_ACL_UNSAFE 所有人都可以访问 第四个参数 节点类型 持久节点
zooKeeper.create("/zk-persistent-xpp","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(1000);
//得到节点
Stat stat = new Stat();
byte[] bytes = zooKeeper.getData("/zk-persistent-xpp", null, stat);
System.out.println(new String(bytes));
//修改节点
zooKeeper.setData("/zk-persistent-xpp","1".getBytes(),stat.getVersion());
byte[] data = zooKeeper.getData("/zk-persistent-xpp", null, stat);
System.out.println(new String(data));
//删除节点 使用乐观锁 比较版本号
zooKeeper.delete("/zk-persistent-xpp",stat.getVersion());
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.2 事件机制
Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听 事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper 实现分布式锁、集群管理等功能 watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端 只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。(watcher 是一次性的操作)。 可以通过循环监听去达到永久监听效果
2.3 如何注册事件机制
通过这三个操作来绑定事件 :getData、Exists、getChildren
2.4 如何触发事件?
凡是事务类型的操作,都会触发监听事件。 create /delete /setData
2.5 watcher 事件类型
None (-1), 客户端链接状态发生变化的时候,会收到 none 的事件 NodeCreated (1), 创建节点的事件。 比如 zk-persis-mic NodeDeleted (2), 删除节点的事件 NodeDataChanged (3), 节点数据发生变更 NodeChildrenChanged (4); 子节点被创建、被删除、会发生事件触发
2.6 操作对应的water事件类型
zk-persis-mic ( 监听事件) | zk-persis-mic/child (子节点监听事件) | |
---|---|---|
create(/ zk-persis-mic) | NodeCreated (exists getData) | 无 |
delete(/ zk-persis-mic) | NodeDeleted (exists getData) | 无 |
setData(/ zk-persis-mic) | NodeDataChanged (exists getData) | |
create(/ zk-persis-mic/child) | NodeChildrenChange(getChildren) | NodedCreated |
delete(/ zk-persis-mic/child) | NodeChildrenChange(getChildren) | NodedDeleted |
setData(/ zk-persis-mic/child) | NodeDataChanged |
3 Curator 客户端的使用,简单高效
3.1使用Curator对zookeeper增删改查
代码语言:javascript复制import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
* @author: xiepanpan
* @Date: 2020/5/31
* @Description: 使用Curator框架来对zookeeper操作
*/
public class CuratorDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
//衰减的重试机制
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.connectString("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183")
.sessionTimeoutMs(4000)
//隔离命名空间 以下所有操作都是基于该相对目录进行的
.namespace("curator")
.build();
curatorFramework.start();
//创建节点
//结果 /curator/xpp/mode1
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/xpp/node1","1".getBytes());
//更改节点
//保存节点状态
Stat stat = new Stat();
curatorFramework.getData().storingStatIn(stat).forPath("/xpp/node1");
curatorFramework.setData().withVersion(stat.getVersion()).forPath("/xpp/node1","xx".getBytes());
//删除节点
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/xpp/node1");
}
}
3.2 监听
PathChildrenCache 监听一个节点下的子节点的创建删除和更新
NodeCache监听一个节点的更新和创建事件
TreeCache 综合PathChildrenCache 和NodeCache 的特性
代码语言:javascript复制import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* @author: xiepanpan
* @Date: 2020/5/31
* @Description: 事件监听
*/
public class CuratorWatcherDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
//衰减的重试机制
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.connectString("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183")
.sessionTimeoutMs(4000)
//隔离命名空间 以下所有操作都是基于该相对目录进行的
.namespace("curator")
.build();
curatorFramework.start();
// addListenerWithNodeCache(curatorFramework,"/xpp");
// addListenerWithPathChildCache(curatorFramework,"/xpp");
addListenerWithTreeCache(curatorFramework,"/xpp");
System.in.read();
}
/**
* 监听一个节点的更新和创建事件
* @param curatorFramework
* @param path
* @throws Exception
*/
public static void addListenerWithNodeCache(CuratorFramework curatorFramework,String path) throws Exception {
//第三个参数 对详细内容数据的压缩
final NodeCache nodeCache = new NodeCache(curatorFramework,path,false);
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("Receive Event:" nodeCache.getCurrentData().getPath());
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}
/**
* 监听一个节点子节点的创建删除和更新
* @param curatorFramework
* @param path
* @throws Exception
*/
public static void addListenerWithPathChildCache(CuratorFramework curatorFramework,String path) throws Exception {
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("Receive Event:" pathChildrenCacheEvent.getType());
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
}
/**
* 综合节点监听事件 监听当前节点和子节点 节点上任何一个事件都能收到
* @param curatorFramework
* @param path
* @throws Exception
*/
public static void addListenerWithTreeCache(CuratorFramework curatorFramework,String path) throws Exception {
TreeCache treeCache = new TreeCache(curatorFramework,path);
TreeCacheListener treeCacheListener = new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println(treeCacheEvent.getType() "->" treeCacheEvent.getData().getPath());
}
};
treeCache.getListenable().addListener(treeCacheListener);
treeCache.start();
}
}
我的pom文件
代码语言:javascript复制<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.4-beta</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.1</version>
</dependency>
zookeeper 我是用的3.4.14 版本 注意 Curator 版本 和zookeeper版本有对应关系 我的Curator版本是2.7.1
https://blog.csdn.net/glory1234work2115/article/details/51967507