AQS能干什么
抢占资源的线程直接执行处理业务,但是没有抢到的资源的进入就如排队等待机制,抢占失败的资源继续等待,但是等待线程仍然能保持获取锁的可能.
其中排序等待队列,如果共享资源占用,就需要一定的阻塞等待唤醒机制来保证锁分配,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现,他将请求线程封装成一个Node节点,通过CAS,自旋,LockSupport.park的方式维护state变量的状态.
源码解析AQS
我们拿ReentratLock当做入口,ReentratLock有分为公平锁和非公平锁,默认是非公平锁,看看其中其中lock底层的源码
其中公平锁和非公平锁获取锁的唯一区别就是红色部分,公平锁多了一个限制条件,hasQueuedPredecessots(),这个是公平锁加锁时候判断等待队列是否存放有效的节点方法
代码语言:javascript复制public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
我们按照非公平锁进行讲解AQS流程
初始阶段,队列为空,state=0表示没有人占用资源
线程A处理业务,此时state更新为1,表示持有锁
代码语言:javascript复制final void lock() {
//线程A进入这个分支,设置state=1,
if (compareAndSetState(0, 1))
//然后设置处理的线程为线程A
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
线程B开始执行,发现此时state=1,所以就会进入acquire,发现有人在持有锁,且不是自己
代码语言:javascript复制 //线程B进入acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//线程B尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//线程B尝试获取锁具体逻辑
final boolean nonfairTryAcquire(int acquires) {
//此时的线程是线程B
final Thread current = Thread.currentThread();
//此时的state=1
int c = getState();
// 不满足不进入
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断处理业务的线程是否是线程B
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//线程B最后返回false
return false;
}
线程B在进入addWaiter方法,先建立默认的哨兵节点,头节点和尾节点都指向哨兵节点
代码语言:javascript复制//线程B进入这里
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//此时的pred节点为null,不符合
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//线程B进入这里
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋循环处理
for (;;) {
Node t = tail;
//节点是空,进入这里
if (t == null) { // Must initialize
//这里建立哨兵节点,设置为头节点
if (compareAndSetHead(new Node()))
//尾节点也指向指向头节点也就是哨兵节点
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
然后在第二次循环处理,线程B节点前节点指哨兵节点,队列的尾节点指向线程B节点,哨兵节点下一个节点为线程B节点
代码语言:javascript复制private Node enq(final Node node) {
//自旋循环处理
for (;;) {
Node t = tail;
//节点是空,进入这里
if (t == null) { // Must initialize
//这里建立哨兵节点,设置为头节点
if (compareAndSetHead(new Node()))
//尾节点也指向指向头节点也就是哨兵节点
tail = head;
} else {//第二次循环进入这里
//线程B节点指向哨兵节点
node.prev = t;
//队列的尾节点指向线程B节点
if (compareAndSetTail(t, node)) {
//哨兵节点下一个节点指向线程B节点
t.next = node;
return t;
}
}
}
}
此时线程C,和线程B一样不能抢到资源,进入addWaiter,线程C的前节点指向线程B,然后队列尾节点指向线程C节点,线程B节点下一个节点指向线程C节点
代码语言:javascript复制//线程C进入这里
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//此时这个tail节点就是上一个线程B节点
Node pred = tail;
//此时pred不为null,即线程B节点
if (pred != null) {
//线程C节点的前一个节点指向pred即线程B节点
node.prev = pred;
//队列尾节点指向线程C
if (compareAndSetTail(pred, node)) {
//线程B节点下一个节点指向线程C
pred.next = node;
return node;
}
}
enq(node);
return node;
}
当线程B和线程C进入队列中后,然后进入acquireQueued,比如此时是线程B进入这里,这里第一次循环处理的时候,只会把前驱节点waitStatus设置成-1
代码语言:javascript复制final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//循环处理线程B
for (;;) {
//这里获取线程B的上一个节点即哨兵节点
final Node p = node.predecessor();
//哨兵节点即头结点,此时线程B还想获取锁资源,但是此时资源还是被线程A占有
//所以此时不符合
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 第一次循环进入这里
//获取锁失败之后的线程进入这里
//线程B进入这里判断哨兵节点的waitstatus,第一次进入更新为Node.Signal=-1
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
然后进入第二次处理,此时重点看shouldParkAfterFailedAcquire,和parkAndCheckInterrupt,最终线程都会阻塞,记住这里后面我们还会回来
代码语言:javascript复制final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//循环处理线程B
for (;;) {
//这里获取线程B的上一个节点即哨兵节点
final Node p = node.predecessor();
//哨兵节点即头结点,此时线程B还想获取锁资源,但是此时资源还是被线程A占有
//所以此时不符合
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 第二次循环进入这里
//获取锁失败之后的线程进入这里
//线程B进入这里判断哨兵节点的waitstatus,然后返回true
if (shouldParkAfterFailedAcquire(p, node) &&
//然后第二次线程B进入这里
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//
private final boolean parkAndCheckInterrupt() {
线程B进入阻塞,后面的其他的线程也都会进入这里进行阻塞
LockSupport.park(this);
return Thread.interrupted();
}
此时线程A已经执行完成了,开始释放锁了,调用release,开始释放资源,设置state=0,持有线程为null
代码语言:javascript复制//线程A释放锁
public final boolean release(int arg) {
//尝试释放锁,这里释放成功,返回true
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//计算int c=0-1=0
int c = getState() - releases;
//此时当前线程是A,持有资源也是线程A
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//此时c=0.符合进入
if (c == 0) {
free = true;
//设置资源为的线程为null
setExclusiveOwnerThread(null);
}
//设置资源的状态为0
setState(c);
return free;
}
release后面继续执行,然后唤醒正在阻塞的线程B,恢复哨兵节点的状态为0
代码语言:javascript复制//线程A释放锁
public final boolean release(int arg) {
//尝试释放锁,这里释放成功,返回true
if (tryRelease(arg)) {
//这里返回哨兵节点
Node h = head;
//哨兵节点不为null,且waitStatus=-1,符合进入
if (h != null && h.waitStatus != 0)
//唤醒线程B,同时设置哨兵waitStatus=0
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//哨兵节点是waitStatus=-1
int ws = node.waitStatus;
if (ws < 0)
//进入这里 哨兵节点又会变成waitStatus=0
compareAndSetWaitStatus(node, ws, 0);
// 这里获取到下一个节点就是线程B
Node s = node.next;
// 此时线程B的状态是-1不符合跳过
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 进入这里,唤醒线程B,
if (s != null)
LockSupport.unpark(s.thread);
}
然后继续看看我们前面遗留的问题,就是此时还在阻塞的线程B和C,我们看看线程B,继续执行之前的循环逻辑,让线程B出队列持有资源,队列中的线程C也是同理最终获取到锁
代码语言:javascript复制final boolean acquireQueued(final Node node, int arg)
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//线程B唤醒,继续循环
//线程B获取哨兵节点
final Node p = node.predecessor();
//p就是头节点,线程B再次尝试获取锁,因为此时资源是无人占用,所以此时线程B获取到了锁
if (p == head && tryAcquire(arg)) {
// 头结点指向线程B节点,同时是指thread=null,prev=null
setHead(node);
//释放之前的哨兵节点,设置为null
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}