zookeeper

2022-10-25 16:01:18 浏览数 (1)

Zookeeper

  • 一、概述
  • 二、安装配置
  • 三、内部原理
  • 四、实例
    • 1. 命令行操作
    • 2. API应用
  • 五、问题

一、概述

文件系统 通知机制

  1. 介绍 Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。 Zookeeper=文件系统 通知机制。基于观察者模式的分布式服务管理框架,负责存储和管理共同数据,接受观察者的注册,数据变化会通知注册的观察者。
  2. 特点 (1)一个Leader,多个Follower组成的集群; (2)集群中只要有半数以上节点存活,ZK集群就能正常服务; (3)全局数据一致:每个Server保存同一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的; (4)更新请求顺序进行,来自同一个Client的更新请求按其发送顺序依次执行; (5)数据更新原子性,事物特点; (6)实时性,在一定时间范围内,Client能读到最新数据。
  3. 数据结构 ZK数据模型的结构与Unix文件系统类似,整体上可看成一棵树(从/开始),每个节点称作ZNode,每个ZNode默认存储1MB数据,可通过其路径唯一标识。
  4. 应用场景 提供的服务(分布式环境):统一命名服务、统一配置管理、统一集群管理(实时监控节点状态变化)、服务器节点动态上下线、软负载均衡(记录服务器访问数)等。

二、安装配置

将conf/目录下的zoo_sample.cfg修改为zoo.cfg,修改dataDir路径为zkData。在zkData目录下创建myid文件,输入唯一的int型集群编号。

  1. 命令
代码语言:javascript复制
bin/zkServer.sh start
bin/zkServer.sh status  # 集群模式必须启动半数以上才能看到状态
bin/zkServer.sh stop

bin/zkCli.sh
quit

jps
  1. 配置参数
代码语言:javascript复制
tickTime:心跳(ms)
initLimit:Leader和Follower在集群启动时之间通信时间(initLimit * tickTime)
syncLimit:Leader和Follower在集群启动后的通信(syncLimit * tickTime)
dataDir:数据存放目录
clientPort

# 集群模式在末尾添加如下。
# A表示第几号服务器;B表示该服务器的ip地址;C表示该服务器与集群中的Leader服务器交换信息的端口;D表示用来重新选举的端口。
server.${A}=${B}:${C}:${D}

三、内部原理

  1. 选举机制(*) (1)半数机制 集群中半数以上的机器存活,集群可用。

(2)选举机制 服务器都只投给自己,服务器编号大的优胜(权重大),已有Leader之后的大编号机器不能占用Leader位置。

代码语言:javascript复制
Serverid:服务器ID

Zxid:数据ID,服务器中存放的最大数据ID

Epoch:逻辑时钟,或投票的次数,每次投完增加。

Server选举状态:
LOOKING,竞选;
FOLLOWING,随从状态,同步Leader,参与投票;
OBSERVING,观察状态,同步Leader,不参与投票;
LEADING,领导者状态。
  1. 节点类型 持久(Persisitent):CS断连后,创建的节点不删除; Persisitent_sequential:创建ZNode时设置顺序标识,在分布式系统中,顺序号可用于全局排序,推断事件顺序。

短暂(Ephemeral):CS断连后,创建的节点自行删除;节点下线。 Ephemeral_sequential。

  1. Stat结构体 (1)czxid 创建节点的事务zxid,每次修改ZK状态都会收到一个zxid形式的时间戳,即ZK事务ID,用于排序。

(2)ctime znode被创建的毫秒数,从1970年开始。

(3)mzxid znode最后更新的事务zxid。

(4)mtime znode最后修改的毫秒数,从1970年开始。

(5)pZxid znode最后更新的子节点zxid。

(6)cversion znode子节点变化号,znode子节点修改的次数。

(7)dataversion znode数据变化号。

(8)acIVersion 访问控制列表的变化号。

(9)ephemeralOwner 如果是临时节点,该znode拥有着的session id;非临时节点,值为0。

(10)dataLength znode的数据长度。

(11)numChildren znode子节点数量。

  1. 监听器原理(*) 客户端与ZK服务器监听的过程如下:process()方法为用户业务层代码。
  1. 写数据流程 结合半数机制。

四、实例

1. 命令行操作

代码语言:javascript复制
# 启动客户端后
bin/zkCli.sh

help
ls /   # 查看当前结点所包含的内容
ls2 /  # 查看当前结点详细数据

# 创建节点,但必须要写入数据才会注册
create /node01 "message..."
create /node01/cnode01 "node01 cnode01 message..."
# 查看相应目录下的内容
get /node01

# 创建短暂节点,CS断连即删除
create -e /node01/cnode02 "node01 cnode02 message..."
# 创建带有序号的节点,显示创建/node01/cnode03 node03 cnode03 message...0000000001
create -s /node01/cnode03 "node01 cnode03 message..."
# 修改节点数据值
set /node01/cnode01 "node01 cnode01 new message..."

# 节点的值变化的监听
get /node01/cnode01 watch
# 节点的子节点变化的监听
ls /node01 watch

# 删除和递归删除
delete /node01/cnode01 
rmr /node01

# 查看节点状态
stat /node01

2. API应用

(1)常用API

代码语言:javascript复制
    <!--zookeeper-->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>${zookeeper.version}</version>
    </dependency>
代码语言:javascript复制
public class TestZookeeper {
    private String connectString = "192.168.73.128:2181";
    private int sessionTimeout = 20000;
    private ZooKeeper zkClient;

    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 持续监听动作
                WatchNode();
            }
        });
    }


    /**
     * 1.创建节点
     * 查看防火墙状态:systemctl status firewalld.service
     * 临时关闭防火墙:systemctl stop firewalld.service
     * 彻底关闭防火墙:systemctl disable firewalld.service
     */
    @Test
    public void createNode() throws KeeperException, InterruptedException {
        String path = zkClient.create("/node01", "12".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(path);
    }


    /**
     * 2.获取子节点,并监控节点的变化
     */
    @Test
    public void getDataAndWatch() throws InterruptedException {
        System.out.println("start watch Node change...");

        // 配合init()中的process函数,监听1min
        Thread.sleep(1000*60);
        System.out.println("stop watch Node change.");
    }

    private void WatchNode(){
        List<String> children;
        try {
            children = zkClient.getChildren("/", true);
            System.out.println("--------start--------");
            for (String child : children) {
                System.out.println(child);
            }
            System.out.println("---------end---------");
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    /**
     * 3.判断节点是否存在
     */
    @Test
    public void exist() throws KeeperException, InterruptedException {
        Stat exists = zkClient.exists("/node01", false);
        System.out.println(exists == null ? "not exist!" : "exist.");
    }

    /**
     * 4.客户端能实时洞察到服务器上下线的变化
     */
}

(2)服务器动态上下线案例 服务器和客户端对ZK集群来说均是客户端。 理解:创建节点是暂时的带序号的类型(ephemeral_sequence),因为下线就要删除注册信息(主机名称,通过参数传递),序号要递增。

代码语言:javascript复制
public class DistributeClient {
    private ZooKeeper zkClient;
    private String connectString = "192.168.73.128:2181";
    private int sessionTimeout = 2000;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeClient client = new DistributeClient();

        // 1.连接zk集群
        client.getConnect();

        // 2.注册节点
        server.register(args[0]);
        // 2*.注册监听
        client.getChildren();

        // 3.业务逻辑处理
        client.business();
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getChildren() throws KeeperException, InterruptedException {
        List<String> children = zkClient.getChildren("/servers", true);

        ArrayList<String> host = new ArrayList<>();
        for (String child : children) {
            byte[] data = zkClient.getData("/servers"   child, false, null);
            host.add(new String(data));
        }
        System.out.println(host);
    }

	private void register(String homename) throws KeeperException, InterruptedException {
        String path = zkClient.create("/servers/server", homename.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(homename   "is online.");
    }

    private void getConnect() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    getChildren();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

五、问题

  1. 简述zookeeper的选举机制? 选举机制
  2. zookeeper的监听原理? main()线程中创建ZooKeeper的对象,接着创建两个线程,connect负责与ZK Server的通信,将注册的监听事件发送给ZK,加入到注册监听器列表,ZK监听到有数据或路径变化时,将消息通过Listener线程通知ZooKeeper对象,并执行操作。
  3. zookeeper的部署方式有哪些?集群中的角色有哪些?集群最少需要几台机器? (单机模式、集群模式;Leader和Follower;3台)
  4. zookeeper的常用命令?

0 人点赞