手写Zookeeper分布式锁

2022-05-05 16:18:54 浏览数 (1)

为什么要用Zookeeper做分布式锁?

在说Zookeeper做分布式锁之前,我们知道Redis也可以做分布式锁。那我为什么要用Zookeeper做分布式锁呢?

上图为数据库,Redis,Zookeeper实现分布式锁的技术对比。不用说,数据库实现分布式锁的性能肯定很低,Redis虽然性能很高,但是最终一致性上却是输于Zookeeper。Zookeeper在分布式集群上有天然优势。在生产环境中,中间件一般以集群形式部署,那么这里涉及到主从同步问题,Redis主从集群中,如果Master节点在向Slave节点发送RDB文件同步时,有可能Master节点挂掉,文件里面的数据有可能会有一秒的丢失。然而Redis有一套同步确认的策略。用Redis上锁实际是Redis里面setnx 数据,一般需要主节点同步数据给从节点成功了,才能认为成功。当然为了提升效率,有一个RedLock模型,该模型认为只要同步节点成功数超过半数就认为上锁成功。然而这个RedLock模型算法维护起来相当麻烦。但是Zookeeper天然支持这个模型。

还有一个用Zookeeper的原因在于其有一个watcher机制,线程抢不到锁时阻塞,当持有锁的线程释放锁时,这个watcher机制会主动通知其他线程唤醒去抢锁。而Redis的锁,线程抢锁的过程是一个自旋setnx K-V的过程,这种抢锁没有Zookeeper的抢锁方式优雅。所以我喜欢用Zookeeper实现分布式锁。

Zookeeper特性

1.节点树数据结构,znode是一个跟Unix文件系统路径相似的节点,可以往这个节点存储或获取数据; 2.通过客户端可对znode进行增删改查的操作,还可以注册watcher监控znode的变化。

通过上面两个特性可以实现分布式锁。

上图为Zookeeper实现分布式锁的流程图。下面为代码的实现。

代码实现

创建一个ZkDistributeLock类实现Lock接口

代码语言:javascript复制
@Slf4j
public class ZkDistributeLock implements Lock {

    @Autowired
    private ZookeeperConfig config;

    private String lockPath;

    private ZkClient client;

    private String currentPath;

    private String beforePath;
    ...
}

构造函数里面创建锁的根节点。

代码语言:javascript复制
public ZkDistributeLock(String lockPath) {
    super();
    this.lockPath = lockPath;
    ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
    this.client = zookeeperConfig.getConnectionWithoutSpring();
    this.client.setZkSerializer(new MyZkSerializer());

    if (!this.client.exists(lockPath)) {
        try {
            this.client.createPersistent(lockPath);
        } catch (ZkNodeExistsException e) {
            e.getStackTrace();
        }
    }
}

lock()方法编写

代码语言:javascript复制
@Override
public void lock() {

    if (!tryLock()) {
        //没获得锁,阻塞自己
        waitForLock();
        //再次尝试
        lock();
    }

}

tryLock()方法编写

代码语言:javascript复制
@Override
public boolean tryLock() {  //不会阻塞
    //创建节点
    if (this.currentPath == null) {
        currentPath = this.client.createEphemeralSequential(lockPath   "/", "lock");
    }
    List<String> children = this.client.getChildren(lockPath);
    Collections.sort(children);

    if (currentPath.equals(lockPath   "/"   children.get(0))) {
        return true;
    } else {
        int currentIndex = children.indexOf(currentPath.substring(lockPath.length()   1));
        beforePath = lockPath   "/"   children.get(currentIndex - 1);
    }

    log.info("锁节点创建成功:{}", lockPath);
    return false;

unlock()方法编写

代码语言:javascript复制
@Override
public void unlock() {
    client.delete(this.currentPath);
}

waitForLock()方法编写

代码语言:javascript复制
private void waitForLock() {
    CountDownLatch count = new CountDownLatch(1);
    IZkDataListener listener = new IZkDataListener() {
        @Override
        public void handleDataChange(String s, Object o) throws Exception {

        }

        @Override
        public void handleDataDeleted(String s) throws Exception {
            System.out.println(String.format("收到节点[%s]被删除了",s));
            count.countDown();
        }
    };

    client.subscribeDataChanges(this.beforePath, listener);

    //自己阻塞自己
    if (this.client.exists(this.beforePath)) {
        try {
            count.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //取消注册
    client.unsubscribeDataChanges(this.beforePath, listener);
}

测试

代码语言:javascript复制
//创建订单时加锁
@Override
public void createOrder() {

    String lockPath = "/distribute_lock";
    String orderCode = null;
    //zk 分布式锁
    Lock lock = new ZkDistributeLock(lockPath);
    // zkDistributeLock.setLockPath(lockPath);
    lock.lock();
    try {
        orderCode = ocg.getOrderCode();
    } finally {
        lock.unlock();
    }

    log.info("当前线程:{},生成订单编号:{}",Thread.currentThread().getName() , orderCode);
    //其他逻辑

}

@Test
public void testDisLock() {
    //并发数
    int currency = 20;

    //循环屏障
    CyclicBarrier cyclicBarrier = new CyclicBarrier(currency);

    for (int i = 0; i < currency; i  ) {
        new Thread(() -> {
            // OrderServiceImplWithDisLock orderService = new OrderServiceImplWithDisLock();
            System.out.println(Thread.currentThread().getName()   "====start====");
            //等待一起出发
            try {
                //CyclicBarrier共享锁模拟并发
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            orderService.createOrder();
        }).start();

    }
}

运行test方法,zookeeper上面会生成锁节点。

线程抢到锁,会显示创建锁节点成功

并且执行锁里面的代码

抢不到锁,会阻塞,等待被释放的锁

释放锁时,节点删除,wacther机制通知

抢到锁的线程,执行当前线程里面的任务

github代码:https://github.com/lvshen9/demo/blob/lvshen-dev/src/main/java/com/lvshen/demo/distributelock/ZkDistributeLock.java

0 人点赞