Zookeeper实现分布式锁

2021-12-22 16:23:49 浏览数 (1)

最初看过Zookeeper实现分布式锁的设计思想,感觉类似于之前写的Redis链接 ,都是通过只有一个客户端创建某个结点,只要结点存在其他的结点无法创建,实现一种资源的占用,后来又想想其实锁不就是这样嘛,也不是只是Zookeeper和Redis,就算是单进程各个线程之间的锁其实也是一种对象资源管控罢了。 后面接触多了,发现其实Zookeeper和Redis的分布式锁还是不一样滴~

一,先看一下有些部分类似于Redis的Zookeeper式分布式锁

如图所示,我们三个客户端去监听同一节点,那么只有一个客户端比如A客户端能够创建成功,那么其余结点由于监听了节点事件,当A使用完毕更改了结点状态,其余节点也就知道可以开始再次抢占资源了。

羊群效应:但是这样做有个缺点,由于众多结点监听该同一事件,那么每次发生节点变化虽然只有一个结点能够抢占资源,但是却惊动了整个群体,造成很多事件的变更,显然如果对于很多结点的分布式应用,这样并不划算。

二,利用有序节点实现分布式锁

如图所示,每个客户端都去锁结点下创建一个属于自己的有序临时结点,临时结点会监听一个比自己小的结点,如果自己是当前最小的,那么就代表你是可以被正常使用的(抢占资源)。

这里以Java提供的Lock为基础进行扩展实现,并且以继承的方式加watcher机制。Zk操作使用ZK提供的原生Java API。

分布式锁代码
代码语言:javascript复制
public class DistributedLock implements Lock, Watcher {
    private ZooKeeper zk=null;//创建zk客户端
    private String ROOT_LOCK="/locks";//定义一个根节点
    private String WAIT_LOCK;//等待前一个锁
    private String CURRENT_LOCK;//表示当前锁
    private CountDownLatch countDownLatch;

    public DistributedLock( ) {
        try {
            this.zk = new ZooKeeper("Ip:port", 4000, this);//这里watcher放在本类中实现了
            //判断根节点是否存在
            Stat stat=zk.exists(ROOT_LOCK,false);
            if (stat==null){
                zk.create(ROOT_LOCK,"0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

    @Override
    public boolean tryLock() {//当结点调用尝试获得锁方法时
        try {
            //创建临时有序结点并复制给当前锁
            CURRENT_LOCK=zk.create(ROOT_LOCK "/","0".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//创建临时有序结点
            System.out.println(Thread.currentThread().getName() "创建临时结点--->" CURRENT_LOCK "--->尝试获得锁");
            //获取孩子结点(锁结点)
            List<String> childrens = zk.getChildren(ROOT_LOCK, false);
            //创建孩子集合并添加到有序Set中
            SortedSet<String> nodes=new TreeSet<>();
            for (String children:childrens){
                nodes.add(ROOT_LOCK "/" children);
            }
            String firstNode = nodes.first();
            //尝试获得比当前结点更小的结果集,以此来获取到最后一个最大的结点也就是本结点前一个结点
            //注意,这里不可以用nodes.last(),因为这样获取到的是全部结点最后一个结点,是大于本结点那个
            SortedSet<String> lessThenMe = ((TreeSet<String>)nodes).headSet(CURRENT_LOCK);
            if (CURRENT_LOCK.equals(firstNode)){//如果当前结点是集合中最小的结点则其获得锁
                return true;
            }
            if (!lessThenMe.isEmpty()){//如果存在比当前结点更小的结点
                WAIT_LOCK=lessThenMe.last();//获得该有序结合的最后一个结点,即前一个结点,设置给WAIT_LOCL
                System.out.println("lessThenMe最后一个 : " lessThenMe.last());
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public void lock() {
        if (this.tryLock()){//如果获得锁成功
            System.out.println(Thread.currentThread().getName() "获得锁成功");
            return;
        }
        //如果没有成功获得锁,那么等待前一个结点释放锁
        waitForLock(WAIT_LOCK);
    }

    private boolean waitForLock(String preNode){
        try {
            //监听上一个比自己小的结点
            Stat stat=zk.exists(preNode,true);
            if (stat!=null){
                System.out.println(Thread.currentThread().getName() "等待--->" ROOT_LOCK "/" preNode "释放锁");
                countDownLatch=new CountDownLatch(1);
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() "--->获得锁");
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return true;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }


    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName() "释放锁" CURRENT_LOCK);
        try {
            zk.delete(CURRENT_LOCK,-1);//version=-1不管如何都会删除
            CURRENT_LOCK=null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        if (this.countDownLatch!=null){
            this.countDownLatch.countDown();
        }
    }
}

测试类

代码语言:javascript复制
public class ZkDistributedLockTest {
    public static void main(String[] args) throws IOException {
        CountDownLatch countDownLatch=new CountDownLatch(10);
        for(int i=0;i<10;i  ){
            new Thread(()->{
                try {
                    countDownLatch.await();
                    DistributedLock distributedLock=new DistributedLock();
                    distributedLock.lock();//当前线程尝试获得锁
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },"线程" i).start();
            countDownLatch.countDown();
        }
        System.in.read();
    }
}
说一下思想:

1.这里设定了一个倒计时CountDownLatch1用于控制和阻塞并发线程;

2.我们让每个线程创建分布式锁工具类**DistributedLock,每个线程会连接到ZK服务器成为一个客户端,只有一个创建跟结点

3.接下来每个线程都去尝试调用我们的lock()方法,我们的lock会去调用tryLock()方法

4.首先,我们让每个线程在tryLock()里去创建一个临时顺序结点,跟线程名无关,先到者排序在前

5.然后,我们会查出此时分布式锁根节点下的所有顺序结点,如果当前结点是最小的结点那么它将会获得锁,如果当前结点不是最小的结点,那么找到前一个结点并监听,并且利用countdownlatch设置倒计时为1,并阻塞当前线程2

6.当前一个结点出现监听事件(删除,更改),我们释放countdownlatch,释放本线程

如此,我们可以以分布式锁的形式实现Zk结点的顺序作用。

上述提供的是一种自己实现watcher和lock利用原生zk API简单实现分布式锁,实际上curator提供了许多高度封装好的分布式锁简化了步骤,而且提供了更多细节操作,比如读写超市断开连接的选举等~

eg

参考文献:

1 CountDownLauch解析.https://cloud.tencent.com/developer/article/2002445

2利用SortedSet实现结点排序,SortedSet.headSet(string key),可以取出小于key的结点,SortedSet.first获取第一个结点,SortedSet.last获取最后一个结点具体的https://blog.csdn.net/xjk201/article/details/81586209

哈哈哈,写论文入魔了,搞个参考文献皮一下~

0 人点赞