AQS源码解析(二)
概述
1.AQS共享模式的使用
2.AQS共享模式下加锁源码分析
3.AQS共享模式下解锁源码分析
第1节 AQS共享模式的使用
信号量Semaphore其实就是通过AQS共享模式实现的共享锁。Semaphore通常用于并发控制。Semaphore使用方式如下。
代码语言:javascript复制public class SemaphoreDemo {
/**
* 限定线程数量
*/
private static final Semaphore SEMAPHORE = new Semaphore(3);
/**
* 每次获取的许可数
*/
private static final int PERMITS = 1;
/**
* 线程
*/
static class TestThread implements Runnable {
@Override
public void run() {
try {
// 获取许可
SEMAPHORE.acquire(PERMITS);
System.out.println(printCurrent()
" : " Thread.currentThread().getName() " 进来了");
// 持有许可,不会释放许可
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// !!!!非常重要!!!!
SEMAPHORE.release(PERMITS);
}
}
/**
* 打印时间
*
* @return 当前时间字符串
*/
private String printCurrent() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
return sdf.format(new Date());
}
}
public static void main(String[] args) {
// 注意:本例只做演示使用。不要显示创建线程池,详情参考:Alibaba Java Coding Guidelines
// 不要显式创建线程,请使用线程池。
// 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
// 说明使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。
// 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
Thread t1 = new Thread(new TestThread(), "TestThread1");
Thread t2 = new Thread(new TestThread(), "TestThread2");
Thread t3 = new Thread(new TestThread(), "TestThread3");
Thread t4 = new Thread(new TestThread(), "TestThread4");
t1.start();
t2.start();
t3.start();
t4.start();
}
}
执行以上代码,执行结果如下。
代码语言:javascript复制2019-10-29 11:12:49 : TestThread1 进来了
2019-10-29 11:12:49 : TestThread2 进来了
2019-10-29 11:12:49 : TestThread3 进来了
2019-10-29 11:12:52 : TestThread4 进来了
第2节 AQS共享模式下加锁源码分析
Semaphore.acquire()方法用于获取共享锁,acquire()方法代码如下。
代码语言:javascript复制public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquire()方法调用的其实是AQS的acquireSharedInterruptibly()方法。acquireSharedInterruptibly()方法代码如下。
代码语言:javascript复制public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
acquireSharedInterruptibly()方法首先校验线程是否被中断。如果线程被中断则抛出异常。如果没有中断则通过tryAcquireShared()方法尝试获取共享锁。tryAcquireShared()方法代码如下。
代码语言:javascript复制protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
由此可见,AQS的tryAcquireShared()方法未实现任何逻辑。如果tryAcquireShared()方法返回小于0,说明获取共享锁失败。获取共享锁失败后将会执行doAcquireSharedInterruptibly()方法。doAcquireSharedInterruptibly()方法如下。
代码语言:javascript复制private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireSharedInterruptibly()方法以共享模式将线程装入阻塞队列并挂起线程。
第3节 AQS共享模式下解锁源码分析
Semaphore.release()方法用于释放共享锁,release()方法代码如下。
代码语言:javascript复制public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
release()方法调用AQS的releaseShared()方法释放共享锁。releaseShared()方法代码如下。
代码语言:javascript复制public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared()方法通过tryReleaseShared()方法释放共享锁。tryReleaseShared()方法代码如下。
代码语言:javascript复制protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
由此可见,AQS的tryReleaseShared()方法未实现任何逻辑。如果tryReleaseShared()方法返回true,说明释放共享锁成功。释放共享锁成功后将会执行doReleaseShared()方法。doReleaseShared()方法如下。
代码语言:javascript复制private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
doReleaseShared()方法唤醒阻塞队列中的节点,使得线程恢复执行,争取共享锁。