Curator使用手册

2021-12-22 16:21:56 浏览数 (1)

找了一圈没找到官方的使用手册- -!,自己瞎练练了

常用的zookeeper java客户端:
  • zookeeper原生Java API
  • zkclient
  • Apache curator
ZooKeeper原生Java API的不足之处:
  • 在连接zk超时的时候,不支持自动重连,需要手动操作
  • Watch注册一次就会失效,需要反复注册
  • 不支持递归创建节点
Apache curator介绍:

Apache 的开源项目

  • 解决Watch注册一次就会失效的问题
  • 支持直接创建多级结点
  • 提供的 API 更加简单易用
  • 提供更多解决方案并且实现简单,例如:分布式锁
  • 提供常用的ZooKeeper工具类
  • 编程风格更舒服

pom

代码语言:javascript复制
 <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.14</version>
            <type>pom</type>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.2.0</version>
        </dependency>
    </dependencies>

一. 关于Curator的普通增删改查API操作

一个全局的 curator private static CuratorFramework curatorFramework;

创建连接

代码语言:javascript复制
 private static void createZkCuratorConnection(String ipPort) {
        curatorFramework=CuratorFrameworkFactory
                        .builder() // 使用工厂类来建造客户端的实例对象
                        .connectString(ipPort) // 配置zk服务器IP port
                        .sessionTimeoutMs(4000)// 设定会话时间
                        .retryPolicy(new ExponentialBackoffRetry(1000,3))//设置及重连策略
                        .namespace("curator")//方便管理的命名空间,其实就是一级目录
                        .build();//建立管道
        curatorFramework.start();//开启curator

    }

关闭连接

代码语言:javascript复制
  private static void closeZkCuratorConnection() {
     curatorFramework.close();
    }
创建结点

这里就体现了用Cutaotr的好处了,不需要先创建父才创建子,可以设置检测需要父时候却没有的请看下自动创建

代码语言:javascript复制
    private static void createZnode(String path, String value) throws Exception {
        curatorFramework
                .create()//创建Znode
                .creatingParentsIfNeeded()//如果是多级结点,这里声明如果需要,自动创建父节点
                .withMode(CreateMode.PERSISTENT)//声明结点类型
                .forPath(path,value.getBytes());//声明结点路径和值
    }
删除结点

同样是cutator的优点,如果有子节点,会先自动删除子节点再删除本结点,可自动递归删除

代码语言:javascript复制
private static void deleteZnode(String path) throws Exception {
        curatorFramework
        .delete()
        .deletingChildrenIfNeeded()//如果有子节点,会先自动删除子节点再删除本结点
        .forPath(path);
    }

查询结点值

代码语言:javascript复制
    private static void getZnodeData(String path) throws Exception {
        byte[] dataBytes = curatorFramework.getData().forPath(path);
            System.out.println("结点值为:"  new String(dataBytes));
    }

当然也可以顺便把结点状态存储下来.

代码语言:javascript复制
private static void getZnodeData(String path) throws Exception {
        Stat stat=new Stat();
        byte[] dataBytes = curatorFramework.getData().storingStatIn(stat).forPath(path);
            System.out.println("结点值为:"  new String(dataBytes));
            System.out.print("目前版本为:" stat.getVersion() "创建时间为:" stat.getCtime());
    }
设置新值
代码语言:javascript复制
 private static void setValue(String path,String value) throws Exception {
        Stat stat = curatorFramework.checkExists().forPath(path);
            if (stat==null){
                System.out.println("Znode does not exists");
            }else {
                curatorFramework
                        .setData()
                        .withVersion(stat.getVersion())
                        .forPath(path,value.getBytes());
            }
    }

重新调用查询方法看看新值

二 . Curator的事件机制

Cutator提供了三种完善而又灵活的监听机制
  • PathchildCache ~监听一个节点下子节点的创建、删除、更新
  • NodeCache ~监听一个节点的更新和创建事件(不包括删除)
  • TreeCache ~综合PatchChildCache和INodeCache的特性

pom

代码语言:javascript复制
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.2.0</version>
        </dependency>
添加事件监听机制
代码语言:javascript复制
  private static void addWatcherWithNodeCache(String path) throws Exception {
        final NodeCache nodeCache=new NodeCache(curatorFramework,path,false);
        NodeCacheListener nodeCacheListener=new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("事件路径:" nodeCache.getCurrentData().getPath() "发生数据变化,新数据为" new String(nodeCache.getCurrentData().getData()));
            }
        };
        nodeCache.getListenable().addListener(nodeCacheListener);
        nodeCache.start();
    }

xshell测试

控制台进行监听和打印(删除结点是没有被监听的,关于孩子结点的变化也是没有被监听的)

ps:这里别忘了我们是要监听结点变化,可不能一启动就停了,可以再工作线程重加入System.in.read();使线程持续运行.

结点的孩子结点变化
代码语言:javascript复制
 private static void addWatcherWithChildrenCache(String path) throws Exception {
        final PathChildrenCache childrenCache=new PathChildrenCache(curatorFramework,path,true);//缓存数据
        PathChildrenCacheListener pathChildrenCacheListener=new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("事件路径:" pathChildrenCacheEvent.getData().getPath() "事件类型" pathChildrenCacheEvent.getType());
            }
        };
        childrenCache.getListenable().addListener(pathChildrenCacheListener);
        childrenCache.start(PathChildrenCache.StartMode.NORMAL);
    }

Xshell测试

结果

关于pathChildrenCache启动过程有三种策略

  • NORMAL
  • BUILD_INITIAL_CACHE
  • POST_INITIALIZED_EVENT

最后一个TreeNode测试

代码语言:javascript复制
    private static void addWatcherWithTreeCache(String path) throws Exception {
        TreeCache treeCache=new TreeCache(curatorFramework,path);
        TreeCacheListener treeCacheListener=new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("事件路径:" treeCacheEvent.getData().getPath() "事件类型" treeCacheEvent.getType() "结点值为" new String(treeCacheEvent.getData().getData()));

            }
        };
        treeCache.getListenable().addListener(treeCacheListener);
        treeCache.start();
    }

0 人点赞