基于 Zookeeper 实现分布式锁

2024-06-10 10:40:37 浏览数 (1)

前言

在分布式系统中,确保数据的一致性和避免冲突是一个核心问题,通常我们通过分布式锁来解决,分布式锁本质是一种同步机制,用于控制对共享资源或临界区的访问。

Zookeeper 作为分布式协调服务,为分布式锁的实现提供了一个有效的平台,本文将通过一个简单的示例介绍如何基于 Zookeeper 提供的接口和机制实现分布式锁。

声明

文章中所提供的代码仅供参考,只是为开发人员提供一种实用的分布式锁实现方法,并帮助读者理解如何利用Zookeeper的特性和机制来管理分布式系统中的锁。

请注意,这些代码并不适用于实际应用中

前置知识

分布式锁设计原则

实现一个分布式锁要满足以下几个基本要求:

  1. 互斥性/排他性:在同一时刻只允许一个客户端持有锁。
  2. 可用性:在客户端出现异常时,锁可以被正常释放,避免死锁。
  3. 同源性:锁不能被别的线程释放,否则会破坏互斥性/排他性。
  4. 可重入性:同一个客户端可以重复、递归调用该锁而不发生死锁。

除此之外,还要考虑在没有获得锁之前,客户端阻塞等待还是视为获取失败,这个取决于业务场景。

Zookeeper

Zookeeper 是一个传统的分布式协调服务,它更多的被用来作为一个协调器使用,比如来协调管理 Hadoop 集群、协调 Kafka 的 leader 选举等。

Zookeeper的哪些特性和机制可以高效的实现分布式锁的要求?

  1. 临时节点:临时节点的生命周期依赖创建它的会话,当会话结束后,临时节点就会被删除。此特性可以满足分布式锁的可用性。
  2. 顺序节点:在创建顺序节点时,Zookeeper会分配一个递增的计数器,排在最前面的节可以获取到锁。此特性可以实现公平锁。(没有基础开发人员可以这么理解创建节点:向同一目录创建一个节点即为获取锁。)
  3. Watcher机制:通过Watcher机制当前节点可以监听前一个节点的变化,在前一个节点删除时当前节点可以得知锁被释放,从而获取到锁。
  4. 节点数据:创建节点时设置客户端会话唯一标识为值,可以实现可重入性。

而互斥性/排他性、同源性需要通过客户端控制,代码示例中会说明。

分布式锁实现

创建一个Maven项目,导入zkclient依赖即可开始编码

代码语言:xml复制
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

下面的示例代码满足了分布式锁的基本要求,属于可阻塞的分布式锁,也就是在没有获得锁之前,客户端阻塞等待。

代码语言:java复制
public class DistributedLock {

    private ZooKeeper client;

    // 连接信息
    private String connectString = "127.0.0.1:2181";

    // 超时时间
    private int sessionTimeOut = 30000;

    // 等待zk连接成功
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    // 等待节点变化
    private CountDownLatch waitLatch = new CountDownLatch(1);

    //当前节点
    private String currentNode;

    //前一个节点路径
    private String waitPath;

    private final String ROOT_PATH = "/locks";

    //1. 在构造方法中获取连接
    public DistributedLock() throws Exception {
        client = new ZooKeeper(connectString, sessionTimeOut, watchedEvent -> {
            //  连上ZK,可以释放
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                countDownLatch.countDown();
            }

            //waitLatch 需要释放 (节点被删除并且删除的是前一个节点)
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                waitLatch.countDown();
            }
        });

        //等待Zookeeper连接成功,连接完成继续往下走
        countDownLatch.await();
        //2. 判断节点是否存在
        Stat stat = client.exists(ROOT_PATH, false);
        if (stat == null) {
            //创建一下根节点
            client.create(ROOT_PATH, ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);


        }

    }

    //3.对ZK加锁
    public boolean zkLock() {

        try {
            String sessionId = String.valueOf(client.getSessionId());
            List<String> children = client.getChildren(ROOT_PATH, false);
            if (!children.isEmpty()) {
                Collections.sort(children);
                String path = children.get(0);
                byte[] data = client.getData(ROOT_PATH   "/"   path, false, null);
                //最小序号节点是当前客户端创建的不用再次获取
                if (sessionId.equals(new String(data))) {
                    System.out.println("重入锁");
                    return true;
                }
            }

            //创建 临时带序号节点,将当前客户端id作为值,实现可重入
            currentNode = client.create(ROOT_PATH   "/seq-", sessionId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            children = client.getChildren(ROOT_PATH, false);
            //如果创建的节点只有一个值,就直接获取到锁,如果不是,监听它前一个节点
            if (children.size() == 1) {
                return false;
            } else {
                //先排序
                Collections.sort(children);

                //获取节点名称
                String nodeName = currentNode.substring((ROOT_PATH   "/").length());

                //通过名称获取该节点在集合的位置
                int index = children.indexOf(nodeName);

                if (index == -1) {
                    System.out.println("数据异常,nodeName:"   nodeName);
                    return false;
                } else if (index == 0) {
                    //创建的节点是否是最小序号节点,如果是 就获取到锁;如果不是就监听前一个节点
                    return true;
                } else {
                    //需要监听前一个节点变化
                    waitPath = ROOT_PATH   "/"   children.get(index - 1);
                    client.getData(waitPath, true, null);

                    //等待监听执行
                    waitLatch.await();
                    return true;
                }
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    public void unZkLock() throws KeeperException, InterruptedException {
        //删除节点
        client.delete(currentNode, -1);
    }


}

示例中创建了一个名为/locks的根节点作为锁的标识符,当客户端需要获取锁时调用zkLock(),该方法会先判断当前客户端是否已经持有锁,如果持有不创建节点(这里是实现可重入),否则会在/locks根节点创建一个临时顺序节点,当同时有多个客户端获取锁时节点目录时这样的

代码语言:powershell复制
├── locks
│   └── seq-0000000006
│   └── seq-0000000005
│   └── seq-0000000004
│   └── seq-0000000003
│   └── seq-0000000002
│   └── seq-0000000001

如果该客户端创建的节点是最小的节点,那么成功获取到锁处理业务,否则监听前一个节点并阻塞等待,当前一个节点删除时通知该客户端获取锁。

当客户端调用unZkLock()时删除其创建的节点来释放锁,因为删除自己创建的节点,所以自然而然满足同源性。

整个流程与交互如下图

在这里插入图片描述在这里插入图片描述

这里需要注意的是当某个节点发生变化时,Zookeeper会按照节点的顺序逐个通知客户端,所以当图中seq-0000000002因故障先被删除,/seq-0000000003也是需要等待/seq-0000000001被删除后才会收到seq-0000000002删除的通知,所以只用监听前一个节点被删除即可。

以上代码编写完后,在需要使用分布式锁的地方直接调用即可,代码如下:

代码语言:java复制
public static void main(String[] args) {
    try {
        DistributedLock lock = new DistributedLock();
        if (lock.zkLock()) {
            System.out.println(Thread.currentThread()   "获取到锁");
            Thread.sleep(20 * 1000);
            lock.unZkLock();
            System.out.println(Thread.currentThread()   "释放锁");
        }

    } catch (InterruptedException | KeeperException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Curator框架实现分布式锁

Curator是基于Zookeeper原生API接口封装的客户端框架,解决了底层的细节开发问题,提供了一套高级API,实现了如分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等各种应用场景。

使用Curator实现分布式锁可以大大简化代码的编写,只需引入相关依赖,直接调用封装好的接口即可。其原理与上面所述的分布式锁实现方式类似。代码如下:

Curator相关依赖

代码语言:xml复制
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
代码语言:java复制
public static void main(String[] args) {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", (i, l, retrySleeper) -> false);
        client.start();
        InterProcessMutex lock = new InterProcessMutex(client, "/locks");
        try {
            // 获取互斥锁
            lock.acquire();

            // 执行需要互斥访问的代码
            // 释放互斥锁
            lock.release();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭Curator Framework客户端
            client.close();
        }
}

InterProcessMutex 是 Curator 提供的一种分布式锁的实现,使用 InterProcessMutex 可以确保在多个进程之间对共享资源的互斥访问,从而避免数据冲突和并发问题。

总结

在实现分布式锁上Zookeeper的特性提供了很大的帮助,并且它的高可用性、强一致性使得分布式锁变得更加可靠和高效。文中提供了两种基于 Zookeeper 实现分布式方案,不论是使用 Zookeeper API 还是 Curator API ,其原理是一样的。因此,我们可以在理解其原理的基础上,直接使用这些成熟的框架。

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

0 人点赞