Java并发-JUC-AQS-独占模式源码解析

2021-12-06 15:51:43 浏览数 (2)

文章目录

代码语言:txt复制
    - ​
        - [说明](https://cloud.tencent.com/developer)
        - [疑问](https://cloud.tencent.com/developer)
        - ​
            - [为什么需要实现两种不同模式](https://cloud.tencent.com/developer)
            - [什么是独占模式](https://cloud.tencent.com/developer)
        - [概述](https://cloud.tencent.com/developer)
        - [源码分析](https://cloud.tencent.com/developer)
        - [总结](https://cloud.tencent.com/developer)
说明

每个 Java 工程师都应该或多或少地了解 AQS,我已经反复研究了很长时间,忘记了一遍又一遍地看它.每次我都有不同的经历.这一次,我打算重新拿出系统的源代码,并将其总结成一系列文章,以供将来查看.

一般来说,AQS规范是很难理解的,本次准备分五篇文章用来分析AQS框架:

  1. 第一篇(翻译AQS论文,理解AQS实现思路)
  2. 第二篇(介绍AQS基础属性,内部类,抽象方法)
  3. 第三篇(介绍独占模式的代码实现)
  4. 第四篇(介绍共享模式的代码实现)
  5. 第五篇(介绍Condition的相关代码实现)
疑问
为什么需要实现两种不同模式

大师给的解释是,虽然大多数应用程序应最大程度地提高总吞吐量,最大程度地容忍缺乏饥饿的概率。但是,在诸如资源控制之类的应用程序中,保持跨线程访问的公平性,容忍较差的聚合吞吐量更为重要,没有任何框架能够代表用户在这些相互冲突的目标之间做出决定;相反,必须适应不同的公平政策。所以AQS框架提供了两种模式

什么是独占模式

独占模式:每次只能有一个线程能持有资源;

概述

本篇文章为系列文章的第三篇,本篇文章介绍AQS独占模式的代码实现,首先,我们从总体过程入手,了解AQS排他性锁的执行逻辑,然后逐步深入分析了源代码。

获取锁的过程:

  1. acquire ()申请锁资源时,如果成功,它将进入临界区
  2. 当获取锁失败时,它进入一个 FIFO 等待队列并被阻塞,等待唤醒
  3. 当队列中的等待线程被唤醒时,会再次尝试获取锁资源。如果成功,它进入临界区,否则它将继续阻塞,等待唤醒 释放锁过程:
  4. 当线程调用release()来释放锁资源时,如果没有其他线程在等待锁资源,那么释放就完成了。
  5. 如果队列中有其他正在等待锁资源的线程需要被唤醒,则队列中的第一个等待节点(FIFO)将被唤醒。
源码分析

基于上面提到的获取和释放排他锁的一般过程,让我们来看看源代码实现逻辑.首先,让我们看看获取锁的acquire()方法。

代码语言:javascript复制
 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSICE),arg)){
            selfInterrupt();
 }

虽然代码很短,但是缺包含很多内容,让我们一步一步来看:

  1. 第一步执行的是开发人员自己实现的tryAcquire()方法来尝试获取锁资源,如果成功则执行整个acquire()方法,既当前线程获得锁资源并进入临界值.
  2. 如果获取锁失败,它进入以下逻辑,从addWaiter(Node.EXCLUSIVE)方法开始.看看这个方法的源代码实现:
代码语言:javascript复制
    /**
     * Creates and enqueues node for current thread and given mode.
     * 为当前线程和给定的模式创建并进入节点队列。
     * @param mode Node.EXCLUSIVE for exclusive,表示独占
     *             Node.SHARED for shared 表示共享
     * @return the new node
     *         返回新节点
     */
    private Node addWaiter(Node mode) {
        //根据当前线程创建新的节点
        //因为是独占模式,所以节点类型是Node.EXCLUSIVE
        Node node = new Node(Thread.currentThread(),mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 为了提高性能,我们首先执行一个快速的队列进入操作,也就是说,我们直接尝试在队列的末尾添加新节点
        // pred!=null表示已经有线程完成了节点初始化
        if (pred!=null) {
            node.prev = pred;
            //根据CAS的逻辑,即使并发操作也只能有一个线程能成功追加到尾节点并返回,
            //其余的线程将执行后续的排队操作.也就是enq()方法
            if (compareAndSetTail(pred,node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    /**
     * Inserts node into queue, initializing if necessary. See picture above
     * 将节点插入队列,必要时进行初始化。
     * @param node – the node to insert
     *             — 待插入的节点
     * @return node's predecessor
     *         前节点
     */
    private Node enq(final Node node){
        for(;;){
            Node t = tail;
            //tail 和 head 在这里初始化是因为AQS设计的是延迟初始化
            //tail 等于null,表示队列还没有初始化
            //如果队列还没有初始化,初始化,创建一个空的头节点
            if (t == null) {
                //类似地addWaiter,CAS只有一个线程可以成功初始化头节点,其余线程必须重复循环
                //初始化 tail head
                if (compareAndSetHead(new Node())){
                    tail = head;
                }
            } else {
                //新创建的前节点指向队列的末尾,在并发的情况下,毫无疑问会有多个新创建的节点的前节点会指向队列的末尾。
                node.prev = t;
                //根据这一步的CAS,无论前一步有多少新节点指向尾部节点,在这一步中只有一个能够真正成功加入设置,
                //其余的必须重新执行循环体
                if (compareAndSetTail(t, node)){
                   //将原tail节点的后节点设置为新tail节点
                   //由于CAS和设置next不是原子操作,因此可能出现更新tail节点成功
                   //但是未执行pred.next = node,导致无法从head遍历节点;
                   //但是由于前面已经设置了prev属性,因此可以从尾部遍历;
                   //像getSharedQueuedThreads、getExclusiveQueuedThreads都是从尾部开始遍历
                    t.next = node;
                    //循环的唯一退出操作是成功设置tail节点
                    return t;
                }
            }
        }
    }

以上入口操作要说明两点:首先,初始化队列的触发条件是线程当前占用锁资源,所以上面创建的空头节点可以看作是当前占用锁资源的节点(尽管它没有设置任何属性)。请注意,整个代码处于一个死循环中,直到设置成功.如果他们失败了,他们会一次又一次地尝试。

完成上述操作后,我们申请获取锁的线程成功加入了等待队列。通过本文开头提到的独占锁获取过程,节点现在需要做的是挂起当前线程并等待唤醒。这一逻辑如何实现?看看源代码:

代码语言:javascript复制
    //通过以上分析,node是刚刚进入包含当前线程信息的队列的节点
    final boolean acquireQueued(final Node node, int arg) {
        //获取锁定资源失败标记位
        boolean failed = true;
        try {
            //正在等待线程的中断标记位
            boolean interrupted = false;
            //该循环体的执行时序包括两种:新节点队列和等待节点唤醒的队列
            for (;;) {
                //获取当前节点的前节点
                final Node p = node.predecessor();
                //如果前端节点是头节点,尝试获取锁资源
                if (p == head && tryAcquire(arg)) {
                    //当前节点获取锁资源后,设置为Header节点
                    //Header节点表示当前占用锁资源的节点
                    setHead(node);
                    p.next = null; // help GC
                    //表示成功获取锁资源,因此failed被设为false
                    failed = false;
                    //返回一个中断标志,用来表示当前节点是被正常唤醒还是被中断唤醒
                    return interrupted;
                }
                //如果锁不成功,则进入挂起逻辑
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //最后,获取锁故障处理逻辑。
            if (failed)
                cancelAcquire(node);
        }
    }

挂起逻辑是一种非常重要的逻辑。我们单独分析一下。首先,我们应该注意到这样一个事实:到目前为止,我们只基于当前线程和节点类型创建了一个节点,并加入了队列。其他属性是默认值

代码语言:javascript复制
    //node是当前线程的节点,pred是它的pre-node参数。
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取当前节点前节点的等待状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
             ///如果前节点的等待状态为Node.SIGNAL,它返回true,
             //然后执行parkAndCheckInterrupt()方法挂起它。
            return true;
        if (ws > 0) {
            //waitStatus中有几个值表示前节点被取消
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //这里我们从当前节点的前节点开始,循环查找没有被取消的节点(其实就是从队列删除取消状态的节点)
            ///注意,由于Header节点是由new Node()创建的,waitStatus为0,
            //所以这里不会出现空指针的问题,也就是说最多循环到Header节点退出。
            pred.next = node;
        } else {
            //根据waitStatus的取值限制,waitStatus只能为0或Propagate,
            //我们将前节点的waitStatus设置为Node.SIGNAL,重新进入判断方法
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;//返回false,表示不需要调用park
    }

上述方法的逻辑更为复杂。用于判断当前节点是否可以挂起,即是否满足唤醒条件,如果挂起,则必须由其他线程唤醒。如果该方法返回false,表示挂起条件未完成,则会重新执行acquireQueued方法的循环体,重新进行判断.如果返回tru,则意味着一切就绪,可以挂起,它将进入parkAndCheckInterrupt()方法,一起来看下源代码吧:

代码语言:javascript复制
    private final boolean parkAndCheckInterrupt() {
        //阻塞当前线程
        LockSupport.park(this); 
        //唤醒后,返回中断标志,也就是说,如果唤醒是正常的,则返回false,如果唤醒是中断的,则返回true。
        return Thread.interrupted();
    }

查看acquireQueued方法中的源代码。如果中断被唤醒,将interrupt标志设置为true。无论是正常唤醒还是从中断中唤醒,您都尝试获取锁定资源。如果成功,则返回中断标志,否则将暂停等待。

注意:Thread.interrupted()方法在返回中断标志时清除中断标志,也就是说,当中断被唤醒并且锁成功时,整个quireQueued方法返回TRUE以指示中断被唤醒,但是如果中断被唤醒并且锁没有被获取,则它继续挂起,因为中断已经被清除,并且如果下一次是正常的。

唤醒,quireQueued方法返回false,表示没有中断。

最后,我们返回到acquireQueued方法的最后一步,即finally模块.这是在锁定资源获取失败之后完成的一些后续工作, 看上面的代码,实际上可以在这里输入的是tryAcquire()方法引发异常. 也就是说,如果AQS框架为开发人员自己实现的锁获取操作抛出异常,它也会进行适当的处理.让我们一起看一下源代码:

代码语言:javascript复制
    //传入方法参数为当前获取锁资源失败的节点
    private void cancelAcquire(Node node) {
        //如果节点不存在,直接忽略
        if (node == null)
            return;

        node.thread = null;

        //跳过所有取消的前置节点,类似上面的跳转逻辑
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        //这是前面节点的后继节点.由于上面跳转节点的操作,可能不是这里的当前节点.仔细想想。
        Node predNext = pred.next;

        //将当前节点的waitStatus设置为CANCELLED,以便其他节点在处理时将跳过该节点
        node.waitStatus = Node.CANCELLED;

        //如果直接删除当前的尾部节点,即从队列中删除
        //注意:这里不需要关心CAS失败,因为即使并发失败,节点也已经成功删除。
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    //这里的判断逻辑非常模糊,特别是当前节点的前端节点不是头节点,
                    //后面的节点等待唤醒(waitStatus小于0)
                    //另外,如果不取消当前节点的后继节点,则前一个节点与后一个节点连接,相当于删除当前节点。
                    compareAndSetNext(pred, predNext, next);
            } else {
                //在这里输入,或者当前节点的前节点是头节点,或者前面节点的waitStatus是PROPAGATE,它直接唤醒当前节点的后续节点。
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

上面分析的是不可中断获取锁的实现,但是AQS还提供了可中断,可定时的获取锁操作,我们也一起看下吧

代码语言:javascript复制
   //可中断获取锁
   public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //区别是 如果响应中断,则直接抛出异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

有上面代码可以看出,可中断获取锁和不可中断的区别就是,是标记中断,还是直接抛出异常

下面在来看下可定时获取锁源码:

代码语言:javascript复制
   //尝试以独占模式获取,如果中断则中止,如果超过给定超时则失败。。
   public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
        private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (nanosTimeout <= 0L) {
            return false;
        }
        final long deadline = System.nanoTime()   nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSICE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return true;
                }
                //如果时间已经小于0,则直接返回false 放弃获取锁
                nanosTimeout = deadline -System.nanoTime();
                if (nanosTimeout <= 0L) {
                    return false;
                }
                //如果锁不成功,同时当前时间剩余时间大于spinForTimeoutThreshold,则阻塞nanosTimeout时间
                if (shouldParkAfterFailedAcquire(p,node) && nanosTimeout > spinForTimeoutThreshold) {
                    LockSupport.parkNanos(this,nanosTimeout);
                }

                if (Thread.interrupted()){
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }

以上是exclusive mode acquisition lock(独占模式获取锁)的核心源代码,这真的是非常难理解,这些方法需要重复很多次,才能慢慢理解。

接下来,看看释放锁的过程:

代码语言:javascript复制
 public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

TryRelease()方法是用户定义的释放锁逻辑。如果成功,则判断等待队列中是否有需要唤醒的节点(waitStatus 0表示没有需要唤醒的节点)。

一起看下唤醒操作的实现吧:

代码语言:javascript复制
  private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            //将标记设置为0表示唤醒操作已经启动,并提高了并发环境中的性能
            compareAndSetWaitStatus(node, ws, 0);

        //如果当前节点的后继节点为null,或者已经被取消
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //请注意,循环不会中断,也就是说,它会来回查找,直到找到最近的节点,等待从当前节点唤醒。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //进行唤醒操作
        if (s != null)
            LockSupport.unpark(s.thread);
    }

相比之下,释放锁要简单得多,代码也少得多。

总结

以上是AQS排他锁的获取和释放过程。一般的想法很简单:尝试获取锁,并在失败时加入队列挂起。当锁被释放时,如果队列中有等待的线程,锁就会被唤醒。但如果你一步一步跟踪源代码,你会发现有很多细节,很多地方很难理解,我已经学了一遍又一遍,但我不敢说,我研究了AQS,甚至不敢说,上述研究结果是正确的,只是写一篇总结,与同行交流经验。除了排他锁之外,稍后还将推出AQS系列文章,包括共享锁、条件队列的实现原理等等。

0 人点赞