zookeeper javaApi 事件监听

2022-10-25 16:11:28 浏览数 (1)

1. 数据存储

事务日志 快照日志 运行时日志 bin/zookeeper.out

2 基于 Java API 初探 zookeeper 的使用

2.1 zookeeper 增删改查

代码语言:javascript复制
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @author: xiepanpan
 * @Date: 2020/5/30
 * @Description: zookeeper 连接
 */
public class ConnectionDemo {
    public static void main(String[] args) throws Exception {
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper =
                    new ZooKeeper("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183", 4000, new Watcher() {
                        public void process(WatchedEvent watchedEvent) {
                            //如果收到了服务端的响应事件,连接成功
                            if (Event.KeeperState.SyncConnected==watchedEvent.getState()) {
                                countDownLatch.countDown();
                            }
                        }
                    });
            countDownLatch.await();
            //CONNECTED
            System.out.println(zooKeeper.getState());

            //创建节点 第三个参数是权限 OPEN_ACL_UNSAFE 所有人都可以访问 第四个参数 节点类型 持久节点
            zooKeeper.create("/zk-persistent-xpp","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Thread.sleep(1000);

            //得到节点
            Stat stat = new Stat();
            byte[] bytes = zooKeeper.getData("/zk-persistent-xpp", null, stat);
            System.out.println(new String(bytes));

            //修改节点
            zooKeeper.setData("/zk-persistent-xpp","1".getBytes(),stat.getVersion());

            byte[] data = zooKeeper.getData("/zk-persistent-xpp", null, stat);
            System.out.println(new String(data));

            //删除节点 使用乐观锁 比较版本号
            zooKeeper.delete("/zk-persistent-xpp",stat.getVersion());

            zooKeeper.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2.2 事件机制

Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听 事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper 实现分布式锁、集群管理等功能 watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端 只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。(watcher 是一次性的操作)。 可以通过循环监听去达到永久监听效果

2.3 如何注册事件机制

通过这三个操作来绑定事件 :getData、Exists、getChildren

2.4 如何触发事件?

凡是事务类型的操作,都会触发监听事件。 create /delete /setData

2.5 watcher 事件类型

None (-1), 客户端链接状态发生变化的时候,会收到 none 的事件 NodeCreated (1), 创建节点的事件。 比如 zk-persis-mic NodeDeleted (2), 删除节点的事件 NodeDataChanged (3), 节点数据发生变更 NodeChildrenChanged (4); 子节点被创建、被删除、会发生事件触发

2.6 操作对应的water事件类型

zk-persis-mic ( 监听事件)

zk-persis-mic/child (子节点监听事件)

create(/ zk-persis-mic)

NodeCreated (exists getData)

delete(/ zk-persis-mic)

NodeDeleted (exists getData)

setData(/ zk-persis-mic)

NodeDataChanged (exists getData)

create(/ zk-persis-mic/child)

NodeChildrenChange(getChildren)

NodedCreated

delete(/ zk-persis-mic/child)

NodeChildrenChange(getChildren)

NodedDeleted

setData(/ zk-persis-mic/child)

NodeDataChanged

3 Curator 客户端的使用,简单高效

3.1使用Curator对zookeeper增删改查

代码语言:javascript复制
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * @author: xiepanpan
 * @Date: 2020/5/31
 * @Description:  使用Curator框架来对zookeeper操作
 */
public class CuratorDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                //衰减的重试机制
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .connectString("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183")
                .sessionTimeoutMs(4000)
                //隔离命名空间  以下所有操作都是基于该相对目录进行的
                .namespace("curator")
                .build();
        curatorFramework.start();

        //创建节点
        //结果 /curator/xpp/mode1
        curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/xpp/node1","1".getBytes());

        //更改节点
        //保存节点状态
        Stat stat = new Stat();
        curatorFramework.getData().storingStatIn(stat).forPath("/xpp/node1");
        curatorFramework.setData().withVersion(stat.getVersion()).forPath("/xpp/node1","xx".getBytes());

        //删除节点
        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/xpp/node1");
    }



}

3.2 监听

PathChildrenCache 监听一个节点下的子节点的创建删除和更新

NodeCache监听一个节点的更新和创建事件

TreeCache 综合PathChildrenCache 和NodeCache 的特性

代码语言:javascript复制
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @author: xiepanpan
 * @Date: 2020/5/31
 * @Description:  事件监听
 */
public class CuratorWatcherDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                //衰减的重试机制
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .connectString("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183")
                .sessionTimeoutMs(4000)
                //隔离命名空间  以下所有操作都是基于该相对目录进行的
                .namespace("curator")
                .build();
        curatorFramework.start();

//        addListenerWithNodeCache(curatorFramework,"/xpp");
//        addListenerWithPathChildCache(curatorFramework,"/xpp");
        addListenerWithTreeCache(curatorFramework,"/xpp");
        System.in.read();
    }

    /**
     * 监听一个节点的更新和创建事件
     * @param curatorFramework
     * @param path
     * @throws Exception
     */
    public static void addListenerWithNodeCache(CuratorFramework curatorFramework,String path) throws Exception {
        //第三个参数 对详细内容数据的压缩
        final NodeCache nodeCache = new NodeCache(curatorFramework,path,false);
        NodeCacheListener nodeCacheListener = new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                System.out.println("Receive Event:" nodeCache.getCurrentData().getPath());
            }
        };
        nodeCache.getListenable().addListener(nodeCacheListener);
        nodeCache.start();
    }

    /**
     *  监听一个节点子节点的创建删除和更新
     * @param curatorFramework
     * @param path
     * @throws Exception
     */
    public static void addListenerWithPathChildCache(CuratorFramework curatorFramework,String path) throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("Receive Event:" pathChildrenCacheEvent.getType());
            }
        };
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
    }

    /**
     * 综合节点监听事件 监听当前节点和子节点  节点上任何一个事件都能收到
     * @param curatorFramework
     * @param path
     * @throws Exception
     */
    public static void addListenerWithTreeCache(CuratorFramework curatorFramework,String path) throws Exception {
        TreeCache treeCache = new TreeCache(curatorFramework,path);
        TreeCacheListener treeCacheListener = new TreeCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println(treeCacheEvent.getType() "->" treeCacheEvent.getData().getPath());
            }
        };
        treeCache.getListenable().addListener(treeCacheListener);
        treeCache.start();
    }
}

我的pom文件

代码语言:javascript复制
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
       <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.5.4-beta</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-framework</artifactId>
           <version>2.7.1</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
           <version>2.7.1</version>
       </dependency>

zookeeper 我是用的3.4.14 版本 注意 Curator 版本 和zookeeper版本有对应关系 我的Curator版本是2.7.1

https://blog.csdn.net/glory1234work2115/article/details/51967507

0 人点赞