Zookeeper学习笔记(一)SpringBoot的操作zk

2022-08-11 15:33:13 浏览数 (1)

我们都知道dubbo采用zookeeper做注册中心,那么我们springboot如何去操作zookeeper?为什么爱用springboot的原因是我们可能潜在的需要使用zookeeper做一些高可用方面的事情。

1.zookeeper单机的部署。

首先需要去官网下载zookeeper,注意我们一定要下载名称中带有bin的包,

下载完毕之后,我们需要将其解压到相应的文件夹下。其中bin是一些启动脚本,conf就是我们配置文件。dir是作者自己创建的数据缓存文件docs是zk的一些说明文档吧,lib是zk依赖的一些其他包。

这块要注意,经量不要用WinRAR解压,作者发现这个软件解压不出来。但并不是包的问题,是解压工作的问题。

这里推荐大家用Bandizip作为电脑的解压工作,零广告哦!

我们需要做的就是修改配置文件。默认的情况下有一个zoo_example.cof的配置文件。我们需将其重命名为zoo.conf,在此我们看一下zk配置文件都有哪些配置项。

代码语言:javascript复制
# The number of milliseconds of each tick
#每次间隔的时间差
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
#可以允许的次数
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
#快照数据缓存地址
dataDir=D:\zookeeper\apache-zookeeper-3.5.9-bin\dir
# the port at which the clients will connect
#客户端连接端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

修改好上述配置之后,我们来启动zk。在bin目录中找到,并启动

启动服务端之后,我们启动zk的客户端

这里我们通过zk的命令去操作zkserver,zk的主要命令有set,get,create,delete,update等。

我们将服务启动之后创建我们的springboot项目。

需要依赖的maven配置为

代码语言:javascript复制
  <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
  <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.6.2</version>
  </dependency>

将zookeeper注入到springboot容器中。

代码语言:javascript复制
@Configuration
public class ZkConfig {

    @Bean
    public ZooKeeper create() throws IOException {
        System.out.println("prepare...");
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000,myZkWatcher() );
        System.out.println("complate connect..");
        return zk;
    }

    @Bean
    public MyZkWatcher myZkWatcher(){
        //创建一个watcher
        return new MyZkWatcher("123");
    }
}

这里的MyZkWatcher代码为

代码语言:javascript复制
public class MyZkWatcher implements Watcher {

    private CountDownLatch countDownLatch;

    private String tianjingle;

    public MyZkWatcher(String info){
        this.tianjingle=info;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("water 被触发了");
    }
}

相关业务逻辑

代码语言:javascript复制
@RestController
@RequestMapping(value = "/zk")
public class TestZk {

    @Autowired
    private ZooKeeper zk;


    @Autowired
    private MyZkWatcher myZkWatcher;


    @GetMapping(value = "/test")
    public void add() throws KeeperException, InterruptedException {
//        create(zk, "/tianjingle", "tianjl");
        queryData(zk, "/tianjingle");

       Stat st=update(zk, "/tianjingle", "zhangsan");
       if (null!=st){
           //更新之后创建一个watcher,因为watcher只能使用一次。   
           zk.exists("/tianjingle",myZkWatcher);
       }
//        delete(zk, "/tianjingle");
    }

      void create(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{
        System.out.println(MessageFormat.format("开始创建节点:{0}, 数据:{1}",nodePath,nodeData));
        List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
        CreateMode createMode = CreateMode.PERSISTENT;
        String result = zk.create(nodePath, nodeData.getBytes(), acl, createMode);
        System.out.println(MessageFormat.format("创建节点返回结果:{0}",result));
        System.out.println(MessageFormat.format("完成创建节点:{0}, 数据:{1}",nodePath,nodeData));
    }

    public  Stat queryStat(ZooKeeper zk, String nodePath) throws KeeperException, InterruptedException{
        System.out.println(MessageFormat.format("准备查询节点Stat,path:{0}", nodePath));
        Stat stat = zk.exists(nodePath, false);
        System.out.println(MessageFormat.format("结束查询节点Stat,path:{0},version:{1}", nodePath, stat.getVersion()));
        return stat;
    }

    public  String queryData(ZooKeeper zk,String nodePath) throws KeeperException, InterruptedException{
        System.out.println(MessageFormat.format("准备查询节点Data,path:{0}", nodePath));
        String data = new String(zk.getData(nodePath, false, queryStat(zk, nodePath)));
        System.out.println(MessageFormat.format("结束查询节点Data,path:{0},Data:{1}", nodePath, data));
        return data;
    }


    public  Stat update(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{
        Stat stat = queryStat(zk, nodePath);
        System.out.println(MessageFormat.format("准备修改节点,path:{0},data:{1},原version:{2}", nodePath, nodeData, stat.getVersion()));
        Stat newStat = zk.setData(nodePath, nodeData.getBytes(), stat.getVersion());
        //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。
        //zk.setData(path, data, version, cb, ctx);
        System.out.println(MessageFormat.format("完成修改节点,path:{0},data:{1},现version:{2}", nodePath, nodeData, newStat.getVersion()));
        return stat;
    }


    public void delete(ZooKeeper zk,String nodePath) throws InterruptedException, KeeperException{
        //删除节点前先查询该节点信息
        Stat stat = queryStat(zk, nodePath);
        System.out.println(MessageFormat.format("准备删除节点,path:{0},原version:{1}", nodePath, stat.getVersion()));
        zk.delete(nodePath, stat.getVersion());
        //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。
        //zk.delete(path, version, cb, ctx);
        System.out.println(MessageFormat.format("完成删除节点,path:{0}", nodePath));
    }

}

发送请求打印的结果

在zkclient中修改/tianjingle的value,发现springboot的watcher被触发了。

0 人点赞