AQS
学习ReentrantLock就不得不知道AQS,因为ReentrantLock就是基于了AQS对象的
特点
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
ReentrantLock
相对于 synchronized 它具备如下特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
与 synchronized 一样,都支持可重入
可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
条件变量
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待 ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比
- synchronized 是那些不满足条件的线程都在一间休息室等消息
- 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤 醒
原理
非公平锁实现原理
加锁流程
先看构造器,默认为非公平锁
代码语言:javascript复制 public ReentrantLock() {
sync = new NonfairSync();
}
没有竞争时
第一个竞争出现时
Thread-1 执行了
- CAS 尝试将 state 由 0 改为 1,结果失败
- 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
- 接下来进入 addWaiter 逻辑,构造 Node 队列
图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态 Node 的创建是懒惰的 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquireQueued 逻辑
- acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
- 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0 释放锁,进入 tryRelease 流程,如果成功
- 设置 exclusiveOwnerThread 为 null
- state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置 exclusiveOwnerThread 为 Thread-1,state = 1 head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread 原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码
代码语言:javascript复制 static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁实现
final void lock() {
// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//如果失败
acquire(1);
}
代码语言:javascript复制 public final void acquire(int arg) {
if (!tryAcquire(arg) && //进入tryAcquire
//当tryAcquire返回false时候,先调用addWaiter,再调用acquireQueued
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire中的逻辑
代码语言:javascript复制 protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
//这里直接cas体现了非公平不去检查AQS
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果已经获得锁并且还是自己那么发送了锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取失败
return false;
}
addWaiter逻辑
代码语言:javascript复制 private Node addWaiter(Node mode) {
//将当前线程关联到一个Node对象,模式为独占模式
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果tail不为空cas尝试将Node对象加入AQS队列尾部
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//队列为空
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//还没有设置head 为哨兵节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//cas将Node加入AQS尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued中的流程
代码语言:javascript复制 final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//判断这个节点的上一个节点
final Node p = node.predecessor();
//上一个节点是head,去尝试一次
if (p == head && tryAcquire(arg)) {
//获取成功设置自己为head
setHead(node);
p.next = null; // help GC
failed = false;
//返回中断标记
return interrupted;
}
//尝试失败 / 上一个节点不是head
if (shouldParkAfterFailedAcquire(p, node) && // 判断是否需要等待
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//上一个节点的状态
int ws = pred.waitStatus;
//上一个节点在阻塞
if (ws == Node.SIGNAL)
return true;
//表示取消状态
if (ws > 0) {
//重构前面所有取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 需要设置上一个节点的值为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
注意:是否需要unpark是由当前节点的前驱节点的waitStatus == Node.SIGNAL来决定,而不是本节点的waitStatus
解锁源码
代码语言:javascript复制 public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
//队头节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
代码语言:javascript复制 protected final boolean tryRelease(int releases) {
//state减去释放的
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果state为01则设置owner为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
代码语言:javascript复制private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 不考虑已取消的节点
if (s == null || s.waitStatus > 0) {
s = null;
//从队尾先前找需要unpark的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//启动线程
LockSupport.unpark(s.thread);
}
可重入原理
代码语言:javascript复制 final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 原理
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可打断原理
代码语言:javascript复制 private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 直接抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
公平锁实现原理
代码语言:javascript复制protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//判断是否有前驱节点没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
条件变量实现原理
每个条件变量对应一个等待队列,其实先类是ConditionObject
await
开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
park 阻塞 Thread-0
signal 流程
假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1
addConditionWaiter
代码语言:javascript复制private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
//从队列中删除
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个节点放入
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
doSignal
代码语言:javascript复制 //将没取消的第一个节点转移到AQS队列
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && //将队列中的Node转移到AQS队列,不成功就继续循环
(first = firstWaiter) != null);
}
代码语言:javascript复制final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//加入AQS队列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}