说明
每个 Java 工程师都应该或多或少地了解 AQS,我已经反复研究了很长时间,忘记了一遍又一遍地看它.每次我都有不同的经历.这一次,我打算重新拿出系统的源代码,并将其总结成一系列文章,以供将来查看.
一般来说,AQS规范是很难理解的,本次准备分五篇文章用来分析AQS框架:
- 第一篇(翻译AQS论文,理解AQS实现思路)
- 第二篇(介绍AQS基础属性,内部类,抽象方法)
- 第三篇(介绍独占模式的代码实现)
- 第四篇(介绍共享模式的代码实现)
- 第五篇(介绍Condition的相关代码实现)
介绍
本篇文章为系列文章的第二篇,本篇文章先介绍一下AQS内部属性字段和内部类等,让我们更方便看懂代码
什么是AQS
通过第一篇文章我们已经知道,AbstractQueuedSynchronizer 是其他同步器的基础类,它被用来实现阻塞锁和依赖先进先出(FIFO)等待队列的相关同步器(例如,互斥锁(ReentrantLock)、读写锁(ReentrantLock)、信号量(Semaphore)、屏障(CyclicBarrier)、Futures等),它的内部依赖于单个原子int值来表示同步状态。
子类必须定义更改此状态的受保护方法,以及定义此状态在获取或释放此对象方面的含义。这个类中的其他方法执行所有排队和阻塞机制。子类可以维护其他状态字段,但是只有使用getState()、setState(int)和compareAndSetState(int,int)方法操作的原子更新的int值被跟踪到同步。
成员变量
代码语言:javascript复制 // 等待队列的头
private transient volatile Node head;
// 等待队列的尾
private transient volatile Node tail;
// 同步状态
private volatile int state;
// 自旋锁阈值
static final long spinForTimeoutThreshold = 1000L;
- head:等待队列的头部,它是延迟初始化的,除了在初始化是设置,它只能通过方法setHead修改,还需要注意:如果head存在,它的waitStatus 值保证不为CANCELLED
- tail:等待队列的尾部,它是延迟初始化的,除了在第一次初始化Node节点时,会有尝试设置外,它仅能通过方法enq修改,以添加新的等待节点()
- state: 同步状态,不同同步器方法,标识含义不同
- spinForTimeoutThreshold: 如果等待时间小于这个值,将不会使该线程进行超时等待,而是进入快速的自旋过程,原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,反而会让nanosTimeout的超时从整体上表现得不精确。因此,为了提高性能,在超时非常短的场景下,同步器会进入无条件的快速自旋。
内部类 Node
代码语言:javascript复制static final class Node {
// 用于标识节点正在共享模式下等待的标记
static final Node SHARED = new Node();
// 用于标识节点正在独占模式下等待的标记
static final Node EXCLUSIVE = null;
// waitStatus值,标识线程已取消
static final int CANCELLED = 1;
// waitStatus值,用于标识后续线程需要唤醒(unparking)
static final int SIGNAL = -1;
// waitStatus值,标识线程正在等待条件
static final int CONDITION = -2;
// waitStatus值,表示下一个acquireShared(共享同步状态获取)应该无条件传播
static final int PROPAGATE = -3;
// 当前节点状态
volatile int waitStatus;
// 前节点
volatile Node prev;
// 后节点
volatile Node next;
// 当前节点线程
volatile Thread thread;
// 节点类型/
Node nextWaiter;
// 是否是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
// 只能用于 addWaiter
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 只能用于Condition
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
通过Node 节点字段SHARED与EXCLUSIVE,节点有两种模式
- 独占模式:每次只能有一个线程能持有资源;
- 共享模式:允许多个线程同时持有资源;
- waitStatus: SIGNAL字段,仅接受以下值:
- SIGNAL(-1): 此节点的后续节点已(或即将)被阻塞(通过park),因此当前节点在释放或取消时必须取消其后续节点的阻塞(unpark)。 为了避免竞争,acquire方法必须首先标识它们需要一个信号,然后重试原子acquire,然后在失败时阻塞 CANCELLED(1): 此节点因超时或中断而取消. 节点永远不会离开此状态.特别是,节点已取消的线程永远不会再阻塞。 CONDITION(-2): 此节点当前处于条件队列中. 它在被传输之前不会被用作同步队列节点,此时状态将设置为0。 (此处使用此值与字段的其他用法无关,仅用来简化机制) PROPAGATE(-3): 应将releaseShared传播到其他节点,这是在doReleaseShared中设置的(仅针对头节点),以确保传播继续进行, 即使此后有其他操作介入也是如此 0: 默认值
- waitStatus以数字形式排列以简化使用,非负值表示节点不需要发出信号,所以,大多数代码不需要检查特定的值,只需要检查符号 对于正常同步节点,该字段初始化为0;对于条件节点,该字段初始化为CONDITION.它是使用CAS修改的,(或者在可能的情况下,使用无条件的volatile写入)
- nextWaiter: 等待队列的后继节点,或者标识共享模式/独占模式。因为条件队列仅在保持独占模式时才被访问,我们只需要一个简单的链接队列来保存等待条件的节点,然后将它 们转移到队列中重新获取,并且由于等待条件只能是独占的,所以我们当节点是共享模式时我们使用这个值标识共享模式。
需要子类实现的方法
代码语言:javascript复制// 尝试以独占模式获取。此方法应查询对象的状态是否允许在独占模式下获取它,如果允许,则获取它。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
// 尝试设置状态以反映独占模式下的释放
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
// 尝试在共享模式下获取。此方法应查询对象的状态是否允许在共享模式下获取该对象,如果允许,则获取该对象。
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}
// 尝试设置状态以反映共享模式下的释放。
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException();}
//如果以独占方式针对当前(调用)线程保持同步,则返回TRUE。此方法在每次调用非等待AbstractQueuedSynchronizer.ConditionObject方法时调用。(等待方法改为调用Release。)
protected boolean isHeldExclusively() { throw new UnsupportedOperationException();}
native 方法
代码语言:javascript复制 private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizerImitate.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizerImitate.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizerImitate.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* CAS head field. Used only by enq.
* CAS 设置 头结点字段 仅供enq使用。
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
* CAS 设置 尾结点字段 仅供enq使用。
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS waitStatus field of a node.
* CAS 设置 waitStatus字段。
*/
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
/**
* CAS next field of a node.
* CAS 设置 节点的下一个字段。
*/
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}