AQS源码解析(2)——共享模式
关联文章
AQS源码解析(1)
概述
1.AQS共享模式使用
2.AQS共享模式加锁
3.AQS共享模式解锁
第1节 AQS共享模式使用
在Java并发包下,Semaphore(信号量)工具类就是使用AQS共享模式的一种实现。Semaphore的使用方式如下。
代码语言:javascript复制/**
* @Author: zhouguanya20@163.com
* @Date: 2019-09-08 10:14
* @Description: 信号量——共享锁实现
*
* Semaphore还用于Hystrix限流框架中,控制系统并发在可控的范围内,保证系统高可用
*/
public class SemaphoreDemo {
/**
* 限定线程数量
*/
private static final Semaphore SEMAPHORE = new Semaphore(3);
/**
* 每次获取的许可数
*/
private static final int PERMITS = 1;
/**
* 线程
*/
static class TestThread implements Runnable {
@Override
public void run() {
try {
// 获取许可
SEMAPHORE.acquire(PERMITS);
System.out.println(printCurrent()
" : " Thread.currentThread().getName() " 进来了");
// 持有许可,不会释放许可
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// !!!!非常重要!!!!
SEMAPHORE.release(PERMITS);
}
}
/**
* 打印时间
*
* @return 当前时间字符串
*/
private String printCurrent() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
return sdf.format(new Date());
}
}
public static void main(String[] args) {
// 注意:本例只做演示使用。不要显示创建线程池,详情参考:Alibaba Java Coding Guidelines
// 不要显式创建线程,请使用线程池。
// 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
// 说明使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。
// 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
Thread t1 = new Thread(new TestThread(), "TestThread1");
Thread t2 = new Thread(new TestThread(), "TestThread2");
Thread t3 = new Thread(new TestThread(), "TestThread3");
Thread t4 = new Thread(new TestThread(), "TestThread4");
t1.start();
t2.start();
t3.start();
t4.start();
}
}
执行以上程序,执行结果如下。
代码语言:javascript复制2019-09-22 10:35:30 : TestThread1 进来了
2019-09-22 10:35:30 : TestThread3 进来了
2019-09-22 10:35:30 : TestThread2 进来了
2019-09-22 10:35:33 : TestThread4 进来了
第2节 AQS共享模式加锁
acquireShared()方法是共享模式下线程获取资源的顶层入口。获取成功则直接返回,失败则进入等待队列,并自旋直到获取资源为止。
代码语言:javascript复制public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared()方法的作用是尝试以共享模式进行获取。tryAcquireShared()方法是要由AQS的子类实现的。tryAcquireShared()方法返回值小于0表示获取共享资源失败。
代码语言:javascript复制protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
比如Semaphore对tryAcquireShared()方法的实现如下。
代码语言:javascript复制protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
如果tryAcquireShared()方法返回小于0,则调用doAcquireShared()方法,以共享的不间断模式进行共享资源的获取。doAcquireShared()源码如下。
代码语言:javascript复制/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方法和AQS独占模式中的acquireQueued()方法类似。参考AQS源码解析(1)
setHeadAndPropagate()方法是用于传播唤醒动作的——即共享。
代码语言:javascript复制/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
至此,AQS共享模式获取资源的acquireShared()方法分析完毕。
第3节 AQS共享模式解锁
releaseShared()释放共享锁,是共享模式释放锁的顶层入口。releaseShared()方法源码如下。
代码语言:javascript复制/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared()方法留给子类重写。返回布尔值,true代表完全释放资源,可以走到分支中通知等待线程队列了。tryReleaseShared()方法如下。
代码语言:javascript复制/**
* Attempts to set the state to reflect a release in shared mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this release of shared mode may permit a
* waiting acquire (shared or exclusive) to succeed; and
* {@code false} otherwise
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
tryReleaseShared()方法在Semaphore中的实现如下。
代码语言:javascript复制protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
如果tryReleaseShared()方法返回true将执行doReleaseShared()方法。doReleaseShared()方法释放满足条件的后继结点。