ZooKeeper入门(四):ZooKeeper事务与分布式锁InterProcessMutax

2022-11-22 13:40:17 浏览数 (1)

前言

在上一篇文章使用ZooKeeper进行节点的CRUD操作并添加监视器实践中介绍了如何示意CuratorFramwork这个ZooKeeper客户端框架操作ZooKeeper节点。本文我们来学习使用CuratorFramwork完成在一个事务中进行多个操作, 并学习ZooKeeper的分布式事务锁。

ZooKeeper事务操作

我们知道Mysql和Oracle等关系型数据库都有事务的概念,事务具有ACID(原子性、一致性、隔离性和持久性)等特性,数据可以持久化到磁盘。而ZooKeeper和Redis类似,都是一种内存型数据库。Redis的事务可以通过pipline管道来实现,而ZooKeeper的事务则是通过TransactionOp这个类来实现的, 主要有以下四种事务操作

代码语言:javascript复制
    /**
     * 在一个事务中开启一个 创建构建器
     *
     * @return builder object
     */
    TransactionCreateBuilder<CuratorOp> create();

    /**
     * 在一个事务中开启一个删除构建器
     *
     * @return builder object
     */
    TransactionDeleteBuilder<CuratorOp> delete();

    /**
     * 在一个事务中开启一个 重置数据构建器
     *
     * @return builder object
     */
    TransactionSetDataBuilder<CuratorOp> setData();

    /**
     * 在一个事务中开启一个检查构建器
     *
     * @return builder object
     */
    TransactionCheckBuilder<CuratorOp> check();

本文在我的blogserver项目的基础之上完成

  1. ZooKeeperService中新建一个executeTransanction, 返回执行结果列表
代码语言:javascript复制
 /**
     * 在事务中执行多个操作
     * @return
     * @throws Exception
     */
    public List<CuratorTransactionResult> executeTranSanction() throws Exception {
        TransactionOp transactionOp = curatorFramework.transactionOp();
        CuratorOp createOp =  transactionOp.create().forPath("/test/op1", "createData".getBytes(StandardCharsets.UTF_8));
        CuratorOp setDataOp = transactionOp.setData().forPath("/test/op2", "setData".getBytes(StandardCharsets.UTF_8));
        CuratorOp deleteOp =  transactionOp.delete().forPath("/test/op3");
        List<CuratorTransactionResult> transactionResults = curatorFramework.transaction().forOperations(createOp, setDataOp, deleteOp);
        for(CuratorTransactionResult transactionResult: transactionResults){
            logger.info(transactionResult.getForPath() "-" transactionResult.getType());
        }
        return transactionResults;

    }
  1. 登录ZooKeeper客户端,使用zkCli在test节点下添加op2和op3两个节点
代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 1] create /test/op2 setupData1
Created /test/op2
[zk: localhost:2181(CONNECTED) 2] create /test/op3 deleteData
Created /test/op3

3 ZooKeeperController类中添加executeTransaction端点方法

代码语言:javascript复制
    @PostMapping("/executeTransaction")
    public RespBean<List<CuratorTransactionResult>> executeTransaction() {
        RespBean<List<CuratorTransactionResult>> respBean = null;
        try {
            List<CuratorTransactionResult> transactionResults = zooKeeperService.executeTranSanction();
            respBean = RespBean.success(transactionResults);
        } catch (Exception e) {
           logger.info("execute tranSanction failed", e);
           respBean = RespBean.error(e.getMessage());
        }
        return respBean;
    }

  1. postman测试
代码语言:javascript复制
POST http://localhost:8081/blog/zookeeper/executeTransaction

返回结果:

{
    "status": 200,
    "msg": "success",
    "data": [
        {
            "type": "CREATE",
            "forPath": "/test/op1",
            "resultPath": "/test/op1",
            "resultStat": null,
            "error": 0
        },
        {
            "type": "SET_DATA",
            "forPath": "/test/op2",
            "resultPath": null,
            "resultStat": {
                "czxid": 4294967543,
                "mzxid": 4294967546,
                "ctime": 1662969109253,
                "mtime": 1662970217859,
                "version": 1,
                "cversion": 0,
                "aversion": 0,
                "ephemeralOwner": 0,
                "dataLength": 7,
                "numChildren": 0,
                "pzxid": 4294967543
            },
            "error": 0
        },
        {
            "type": "DELETE",
            "forPath": "/test/op3",
            "resultPath": null,
            "resultStat": null,
            "error": 0
        }
    ]
}

返回列表中每个CuratorTransactionResult对象的error字段都为0, 说明事务操作成功; type字段为单个操作的类型;forPath字段为znode节点的路径;resultStat字段为znode节点的Stat状态属性。

ZooKeeper分布式锁

Zookeeper中的分布式锁主要是通过InterProcessMutex这个类来实现的,该类实现了InterProcessLockRevocable<InterProcessMutex>两个接口

InterProcessMutex是一把跨JVM的可重入互斥锁,用于ZooKeeper持有锁;不同服务器节点中的JVM间的所有进程只要使用相同的锁路径,就将实现进程间临界段。这种互斥锁是公平的,每一个用户都能按请求获取锁的先后顺序拿到这把互斥锁。

进程间互斥锁InterProcessMutex简介

InterProcessMutex类具有internalsbasePaththreadData三个成员变量

代码语言:javascript复制
// 实现ZooKeeper分布式的底层锁
private final LockInternals internals;
// 基础路径
private final String basePath;
// 线程-锁数据映射集合
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

internals变量才是ZooKeepeer在加锁时用到的底层锁,它的原理是加锁时根据提供的路径创建一个临时有序节点,并监听该临时有序节点父路径下的所有子节点,如果创建的临时有序节点是父路径下子节点中序号最小的节点,则加锁成功;否则继续监听序号比自己小的节点,直到创建的节点是父路径下序号最小的节点为止才拿到了分布式锁。任务完成释放锁的时候会删除持有锁临时有序节点以及该节点的监听器,只是删除监听器在删除znode节点之前。

LockDataInterProcessMutex类的私有静态类, 源码如下:

代码语言:javascript复制
private  static  class LockData {
    final Thread owningThread; // 持有锁的线程
    final String lockPath;  // 锁路径
    final AtomicInteger lockCount = new AtomicInteger(1); // 锁计数器,原子变量

    private LockData(Thread owningThread, String lockPath) // 构造方法
    {
        this.owningThread = owningThread;
        this.lockPath = lockPath;
    }
}

InterProcessMutex类获取锁与释放锁的方法

  • void acquire(): 以阻塞的方式获取分布式锁,获取失败抛出IO异常
  • boolean acquire(long time, TimeUnit unit): 在指定的时间内获取分布式锁,获取锁成功返回true, 获取锁失败则返回false
  • void release():释放分布式锁

注意:获取锁和释放锁必须是成对出现的, 每一次调用acquire方法必定对应一次release方法

InterProcessMutex 锁的用法

学习ZooKeepeer分布式锁,我们也要从它的使用开始着手

CuratorFramwork里,进程间互斥锁InterProcessMutex需要被进一步封装

curator-examples项目中的locking包下提供了一个进一步封装的分布式锁ExampleClientThatLocks

它有三个成员变量

  • lock: InterProcessMutex 类型进程间互斥锁
  • resource: FakeLimitedResource 类型共享资源
  • clientNameString类型客户端名称

构造方法

代码语言:javascript复制
public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName)
    {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessMutex(client, lockPath);
    }

分布式并发环境下执行业务逻辑

代码语言:javascript复制
public void  doWork(long time, TimeUnit unit) throws Exception
    {
        if ( !lock.acquire(time, unit) ) // 获取分布式锁
        {
            throw new IllegalStateException(clientName   " could not acquire the lock");
        }
        try
        {
            System.out.println(clientName   " has the lock"); 
            resource.use(); // 对共享资源进行原子操作,可以在该方法中执行需要进行分布式并发控制的业务
            
        }
        finally
        {
            System.out.println(clientName   " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

FakeLimitedResource类的源码如下:

代码语言:javascript复制
public  class FakeLimitedResource
{
    // 原子类型布尔值变量 inUse
    private  final AtomicBoolean inUse = new AtomicBoolean(false);
    
    // 用于模拟每次只有一个进程能访问共享资源
    public void     use() throws InterruptedException
    {
        // 原子比较更新操作,inUse的旧值为false则更新为true就能继续执行后面的业务逻辑,否则抛出非法状态异常
        if ( !inUse.compareAndSet(false, true) )
        {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }

        try
        {
            // 可以在这里对需要控制分布式并发的业务进行处理,如秒杀业务(需要控制超卖)、火车票抢购等业务
            Thread.sleep((long)(3 * Math.random()));
        }
        finally
        {
            inUse.set(false); // 重置InUse变量的值为false
        }
    }
}

封装后的分布式锁使用示例

curator-example项目的locking包下的LockingExample类演示了如何在分布式高并发场景下使用封装进程间互斥锁InterProcessMutex后的分布式锁ExampleClientThatLocks, 使用示例如下:

代码语言:javascript复制
public  class LockingExample
{
    private  static  final  int        QTY = 5;
    private  static  final  int        REPETITIONS = QTY * 10;

    private  static  final String     PATH = "/examples/locks";

    public static void main(String[] args) throws Exception
    {
      
        // 模拟每次只能有一个进程访问公共资源
        final FakeLimitedResource   resource = new FakeLimitedResource();
        // 创建线程池
        ExecutorService             service = Executors.newFixedThreadPool(QTY);
        // 创建测试服务器
        final TestingServer         server = new TestingServer();
        try
        {   // 模拟有QTY个ZooKeeper客户端
            for ( int i = 0; i < QTY;   i )
            {
                final int       index = i;
                Callable<Void>  task = () -> {
                    CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                    try
                    {
                        client.start();

                        ExampleClientThatLocks      example = new ExampleClientThatLocks(client, PATH, resource, "Client "   index);
                        // 重复执行任务次数
                        for ( int j = 0; j < REPETITIONS;   j )
                        {
                            example.doWork(10, TimeUnit.SECONDS);
                        }
                    }
                    catch ( InterruptedException e )
                    {
                        Thread.currentThread().interrupt();
                    }
                    catch ( Exception e )
                    {
                        e.printStackTrace();
                        // log or do something
                    }
                    finally
                    {
                        // 任务执行完毕关闭CuratorFramework客户端
                        CloseableUtils.closeQuietly(client);
                    }
                    return null;
                };
                // 线程池提交任务
                service.submit(task);
            }
            // 关闭线程池
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
        finally
        {   // 关闭测试服务器
            CloseableUtils.closeQuietly(server);
        }
    }
}

我们将以上的执行分布式并发任务稍作修改,将ZooKeeper客户端的连接url改为自己原创服务器上的ZooKeeper连接地址,客户端改为3个,重复执行任务0次。修改后的代码如下:

代码语言:javascript复制
public class LockingExample{
 
    private  static  final String     PATH = "/examples/locks";

    private  static  final List<String> connectUrls = Arrays.asList("localhost:2181", "localhost:2182", "localhost:2183");


    public static void main(String[] args) throws Exception
    {
        // all of the useful sample code is in ExampleClientThatLocks.java

        // FakeLimitedResource simulates some external resource that can only be access by one process at a time
        final FakeLimitedResource   resource = new FakeLimitedResource();

        ExecutorService             service = Executors.newFixedThreadPool(QTY);
        final TestingServer         server = new TestingServer();
        try
        {
            for ( int i = 0; i < 3;   i )
            {
                final int  index = i;
                Callable<Void>  task = () -> {
                    CuratorFramework        client = CuratorFrameworkFactory.newClient(connectUrls.get(index), new ExponentialBackoffRetry(1000, 3));
                    try
                    {
                        client.start();
                        ExampleClientThatLocks      example = new ExampleClientThatLocks(client, PATH, resource, "Client "   index);
                        example.doWork(10, TimeUnit.SECONDS);
                    }
                    catch ( InterruptedException e )
                    {
                        Thread.currentThread().interrupt();
                    }
                    catch ( Exception e )
                    {
                        e.printStackTrace();
                        // log or do something
                    }
                    finally
                    {
                        CloseableUtils.closeQuietly(client);
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
        finally
        {
            CloseableUtils.closeQuietly(server);
        }
    }
}

然后执行main方法,我们可以看到控制台打印出客户端拿到锁和释放锁以及没拿到锁抛出异常的日志信息:

代码语言:javascript复制
Client 1 has the lock
Client 1 releasing the lock
java.lang.IllegalStateException: Client 0 could not acquire the lock
	at locking.ExampleClientThatLocks.doWork(ExampleClientThatLocks.java:42)
	at locking.LockingExample.lambda$main$0(LockingExample.java:66)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.IllegalStateException: Client 2 could not acquire the lock
	at locking.ExampleClientThatLocks.doWork(ExampleClientThatLocks.java:42)
	at locking.LockingExample.lambda$main$0(LockingExample.java:66)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

同时我们在端点调试的过程中可以通过登录ZooKeeper客户端查看到ZNode节点创建与删除的过程

代码语言:javascript复制
[zk: localhost:2181(CONNECTED) 3] ls /examples/locks
[_c_303637df-b357-4ded-b944-b4765fa6489e-lock-0000000003]
[zk: localhost:2181(CONNECTED) 4] ls /examples/locks
[]
[zk: localhost:2181(CONNECTED) 5] ls /examples/locks
Node does not exist: /examples/locks
[zk: localhost:2181(CONNECTED) 6] ls /examples/locks
[_c_25780f4c-14be-49c0-856a-0289c241ec03-lock-0000000000]
[zk: localhost:2181(CONNECTED) 7] ls /examples/locks
[]
[zk: localhost:2181(CONNECTED) 8] ls /examples/locks
Node does not exist: /examples/locks

加锁时创建的znode节点是保护模式下的临时有序节点,所以节点名称的前缀是一串很长的字母组成的字符串,后缀是一个10位的数字组成的字符串,如:_c_303637df-b357-4ded-b944-b4765fa6489e-lock-0000000003, lock的前缀和后缀分别位为c_303637df-b357-4ded-b944-b4765fa6489e和0000000003。

InterProcessMutex锁底层源码分析

构造方法

代码语言:javascript复制
/**
     * @param client client
     * @param path   the path to lock
     */
    public InterProcessMutex(CuratorFramework client, String path)
    {
        this(client, path, new StandardLockInternalsDriver());
    }

    /**
     * @param client client
     * @param path   the path to lock
     * @param driver lock driver
     */
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
    {
        this(client, path, LOCK_NAME, 1, driver);
    }


    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
    {
        //验证节点路径是否合法
        basePath = PathUtils.validatePath(path); 
        // 实例化internals参数
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }

IntelProcessMutex类有三个重载构造方法,默认使用的构造方法为两个参数的构造方法, 实例化时传递的第一个参数为CuratorFramework客户端参数client和要锁住的节点路径参数path。两个参数的构造方法会调用三个参数的构造方法,默认使用的driver为StandardLockInternalsDriver类型的实例,最后三个参数的构造方法会调用protected访问权限的5个参数的构造方法,默认的lockName参数为"lock-", 默认的maxLeases参数为1。

在真正的5个参数的构造方法中实例化IntelProcessMutex类时会先检验节点的路径是否有效,并将校验后返回的路径作为basePath的值;然后再实例化internals参数, 它是一个LockInternals类型的参数,可以称作内部锁,正是借助于它才能实现对znode节点的创建、添加监听器以及加锁和释放锁等操作的。

加锁的流程

IntelProcessMutex类中获取锁的方法acquire(long time, TimeUnit unit)方法内部调用了私有方法internalLock(long time, TimeUnit unit)方法,那么就我们就仔细看一看这个方法的源码

代码语言:javascript复制
private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */
        // 获取当前线程
        Thread currentThread = Thread.currentThread();
        // 根据当前线程从CurrentHashMap数据类型threadData中获取分布式锁数据lockData
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // 若属于当前线程的lockData已经存在,则重入,锁数量 1
            lockData.lockCount.incrementAndGet();
            // 返回加锁成功
            return true;
        }
        // 属于当前线程的lockData不存在,则调用LockInternals#attemptLock方法获取锁
        // 真正获取锁的邻逻辑也就在这个方法里面
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {   // 获取锁成功则将当前线程与加锁节点路径组装成lockData并放入threadData中
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            // 获取锁成功返回true
            return true;
        }
        // 获取锁失败最终返回false
        return false;
    }

加锁的流程大致分以下4步:

  • 1 从threadData中获取当前线程对应的加锁数据lockData, 判断是否为空: 不为空,可重入加锁,返回加锁成功;
  • 2 lockData为空,则调用LockInternals#attemptLock方法尝试创建加锁节点;
  • 3 判断上一步加锁返回的临时有序节点名称是否是父节所有子节点中序号最小的子节点,若是则返回加锁成功
  • 4 第3步中加锁返回的临时有序节点不是父节点的所有子节点中序号最小的子节点, 监视序号比加锁节点小且最靠近的子节点,待监视的子节点释放锁后回到第3步
  • 5 第4步中发生加锁超时或发生异常都将结束加锁流程,返回加锁失败

我们进一步来阅读LockInternals#attemptLock方法的源码

代码语言:javascript复制
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        // 记录加锁开始时间
        final long      startMillis = System.currentTimeMillis();
        // 根据时间单位转换加锁等待超时时间
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        // 节点数据
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        // 重试次数
        int             retryCount = 0;
        // 加锁成功节点路径
        String          ourPath = null;
        // 是否持有锁标识
        boolean         hasTheLock = false;
        // 是否完成加锁业务标识
        boolean         isDone = false;
        while ( !isDone ) // 开启轮训加锁
        {
            isDone = true;

            try
            {   // 调用LockInternalsDriver#createsTheLock方法创建临时有序节点
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                // 通过判断加锁节点的序号是否加锁路径下子节点中序号最小的节点判断是否加锁获取锁成功
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // 发生异常后重试直到超时
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount  , System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    // 连接成功后重新走加锁逻辑
                    isDone = false;
                }
                else
                {   // 重连客户端失败,抛出异常
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath; // 拿到锁直接返回加锁成功的节点路径
        }

        return null;
    }

然后我们来看加锁的两行关键代码

代码语言:javascript复制
// 创建加锁节点
 ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
 // 判断是否拿到锁                
 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);

StandardLockInternalsDriver#createsTheLock 方法源码

代码语言:javascript复制
@Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

通过阅源码我们发现创建锁的过程中,其实是委托了ZooKeeper客户端,在开启保护模式下创建了临时有序节点,并在创建临时有序节点之前,如果父节点不存在则先创建父节点。

LockInternals#internalLockLoop方法源码:

代码语言:javascript复制
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false; // 是否持有锁标识
        boolean     doDelete = false;  // 是否删除临时有序节点标识
        try
        {
            if ( revocable.get() != null )
            {
                // 对加锁节点添加监视器
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )           // ZooKeeper客户端处于启动状态切当前线程还没有持有锁的情况下进行轮询
            {
                // 获取子节点集合并按序列号从小到大排好序
                List<String>        children = getSortedChildren();
                // 截取子节点名称,带序列号
                String              sequenceNodeName = ourPath.substring(basePath.length()   1); //  1 to include the slash
                // 判断节点序号是否子节点中序号最小的节点
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    // 是子节点中序号最小的节点,持有锁标识变为true
                    haveTheLock = true;
                }
                else
                {
                   // 创建的节点不是子节点中序号最小的节点,监听待加锁节点的前一个节点
                   String  previousSequencePath = basePath   "/"   predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                             client.getData().usingWatcher(watcher).forPath(previousSequencePath); // 给待加锁节点的前一个节点添加监视器,直到前一个节点释放锁后被删除
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    // 加锁超时,则删除加锁节点标识变为true
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
								// 等待被唤醒
                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {   // 检查线程是否被中断
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {   // 删除加锁的节点
                deleteOurPath(ourPath);
            }
        }
        // 返回是否持有锁标识
        return haveTheLock;
    }

StandardLockInternalsDriver#getsTheLock方法源码

代码语言:javascript复制
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
        // 在子节点中获取待加锁节点的下标
        int             ourIndex = children.indexOf(sequenceNodeName);
        // 验证待解锁节点的下标是否有效
        validateOurIndex(sequenceNodeName, ourIndex);
        // 比较待加锁节点的下标是否小于maxLeases, 也就是1,小于1则表示获取锁成功
        boolean         getsTheLock = ourIndex < maxLeases;
        // 获取锁失败则返回待加锁节点的前一个节点的路径用于添加监视器
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
        // 返回由pathToWatch和getsTheLock两个成员变量组成的PredicateResults
        return new PredicateResults(pathToWatch, getsTheLock);
    }

到这里我们可以画一个加锁的流程了

释放锁流程

释放锁方法InterProcessMutex#release的源码如下:

代码语言:javascript复制
@Override
public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */
        // 获取当前线程对应的加锁数据
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
           // 当前线程的加锁数据为空,抛IllegalMonitorStateException异常
           throw new IllegalMonitorStateException("You do not own the lock: "   basePath);
        }
        // 当前线程的加锁数量原子性减1
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {   // 当前线程的加锁数量原子性减1后值仍大于0,直接返回
            return;
        }
        if ( newLockCount < 0 ) // 当前线程的加锁数量原子性减1后的值小于0,IllegalMonitorStateException异常
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: "   basePath);
        }
        //当前线程的加锁数量原子性减1后值等于0
        try
        {
            // 调用LockInternals#releaseLock方法删除加锁数据中保存的节点路径
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {   // ZooKeeper删除加锁节点后,threadData变量移除当前线程对应的加锁数据
            threadData.remove(currentThread);
        }
    }

释放锁的逻辑就比加锁的逻辑简单多了,释放锁的流程大致如下:

  • 1 从threadData中取出当前线程的加锁数据lockData, 判断是否为空,为空直接抛IllegalMonitorStateException异常, 释放锁失败
  • 2 当前线程的加锁数据lockData不为空,当前线程的lockData变量的lockCount变量原子性减1,原子性减1后lockCount的值仍然大于0, 直接返回;原子性减1后lockCount的值小于0则抛出IllegalMonitorStateException异常,释放锁失败
  • 3 调用LockInternals#releaseLock方法删除加锁节点
  • 4 threadData移除当前线程对应的加锁数据

由于释放锁的流程比较简单,这里笔者就没画流程图了

小结

本文主要讲解了ZooKeeper中的事务操作以及ZooKeeper的分布式事务锁InterProcessMutex 的使用,并根据InterProcessMutex类中加锁和释放锁的源码分析加锁和解锁的流程和原理。

可以看到,ZooKeeper的分布式事务锁InterProcessMutex类加锁底层逻辑还是比较复杂的,不过好在InterProcessMutex类帮我们封装了大部分的底层逻辑,让我们使用起来也是非常简单的。

只是它不如redis分布式事务锁高效。因为在InterProcessMutex类中当前线程获取分布式事务锁时,需要判断加锁的临时有序节点是否是父节点的所有子节点中序号最小的子节点。如果不是的话,还要去监视前一个序号较小的节点,等它释放锁,直到加锁节点是父节点的所有子节点中序号最小的节点才能加锁成功。

这样ZooKeeper分布式事务锁的加锁效率自然就要打个很大的折扣,这也是为什么在大部分Java分布式项目中,开发人员大都会选择使用redis的分布式事务锁,而很少选择使用ZooKeeper的分布式事务锁的原因。

即使如此,对应ZooKeeper事务锁的底层的加锁原理我们还是很值得我们学习和探索一番的。

参考连接

curator-examples项目源码地址: https://github.com/apache/curator/tree/master/curator-examples

---END---

0 人点赞