1 Zookeeper中的watcher
client端会对某个znode 注册一个watcher事件,当该znode发生变化时,这些client会收到ZooKeeper的通知,然后client可以根据znode变化来做出业务上的改变等。
经典使用场景:zookeeper为dubbo提供服务的注册与发现,作为注册中心,但是大家有没有想过zookeeper为啥能够实现服务的注册与发现吗?
1.1 Zookeeper Watcher 特性
Zookeeper主要的作用之一就是当节点发生改变时可以进行集群通知,以便做出合理的处理,而感知节点变化就需要监听和通知,这是zookeeper实现负载均衡、协调管理、注册发布等功能的主要部分。
Watcher 是zooKeeper中一个非常核心功能 ,客户端watcher 可以监控节点的数据变化以及它子节点的变化,是Zookeeper用来实现distribute lock, distribute configure, distribute queue等应用的主要手段。要监控data_tree上的任何节点的变化(节点本身的增加,删除,数据修改,以及孩子的变化)都可以在获取该数据时注册一个Watcher,这有很像Listener模式。一旦该节点数据变化,Follower会发送一个notification response,client收到notification响应,则会查找对应的Watcher并回调他们。
Client可以在某个ZNode上设置一个Watcher,来Watch该ZNode上的变化。如果该ZNode上有相应的变化,就会触发这个Watcher,把相应的事件通知给设置Watcher的Client。需要注意的是,ZooKeeper中的Watcher是一次性的,即触发一次就会被取消,如果想继续Watch的话,需要客户端重新设置Watcher。
ZooKeeper Watcher 特性总结:
- 注册只能确保一次消费
无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除。因此,开发人员在 Watcher 的使用上要记住的一点是需要反复注册。这样的设计有效地减轻了服务端的压力。如果注册一个 Watcher 之后一直有效,那么针对那些更新非常频繁的节点,服务端会不断地向客户端发送事件通知,这无论对于网络还是服务端性能的影响都非常大。
- 客户端串行执行
客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调。
- 轻量级设计
WatchedEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分的内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对 NodeDataChanged 事件,ZooKeeper 的 Watcher 只会通知客户指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据,这也是 ZooKeeper 的 Watcher 机制的一个非常重要的特性。
1.2 zookeeper监听的原理
1、当创建zookeeper客户端时,内部会默认创建两个线程;
- 一个是connet线程,主要负责网络通信连接;
- 一个是Listener线程,主要负责监听;
2、客户端通过connet线程连接服务器;使用getChildren("/",true);其中"/"表示监听的根目录,true表示监听;false表示不监听。
3、在zookeeper监听列表中,将注册的监听事件放入到监听列表中,表示这个服务器中的"/"目录被客户端监听。
4、一旦被监听的目录下,数据或路径发生改变,zookeeper就会将这个消息发送给Listener线程。
5、Listener线程内部调用process方法,采取相应的措施进行操作。
zookeeper事件类型主要有以下几种:
znode节点可以设置两类watch,一种是DataWatches,基于znode节点的数据变更从而触发 watch 事件,触发条件getData()、exists()、setData()、 create()。
另一种是Child Watches,基于znode的孩子节点发生变更触发的watch事件,触发条件 getChildren()、 create()。
而在调用 delete() 方法删除znode时,则会同时触发Data Watches和Child Watches,如果被删除的节点还有父节点,则父节点会触发一个Child Watches。
zookeeper的监听主要分为两种:
- 标准的观察者模式:该模式主要是使用的watcher监听器,和zookeeper原生监听器一样,该监听器只有一种,那就是一次性触发器,对节点只进行一次监听,监听完后立刻失效,并且也不会对子节点进行监听。
- 缓存监听模式:该模式引入了一种本地缓存视图的Cache机制,来实现对zookeeper服务端的事件监听。Cache事件监听可以理解为本地缓存视图与远程zookeeper视图的比对过程。Cache提供了重复注册的功能。其原理就是Cache在客户端缓存了znode的各种状态,不断的和zookeeper集群进行比较,而且其比对也十分简单,如果未发生变化就会一直发送true,一旦发生改变就会发送false。如果还是不太理解可以结合上面的监听器原理解析图进行理解。
缓存监听模式分为三种:Path Catche 、Node Cache、Tree Cache
- Path Cache:用来观察znode的子节点并缓存状态,如果znode的子节点被删除、修改、创建,那么就会触发监听事件。
- Node Cache:用来观察znode本身的状态,如果znode本身被创建、更新、删除,那么就会触发监听事件。
- Tree Cache:用来观察所有节点和数据的变化,监听器的注册接口为:TreeCacheListener。
1.3 Watch触发器类型
1.3.1 一次性触发器
客户端在节点设置了Watch监听事件后,对该节点(只对节点本身有效,其子节点是无效的)进行修改或删除都会监听到消息。但是只能监听一次;例如:客户端设置getData("/znode",true)后,第一次对该节点进行修改或删除,都会触发监听。但是再次修改或删除时,则不会进行监听。如果需要再次监听则需要重新设置监听。
代码语言:javascript复制package com.bpc.client;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
/**
* @author xiaobai
*/
public class CuratorTest03 {
/**
* 连接地址
*/
String ZK_ADDRESS = "127.0.0:2181";
/**
* 重试策略,RetryPolicy是个接口,里面的每一种实现都是一种策略。
* 可以根据自身业务情况进行选择
*/
RetryPolicy policy = new RetryNTimes(3, 2000);
/**
* curator客户端开发是一种流式编码方式,就是不断的点点点
* 老版本创建连接方式
*/
/*CuratorFramework client = CuratorFrameworkFactory.builder()
//连接地址
.connectString(ZK_ADDRESS)
//session超时时间
.sessionTimeoutMs(50000)
//连接超时时间
.connectionTimeoutMs(5000)
//重试策略
.retryPolicy(policy)
.build();*/
CuratorFramework client = null;
public CuratorTest03() {
/**
* 新版本创建连接方式
*/
client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, policy);
//启动客户端
client.start();
}
public CuratorFramework getClient() {
return client;
}
public void setClient(CuratorFramework client) {
this.client = client;
}
}
代码语言:javascript复制public static void main(String[] args) throws Exception {
CuratorTest03 curator = new CuratorTest03();
test01(curator);
//休眠时间长一点,以便于测试事件的监听
Thread.sleep(1000000000);
}
public static void test01(CuratorTest03 curator) throws Exception {
String path = "/test001";
//创建节点
// curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path);
//建立一次性节点监听
curator.client.getData().usingWatcher((Watcher) (WatchedEvent event) -> {
//可以对该节点数据进行操作,此处可以监听到
System.out.println("对节点 [" event.getPath() "] 进行了操作,操作事件:" event.getType().name());
}).forPath(path);
}
1.3.2 单节点触发器PathCache
该触发器只能对当前节点的直属子节点一直进行监听,其子节点的子节点也是监听不到的。直属子节点的初始化、创建、修改、删除等一系列操作都可以被监听到,并可以一直进行监听。
代码语言:javascript复制public static void test04(CuratorTest03 curator) throws Exception {
String path = "/test003";
//创建节点
curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path);
PathChildrenCache childrenCache = new PathChildrenCache(curator.client, path, false);
//必须设置启动模式为POST_INITIALIZED_EVENT才可以进行监听
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(
(CuratorFramework client, PathChildrenCacheEvent event) -> {
System.out.println("子节点监听,进行" event.getType() "操作");
});
}
1.3.3 单节点触发器NodeCache
该触发器会一直监听当前节点的增删改操作。不会监听其子节点的变化。
代码语言:javascript复制public static void test06(CuratorTest03 curator) throws Exception {
String path = "/test006";
//创建节点
curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path);
//创建监听
NodeCache nodeCache = new NodeCache(curator.client, path);
nodeCache.start(true);
nodeCache.getListenable().addListener(()->{
System.out.println("监听到:" nodeCache.getPath() "节点发生变化!");
});
//修改数据
curator.client.setData().forPath(path,"我叫小白!".getBytes());
}
1.3.4 树形节点触发器Tree Node
该触发器会一直监听本节点及下面的所有子节点 的增删改操作,相当于一个树下面的所有节点他全部会进行监听。
代码语言:javascript复制public static void test07(CuratorTest03 curator) throws Exception {
String path = "/test007";
//创建监听
TreeCache treeCache = new TreeCache(curator.client,path);
treeCache.start();
TreeCacheListener listener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData data = treeCacheEvent.getData();
System.out.println("监听到:" data.getPath() "节点发生变化!,监听状态为:" treeCacheEvent.getType());
}
};
treeCache.getListenable().addListener(listener);
//创建节点
curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path);
//修改数据
curator.client.setData().forPath(path,"我叫小白!".getBytes());
//创建子节点
curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path "/tetst01");
//添加子节点内容
curator.client.setData().forPath(path "/tetst01","测试创建子节点内容".getBytes());
//删除子节点
curator.client.delete().forPath(path "/tetst01");
//删除本节点
curator.client.delete().forPath(path);
}
1.3.5 异步通知触发器CuratorListener监听
该监听器主要针对background通知和错误通知。使用此监听器后,只要是调用inBackground方法都会异步接收到监听事件。不管操作的成功或失败,他都能监听到增删改查所有操作,所以该监听器不可靠。
代码语言:javascript复制public static void test05(CuratorTest03 curator) throws Exception {
String path ="/test004";
//创建节点
curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path,"wo jiao xiao bai !".getBytes());
CuratorListener listener = new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("监听事件被触发,该操作是:" event.getType());
}
};
//添加监听事件
curator.client.getCuratorListenable().addListener(listener);
//异步获取节点数据
curator.client.getData().inBackground().forPath(path);
//变更节点数据
curator.client.setData().inBackground().forPath(path,"我叫小白!".getBytes());
//添加子节点
curator.client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath(path "/test01");
//创建子节点的子节点
curator.client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath(path "/test01/tet02");
//修改子节点内容
curator.client.setData().inBackground().forPath(path "/test01","123".getBytes());
//修改子节点的子节点内容
curator.client.setData().inBackground().forPath(path "/test01/tet01","456".getBytes());
//删除子节点的子节点
curator.client.create().inBackground().forPath(path "/test01/test01");
//删除子节点
curator.client.delete().inBackground().forPath(path "/test01");
//删除本节点
curator.client.delete().inBackground().forPath(path);
}
1.4 zookeeper实现服务的注册与发现
zookeeper的服务注册与发现,主要应用的是zookeeper的znode节点数据模型和watcher机制,大致的流程如下:
- 服务注册:服务提供者(Provider)启动时,会向zookeeper服务端注册服务信息,也就是创建一个节点,例如:用户注册服务com.xxx.user.register,并在节点上存储服务的相关数据(如服务提供者的ip地址、端口等)。
- 服务发现:服务消费者(Consumer)启动时,根据自身配置的依赖服务信息,向zookeeper服务端获取注册的服务信息并设置watch监听,获取到注册的服务信息之后,将服务提供者的信息缓存在本地,并进行服务的调用。
- 服务通知:一旦服务提供者因某种原因宕机不再提供服务之后,客户端与zookeeper服务端断开连接,zookeeper服务端上服务提供者对应服务节点会被删除(例如:用户注册服务com.xxx.user.register),随后zookeeper服务端会异步向所有消费用户注册服务com.xxx.user.register,且设置了watch监听的服务消费者发出节点被删除的通知,消费者根据收到的通知拉取最新服务列表,更新本地缓存的服务列表。
上边的过程就是zookeeper可以实现服务注册与发现的大致原理。
参考链接
ZooKeeper学习之路 (八)ZooKeeper原理解析
Zookeeper系列二:分布式架构详解、分布式技术详解、分布式事务
随笔分类 - Zookeeper专题系列
Zookeeper简介及核心概念_Cynicism_Kevin的博客-CSDN博客
zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客
Zookeeper工作原理(详细)_zookeeper原理_笔墨登场说说的博客-CSDN博客
Zookeeper的功能以及工作原理_zookeeper的主要功能_空白格的空白的博客-CSDN博客
ZooKeeper基本原理
深入了解Zookeeper核心原理
Zookeeper原理解析 - 简书
zookeeper的领导者选举和原子广播 - lpshou - 博客园
Zookeeper原理详解_百里度的博客-CSDN博客
Zookeeper学习系列【三】Zookeeper 集群架构、读写机制以及一致性原理(ZAB协议) - 掘金
从背景到原理,到架构体系,粉碎Zookeeper面试连环炮 - 掘金