并发编程(四)-AQS图解源码解析

2021-10-14 16:53:38 浏览数 (2)

AQS能干什么

抢占资源的线程直接执行处理业务,但是没有抢到的资源的进入就如排队等待机制,抢占失败的资源继续等待,但是等待线程仍然能保持获取锁的可能.

其中排序等待队列,如果共享资源占用,就需要一定的阻塞等待唤醒机制来保证锁分配,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现,他将请求线程封装成一个Node节点,通过CAS,自旋,LockSupport.park的方式维护state变量的状态.

源码解析AQS

我们拿ReentratLock当做入口,ReentratLock有分为公平锁和非公平锁,默认是非公平锁,看看其中其中lock底层的源码

其中公平锁和非公平锁获取锁的唯一区别就是红色部分,公平锁多了一个限制条件,hasQueuedPredecessots(),这个是公平锁加锁时候判断等待队列是否存放有效的节点方法

代码语言:javascript复制
public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

我们按照非公平锁进行讲解AQS流程

初始阶段,队列为空,state=0表示没有人占用资源

线程A处理业务,此时state更新为1,表示持有锁

代码语言:javascript复制
final void lock() {
   //线程A进入这个分支,设置state=1,
    if (compareAndSetState(0, 1))
        //然后设置处理的线程为线程A
         setExclusiveOwnerThread(Thread.currentThread());
   else
        acquire(1); 
}

线程B开始执行,发现此时state=1,所以就会进入acquire,发现有人在持有锁,且不是自己

代码语言:javascript复制
 //线程B进入acquire
 public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
       selfInterrupt();
 }

//线程B尝试获取锁
protected final boolean tryAcquire(int acquires) {
     return nonfairTryAcquire(acquires);
}
//线程B尝试获取锁具体逻辑
final boolean nonfairTryAcquire(int acquires) {
    //此时的线程是线程B
    final Thread current = Thread.currentThread();
    //此时的state=1
    int c = getState();
   // 不满足不进入
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
   //判断处理业务的线程是否是线程B
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c   acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
//线程B最后返回false
    return false;
}

线程B在进入addWaiter方法,先建立默认的哨兵节点,头节点和尾节点都指向哨兵节点

代码语言:javascript复制
//线程B进入这里
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
   //此时的pred节点为null,不符合
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
//线程B进入这里
    enq(node);
    return node;
}

private Node enq(final Node node) {
//自旋循环处理
    for (;;) {
        Node t = tail;
       //节点是空,进入这里
        if (t == null) { // Must initialize
            //这里建立哨兵节点,设置为头节点
             if (compareAndSetHead(new Node()))
               //尾节点也指向指向头节点也就是哨兵节点 
               tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

然后在第二次循环处理,线程B节点前节点指哨兵节点,队列的尾节点指向线程B节点,哨兵节点下一个节点为线程B节点

代码语言:javascript复制
private Node enq(final Node node) {
//自旋循环处理
    for (;;) {
        Node t = tail;
       //节点是空,进入这里
        if (t == null) { // Must initialize
            //这里建立哨兵节点,设置为头节点
             if (compareAndSetHead(new Node()))
               //尾节点也指向指向头节点也就是哨兵节点 
               tail = head;
        } else {//第二次循环进入这里
            //线程B节点指向哨兵节点
            node.prev = t;
           //队列的尾节点指向线程B节点
            if (compareAndSetTail(t, node)) {
               //哨兵节点下一个节点指向线程B节点
                t.next = node;
                return t;
            }
        }
    }
}

此时线程C,和线程B一样不能抢到资源,进入addWaiter,线程C的前节点指向线程B,然后队列尾节点指向线程C节点,线程B节点下一个节点指向线程C节点

代码语言:javascript复制
//线程C进入这里
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
   //此时这个tail节点就是上一个线程B节点
    Node pred = tail;
   //此时pred不为null,即线程B节点
    if (pred != null) {
        //线程C节点的前一个节点指向pred即线程B节点
        node.prev = pred;
       //队列尾节点指向线程C
        if (compareAndSetTail(pred, node)) {
            //线程B节点下一个节点指向线程C
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

当线程B和线程C进入队列中后,然后进入acquireQueued,比如此时是线程B进入这里,这里第一次循环处理的时候,只会把前驱节点waitStatus设置成-1

代码语言:javascript复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
       //循环处理线程B
        for (;;) {
            //这里获取线程B的上一个节点即哨兵节点
            final Node p = node.predecessor();
            //哨兵节点即头结点,此时线程B还想获取锁资源,但是此时资源还是被线程A占有
           //所以此时不符合
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
         // 第一次循环进入这里 
         //获取锁失败之后的线程进入这里
         //线程B进入这里判断哨兵节点的waitstatus,第一次进入更新为Node.Signal=-1
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

然后进入第二次处理,此时重点看shouldParkAfterFailedAcquire,和parkAndCheckInterrupt,最终线程都会阻塞,记住这里后面我们还会回来

代码语言:javascript复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
       //循环处理线程B
        for (;;) {
            //这里获取线程B的上一个节点即哨兵节点
            final Node p = node.predecessor();
            //哨兵节点即头结点,此时线程B还想获取锁资源,但是此时资源还是被线程A占有
           //所以此时不符合
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
         // 第二次循环进入这里 
         //获取锁失败之后的线程进入这里
         //线程B进入这里判断哨兵节点的waitstatus,然后返回true
            if (shouldParkAfterFailedAcquire(p, node) &&
               //然后第二次线程B进入这里
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
//
private final boolean parkAndCheckInterrupt() {
        线程B进入阻塞,后面的其他的线程也都会进入这里进行阻塞
        LockSupport.park(this);
        return Thread.interrupted();
}

此时线程A已经执行完成了,开始释放锁了,调用release,开始释放资源,设置state=0,持有线程为null

代码语言:javascript复制
//线程A释放锁
public final boolean release(int arg) {
    //尝试释放锁,这里释放成功,返回true
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
   //计算int c=0-1=0
    int c = getState() - releases;
   //此时当前线程是A,持有资源也是线程A 
   if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
 //此时c=0.符合进入
    if (c == 0) {
        free = true;
        //设置资源为的线程为null
        setExclusiveOwnerThread(null);
    }
  //设置资源的状态为0
    setState(c);
    return free;
}

release后面继续执行,然后唤醒正在阻塞的线程B,恢复哨兵节点的状态为0

代码语言:javascript复制
//线程A释放锁
public final boolean release(int arg) {
    //尝试释放锁,这里释放成功,返回true
    if (tryRelease(arg)) {
        //这里返回哨兵节点
        Node h = head;
       //哨兵节点不为null,且waitStatus=-1,符合进入
        if (h != null && h.waitStatus != 0)
          //唤醒线程B,同时设置哨兵waitStatus=0
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
   //哨兵节点是waitStatus=-1
    int ws = node.waitStatus;
    if (ws < 0)
       //进入这里 哨兵节点又会变成waitStatus=0
        compareAndSetWaitStatus(node, ws, 0);

   // 这里获取到下一个节点就是线程B
    Node s = node.next;
  // 此时线程B的状态是-1不符合跳过
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
  // 进入这里,唤醒线程B,
    if (s != null)
        LockSupport.unpark(s.thread);
}

然后继续看看我们前面遗留的问题,就是此时还在阻塞的线程B和C,我们看看线程B,继续执行之前的循环逻辑,让线程B出队列持有资源,队列中的线程C也是同理最终获取到锁

代码语言:javascript复制
final boolean acquireQueued(final Node node, int arg) 
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//线程B唤醒,继续循环
           //线程B获取哨兵节点
            final Node p = node.predecessor();
           //p就是头节点,线程B再次尝试获取锁,因为此时资源是无人占用,所以此时线程B获取到了锁
            if (p == head && tryAcquire(arg)) {
               // 头结点指向线程B节点,同时是指thread=null,prev=null
                setHead(node);
             //释放之前的哨兵节点,设置为null
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) 
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

0 人点赞