前言
在分布式系统中,确保数据的一致性和避免冲突是一个核心问题,通常我们通过分布式锁来解决,分布式锁本质是一种同步机制,用于控制对共享资源或临界区的访问。
Zookeeper 作为分布式协调服务,为分布式锁的实现提供了一个有效的平台,本文将通过一个简单的示例介绍如何基于 Zookeeper 提供的接口和机制实现分布式锁。
声明
文章中所提供的代码仅供参考,只是为开发人员提供一种实用的分布式锁实现方法,并帮助读者理解如何利用Zookeeper的特性和机制来管理分布式系统中的锁。
请注意,这些代码并不适用于实际应用中。
前置知识
分布式锁设计原则
实现一个分布式锁要满足以下几个基本要求:
- 互斥性/排他性:在同一时刻只允许一个客户端持有锁。
- 可用性:在客户端出现异常时,锁可以被正常释放,避免死锁。
- 同源性:锁不能被别的线程释放,否则会破坏互斥性/排他性。
- 可重入性:同一个客户端可以重复、递归调用该锁而不发生死锁。
除此之外,还要考虑在没有获得锁之前,客户端阻塞等待还是视为获取失败,这个取决于业务场景。
Zookeeper
Zookeeper 是一个传统的分布式协调服务,它更多的被用来作为一个协调器使用,比如来协调管理 Hadoop 集群、协调 Kafka 的 leader 选举等。
Zookeeper的哪些特性和机制可以高效的实现分布式锁的要求?
- 临时节点:临时节点的生命周期依赖创建它的会话,当会话结束后,临时节点就会被删除。此特性可以满足分布式锁的可用性。
- 顺序节点:在创建顺序节点时,Zookeeper会分配一个递增的计数器,排在最前面的节可以获取到锁。此特性可以实现公平锁。(没有基础开发人员可以这么理解创建节点:向同一目录创建一个节点即为获取锁。)
- Watcher机制:通过Watcher机制当前节点可以监听前一个节点的变化,在前一个节点删除时当前节点可以得知锁被释放,从而获取到锁。
- 节点数据:创建节点时设置客户端会话唯一标识为值,可以实现可重入性。
而互斥性/排他性、同源性需要通过客户端控制,代码示例中会说明。
分布式锁实现
创建一个Maven项目,导入zkclient依赖即可开始编码
代码语言:xml复制<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
下面的示例代码满足了分布式锁的基本要求,属于可阻塞的分布式锁,也就是在没有获得锁之前,客户端阻塞等待。
代码语言:java复制public class DistributedLock {
private ZooKeeper client;
// 连接信息
private String connectString = "127.0.0.1:2181";
// 超时时间
private int sessionTimeOut = 30000;
// 等待zk连接成功
private CountDownLatch countDownLatch = new CountDownLatch(1);
// 等待节点变化
private CountDownLatch waitLatch = new CountDownLatch(1);
//当前节点
private String currentNode;
//前一个节点路径
private String waitPath;
private final String ROOT_PATH = "/locks";
//1. 在构造方法中获取连接
public DistributedLock() throws Exception {
client = new ZooKeeper(connectString, sessionTimeOut, watchedEvent -> {
// 连上ZK,可以释放
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
//waitLatch 需要释放 (节点被删除并且删除的是前一个节点)
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
});
//等待Zookeeper连接成功,连接完成继续往下走
countDownLatch.await();
//2. 判断节点是否存在
Stat stat = client.exists(ROOT_PATH, false);
if (stat == null) {
//创建一下根节点
client.create(ROOT_PATH, ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
//3.对ZK加锁
public boolean zkLock() {
try {
String sessionId = String.valueOf(client.getSessionId());
List<String> children = client.getChildren(ROOT_PATH, false);
if (!children.isEmpty()) {
Collections.sort(children);
String path = children.get(0);
byte[] data = client.getData(ROOT_PATH "/" path, false, null);
//最小序号节点是当前客户端创建的不用再次获取
if (sessionId.equals(new String(data))) {
System.out.println("重入锁");
return true;
}
}
//创建 临时带序号节点,将当前客户端id作为值,实现可重入
currentNode = client.create(ROOT_PATH "/seq-", sessionId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
children = client.getChildren(ROOT_PATH, false);
//如果创建的节点只有一个值,就直接获取到锁,如果不是,监听它前一个节点
if (children.size() == 1) {
return false;
} else {
//先排序
Collections.sort(children);
//获取节点名称
String nodeName = currentNode.substring((ROOT_PATH "/").length());
//通过名称获取该节点在集合的位置
int index = children.indexOf(nodeName);
if (index == -1) {
System.out.println("数据异常,nodeName:" nodeName);
return false;
} else if (index == 0) {
//创建的节点是否是最小序号节点,如果是 就获取到锁;如果不是就监听前一个节点
return true;
} else {
//需要监听前一个节点变化
waitPath = ROOT_PATH "/" children.get(index - 1);
client.getData(waitPath, true, null);
//等待监听执行
waitLatch.await();
return true;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public void unZkLock() throws KeeperException, InterruptedException {
//删除节点
client.delete(currentNode, -1);
}
}
示例中创建了一个名为/locks
的根节点作为锁的标识符,当客户端需要获取锁时调用zkLock()
,该方法会先判断当前客户端是否已经持有锁,如果持有不创建节点(这里是实现可重入),否则会在/locks
根节点创建一个临时顺序节点,当同时有多个客户端获取锁时节点目录时这样的
├── locks
│ └── seq-0000000006
│ └── seq-0000000005
│ └── seq-0000000004
│ └── seq-0000000003
│ └── seq-0000000002
│ └── seq-0000000001
如果该客户端创建的节点是最小的节点,那么成功获取到锁处理业务,否则监听前一个节点并阻塞等待,当前一个节点删除时通知该客户端获取锁。
当客户端调用unZkLock()
时删除其创建的节点来释放锁,因为删除自己创建的节点,所以自然而然满足同源性。
整个流程与交互如下图
这里需要注意的是当某个节点发生变化时,Zookeeper会按照节点的顺序逐个通知客户端,所以当图中seq-0000000002
因故障先被删除,/seq-0000000003
也是需要等待/seq-0000000001
被删除后才会收到seq-0000000002
删除的通知,所以只用监听前一个节点被删除即可。
以上代码编写完后,在需要使用分布式锁的地方直接调用即可,代码如下:
代码语言:java复制public static void main(String[] args) {
try {
DistributedLock lock = new DistributedLock();
if (lock.zkLock()) {
System.out.println(Thread.currentThread() "获取到锁");
Thread.sleep(20 * 1000);
lock.unZkLock();
System.out.println(Thread.currentThread() "释放锁");
}
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
Curator框架实现分布式锁
Curator是基于Zookeeper原生API接口封装的客户端框架,解决了底层的细节开发问题,提供了一套高级API,实现了如分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等各种应用场景。
使用Curator实现分布式锁可以大大简化代码的编写,只需引入相关依赖,直接调用封装好的接口即可。其原理与上面所述的分布式锁实现方式类似。代码如下:
Curator相关依赖
代码语言:xml复制<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
代码语言:java复制public static void main(String[] args) {
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", (i, l, retrySleeper) -> false);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/locks");
try {
// 获取互斥锁
lock.acquire();
// 执行需要互斥访问的代码
// 释放互斥锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭Curator Framework客户端
client.close();
}
}
InterProcessMutex
是 Curator 提供的一种分布式锁的实现,使用 InterProcessMutex
可以确保在多个进程之间对共享资源的互斥访问,从而避免数据冲突和并发问题。
总结
在实现分布式锁上Zookeeper的特性提供了很大的帮助,并且它的高可用性、强一致性使得分布式锁变得更加可靠和高效。文中提供了两种基于 Zookeeper 实现分布式方案,不论是使用 Zookeeper API 还是 Curator API ,其原理是一样的。因此,我们可以在理解其原理的基础上,直接使用这些成熟的框架。
我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!