JUC学习笔记——并发工具JUC
在本系列内容中我们会对JUC做一个系统的学习,本片将会介绍JUC的核心内容
我们会分为以下几部分进行介绍:
- AQS 原理
- ReentrantLock 原理
- 读写锁
- Semaphore
- CountdownLatch
- CyclicBarrier
- 线程安全集合类概述
AQS原理
我们将在这一节简单介绍一下AQS原理
AQS概述
我们首先来介绍AQS:
- 全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
我们的AQS需要采用继承方法才能够实现(默认抛出 UnsupportedOperationException):
- tryAcquire (尝试获取锁)
- tryRelease (尝试解开锁)
- tryAcquireShared (尝试获得共享锁)
- tryReleaseShared (尝试解开共享锁)
- isHeldExclusively(判断是否是当前线程,用于实现可重入锁)
我们这里给出AQS的一些特点:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
我们给出AQS的两个基本操作:
代码语言:javascript复制/*获得锁*/
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程 park unpark
}
/*释放锁*/
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}
自定义同步器
我们采用AQS来实现一个简单的锁机制:
代码语言:javascript复制// 我们使用AQS就需要继承AbstractQueuedSynchronizer并实现对应方法
final class MySync extends AbstractQueuedSynchronizer {
// 尝试获得锁
@Override
protected boolean tryAcquire(int acquires) {
// 这里的acquire类似于一个令牌,就相比于owner之前的争夺区域概念
if (acquires == 1){
// 我们尝试获得锁
if (compareAndSetState(0, 1)) {
// 获得成功,修改状态,并将该线程加入该锁的owner
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
// 获取失败
return false;
}
// 尝试解开锁
@Override
protected boolean tryRelease(int acquires) {
// 如果当前线程拥有权限
if(acquires == 1) {
// 判断状态
if(getState() == 0) {
// 如果为0说明当前无锁,抛出异常(不是正常状态,正常状态应该是当前锁被该线程占用,由该线程解锁)
throw new IllegalMonitorStateException();
}
// 将owner设为空,并修改state
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 失败
return false;
}
// 创建新条件变量存放await线程
protected Condition newCondition() {
return new ConditionObject();
}
// 判断是否是同个线程
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
自定义锁
我们根据同步器进行修改完成一个自定义锁:
代码语言:javascript复制/*测试代码*/
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
sleep(1);
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t2").start();
/*自定义锁*/
class MyLock implements Lock {
// 这就是我们的自定义同步器,根据AQS修改出来的
static MySync sync = new MySync();
@Override
// 尝试,不成功,进入等待队列
public void lock() {
sync.acquire(1);
}
@Override
// 尝试,不成功,进入等待队列,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
// 尝试一次,不成功返回,不进入队列
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
// 尝试,不成功,进入等待队列,有时限
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
// 释放锁
public void unlock() {
sync.release(1);
}
@Override
// 生成条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
/*测试结果*/
22:29:28.727 c.TestAqs [t1] - locking...
22:29:29.732 c.TestAqs [t1] - unlocking...
22:29:29.732 c.TestAqs [t2] - locking...
22:29:29.732 c.TestAqs [t2] - unlocking...
/*注意我们上述采用的isHeldExclusively默认state为1,也就说我们采用的是不可重入锁,我们如果两次lock就会导致死锁*/
// 同一线程两次上锁
lock.lock();
log.debug("locking...");
lock.lock();
log.debug("locking...");
// 会导致只输出一次locking
locking...
ReentrantLock 原理
我们将在这一节简单介绍一下ReentrantLock原理部分
原理图
首先我们给出ReentrantLock的继承图:
我们可以发现ReentrantLock是由Sync发展过来的,和AQS有一定异曲同工之处
非公平锁实现原理
我们首先来介绍一下非公平锁的实现原理
加锁解锁流程
我们首先给出流程图:
- 先从构造器开始看,默认为非公平锁实现
// NonfairSync 继承自 AQS
public ReentrantLock() {
sync = new NonfairSync();
}
- 没有竞争时
- 第一个竞争出现时
- Thread-1 执行了
- CAS 尝试将 state 由 0 改为 1,结果失败
- 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
- 接下来进入 addWaiter 逻辑,构造 Node 队列
- 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
- 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
- Node 的创建是懒惰的
- 当前线程进入 acquireQueued 逻辑
- acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
- 继续acquireQueued 逻辑
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
- 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
- 再次有多个线程经历上述过程竞争失败,变成这个样子
- Thread-0 释放锁,进入 tryRelease 流程,如果成功
- 设置 exclusiveOwnerThread 为 null
- state = 0
- 当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
- 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
- 回到 Thread-1 的 acquireQueued 流程
- 如果加锁成功(没有竞争),会设置
- exclusiveOwnerThread 为 Thread-1,state = 1
- head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
- 原本的 head 因为从链表断开,而可被垃圾回收
- 如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
- 如果不巧又被 Thread-4 占了先
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
- 如果不巧又被 Thread-4 占了先
加锁源码
代码语言:javascript复制// Sync 继承自 AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁实现
final void lock() {
// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果尝试失败,进入 ㈠
acquire(1);
}
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
// ㈡ tryAcquire
if (
!tryAcquire(arg) &&
// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// ㈡ 进入 ㈢
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
// 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state
int nextc = c acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败, 回到调用处
return false;
}
// ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
//将当前node加入等待队列末尾等待,并返回当前node
private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
Node node = new Node(Thread.currentThread(), mode);
//非公平同步器中有head和tail两个引用分别指向了等待队列的第一个和最后一个节点
//pred指的是node的前驱,从队尾插入,所以pred为tail
Node pred = tail;
// 如果 tail 不为 null, 说明已经有了等待队列了,cas 尝试将 Node 对象加入 AQS 队列尾部
if (pred != null) {
//将node的前驱节点设置为pred
node.prev = pred;
//尝试将队列的tial从当前的pred修改为node
if (compareAndSetTail(pred, node)) {
// 双向链表
pred.next = node;
return node;
}
}
//如果pred为null,说明等待队列还未创建,调用enq方法创建队列
// 尝试将 Node 加入 AQS, 进入 ㈥
enq(node);
return node;
}
// ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
//该方法就是创建等待队列,并将node插入队列的尾部。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
if (compareAndSetHead(new Node())) {
//将head赋值给tail,head和tail同时指向哨兵节点
tail = head;
}
} else {
// cas 尝试将 Node 对象加入 AQS 队列尾部
//设置node的前驱节点为队列的最后一个节点
node.prev = t;
//尝试将队列的尾部从当前的tail设置为node
if (compareAndSetTail(t, node)) {
//将node设为上一个tail的后继节点
t.next = node;
return t;
}
}
}
}
// ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
//在队列中循环等待,只有当排队排到第一名并且获得了锁才能出队并从方法中退出。
//返回打断状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//找到当前node的前驱节点
final Node p = node.predecessor();
// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
if (p == head && tryAcquire(arg)) {
// 获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
// 上一个节点 help GC
p.next = null;
failed = false;
// 返回中断标记 false
return interrupted;
}
if (
// 判断是否应当 park, 进入 ㈦
shouldParkAfterFailedAcquire(p, node) &&
// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
//判断acquire失败以后是否应该阻塞等待。从规则上来讲:
//1.如果前驱节点都阻塞了,那么当前节点也应该阻塞
//2.如果前驱节点取消,那么应该将前驱节点前移,直到其状态不为取消为止。
//3.如果前两种情况都不是,尝试将前驱节点状态设为SIGNAL,返回false(不用阻塞,等到下次在阻塞)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
// 上一个节点都在阻塞, 那么自己也阻塞好了
return true;
}
// > 0 表示取消状态
if (ws > 0) {
// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这次还没有阻塞
// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// ㈧ 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}
注意 是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的 waitStatus 决定
总结:
- 调用
lock
,尝试将state从0修改为1- 成功:将owner设为当前线程
- 失败:调用
acquire
->tryAcquire
->nonfairTryAcquire
,判断state=0则获得锁,或者state不为0但当前线程持有锁则重入锁,以上两种情况tryAcquire
返回true,剩余情况返回false。- true:获得锁
- false:调用
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
,其中addwiter
将关联线程的节点插入AQS队列尾部,进入acquireQueued
中的for循环:- 如果当前节点是头节点,并尝试获得锁成功,将当前节点设为头节点,清除此节点信息,返回打断标记。
- 调用
shoudParkAfterFailure
,第一次调用返回false,并将前驱节点改为-1,第二次循环如果再进入此方法,会进入阻塞并检查打断的方法。
解锁源码
代码语言:javascript复制// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// 解锁实现
public void unlock() {
sync.release(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean release(int arg) {
// 尝试释放锁, 进入 ㈠
if (tryRelease(arg)) {
// 队列头节点 unpark
Node h = head;
if (
// 队列不为 null
h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0
) {
// unpark AQS 中等待的线程, 进入 ㈡
unparkSuccessor(h);
}
return true;
}
return false;
}
// ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
private void unparkSuccessor(Node node) {
// 如果状态为 Node.SIGNAL 尝试重置状态为 0
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
总结:
unlock
->syn.release
(1)->tryRelease
(1),如果当前线程并不持有锁,抛异常。state减去1,如果之后state为0,解锁成功,返回true;如果仍大于0,表示解锁不完全,当前线程依旧持有锁,返回false。- 返回true:检查AQS队列第一个节点状态图是否为
SIGNAL
(意味着有责任唤醒其后记节点),如果有,调用unparkSuccessor
。- 再
unparkSuccessor
中,不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点,如果有,将其唤醒。
- 再
- 返回false:
可重入原理
我们下面介绍一下可重入原理:
代码语言:javascript复制// 当持有锁的线程再次尝试获取锁时,会将state的值加1,state表示锁的重入量。
/*代码展示*/
static final class NonfairSync extends Sync {
// ...
// Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
// 先把当前线程固定下来设为变量
final Thread current = Thread.currentThread();
// 判断状态(正常的抢断owner)
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state
int nextc = c acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// 释放时首先将state--,用于表示去除了一层锁
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
可打断原理
我们分别给出不可打断和可打断的源码进行分析:
代码语言:javascript复制/*不可打断源码*/
// 在此模式下,即使它被打断,仍会驻留在 AQS 队列中,并将打断信号存储在一个interrupt变量中。
// 一直要等到获得锁后方能得知自己被打断了,并且调用`selfInterrupt`方法打断自己。
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// ...
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// interrupted 会清除打断标记
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
// 还是需要获得锁后, 才能返回打断状态
return interrupted;
}
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// 如果是因为 interrupt 被唤醒, 返回打断状态为 true
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// 如果打断状态为 true
selfInterrupt();
}
}
//响应打断标记,打断自己
static void selfInterrupt() {
// 重新产生一次中断
Thread.currentThread().interrupt();
}
}
/*可打断源码*/
// 此模式下即使线程在等待队列中等待,一旦被打断,就会立刻抛出打断异常。
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁, 进入 ㈠
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// ㈠ 可打断的获取锁流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 这里一直在判断是否被打断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入这里
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
公平锁实现原理
公平锁是如何实现的:
- 简而言之,公平与非公平的区别在于,公平锁中的tryAcquire方法被重写了
- 新来的线程即便得知了锁的state为0,也要先判断等待队列中是否还有线程等待,只有当队列没有线程等待式,才获得锁。
我们给出公平锁的源码:
代码语言:javascript复制static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 上锁
final void lock() {
acquire(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 这里就是主要区别!!!
// 公平锁:先检查 AQS 队列中是否有前驱节点, 没有才去竞争
// 非公平锁:不会检查,直接竞争,可能就会导致当前线程和AQS队列中的线程进行竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有 Node
return h != t &&
(
// (s = h.next) == null 表示队列中还有没有老二
(s = h.next) == null ||
// 或者队列中老二线程不是此线程
s.thread != Thread.currentThread()
);
}
}
条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject
await 流程
我们首先给出await流程:
- 开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程
- 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
- 接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
- unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
- park 阻塞 Thread-0
总结:
- 创建一个节点,关联当前线程,并插入到当前Condition队列的尾部
- 调用
fullRelease
,完全释放同步器中的锁,并记录当前线程的锁重入数 - 唤醒(park)AQS队列中的第一个线程
- 调用park方法,阻塞当前线程。
signal 流程
我们给出signal流程:
- 假设 Thread-1 要来唤醒 Thread-0
- 进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
- 执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1
- Thread-1 释放锁,进入 unlock 流程,略
总结:
- 当前持有锁的线程唤醒等待队列中的线程,调用doSignal或doSignalAll方法,将等待队列中的第一个(或全部)节点插入到AQS队列中的尾部。
- 将插入的节点的状态从Condition设置为0,将插入节点的前一个节点的状态设置为-1(意味着要承担唤醒后一个节点的责任)
- 当前线程释放锁,parkAQS队列中的第一个节点线程。
源码展示
最后我们给出上述流程所使用的源码:
代码语言:javascript复制public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 第一个等待节点
private transient Node firstWaiter;
// 最后一个等待节点
private transient Node lastWaiter;
public ConditionObject() { }
// ㈠ 添加一个 Node 至等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个关联当前线程的新 Node, 添加至队列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 唤醒 - 将没取消的第一个节点转移至 AQS 队列
private void doSignal(Node first) {
do {
// 已经是尾节点了
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
first.nextWaiter = null;
} while (
// 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环 ㈢
!transferForSignal(first) &&
// 队列还有节点
(first = firstWaiter) != null
);
}
// 外部类方法, 方便阅读, 放在此处
// ㈢ 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
final boolean transferForSignal(Node node) {
// 如果状态已经不是 Node.CONDITION, 说明被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入 AQS 队列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (
// 上一个节点被取消
ws > 0 ||
// 上一个节点不能设置状态为 Node.SIGNAL
!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
) {
// unpark 取消阻塞, 让线程重新同步状态
LockSupport.unpark(node.thread);
}
return true;
}
// 全部唤醒 - 等待队列的所有节点转移至 AQS 队列
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// ㈡
private void unlinkCancelledWaiters() {
// ...
}
// 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 不可打断等待 - 直到被唤醒
public final void awaitUninterruptibly() {
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁, 见 ㈣
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 仅设置打断状态
if (Thread.interrupted())
interrupted = true;
}
// 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// ㈡
private void unlinkCancelledWaiters() {
// ...
}
// 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 不可打断等待 - 直到被唤醒
public final void awaitUninterruptibly() {
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁, 见 ㈣
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 仅设置打断状态
if (Thread.interrupted())
interrupted = true;
}
// 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
// 外部类方法, 方便阅读, 放在此处
// ㈣ 因为某线程可能重入,需要将 state 全部释放
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 打断模式 - 在退出等待时重新设置打断状态
private static final int REINTERRUPT = 1;
// 打断模式 - 在退出等待时抛出异常
private static final int THROW_IE = -1;
// 判断打断模式
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// ㈤ 应用打断模式
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 等待 - 直到被唤醒或打断
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 应用打断模式, 见 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//向Condition中的等待队列中新增节点,并将此节点返回
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//判断当前节点是否在同步器中的队列中等待锁
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
// 等待 - 直到被唤醒或打断或超时
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
// 获得最后期限
final long deadline = System.nanoTime() nanosTimeout;
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// 已超时, 退出等待队列
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 应用打断模式, 见 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
public final boolean awaitUntil(Date deadline) throws InterruptedException {
// ...
}
// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
// ...
}
// 工具方法 省略 ...
}
读写锁
我们将在这一节简单介绍一下读写锁
ReentrantReadWriteLock
ReentrantLock为我们提供了专用于读和写的锁:
- ReentrantReadWriteLock
ReentrantReadWriteLock具有以下特点:
- 首先需要创建,本体会带有两个锁ReadLock和WriteLock,分别采用方法获取
- 该锁支持读读同步,读写互斥,写写互斥操作
我们下面进行简单的测试:
代码语言:javascript复制/*读写方法*/
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
sleep(1);
return data;
} finally {
log.debug("释放读锁...");
r.unlock();
}
}
public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
sleep(1);
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}
/*读读并发*/
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
/*读写互斥*/
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
Thread.sleep(100);
new Thread(() -> {
dataContainer.write();
}, "t2").start();
/*写写互斥*/
// 这里不再测试了~
下面我们再给出部分注意点:
- 读锁不支持条件变量
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
r.lock();
try {
// ...
w.lock();
try {
// ...
} finally{
w.unlock();
}
} finally{
r.unlock();
}
- 重入时降级支持:即持有写锁的情况下去获取读锁
class CachedData {
Object data;
// 是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 获取写锁前必须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock();
}
}
// 自己用完数据, 释放读锁
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
缓存应用
我们在多次使用数据库时,如果多次采用同一个数据库搜索语句可能会导致数据库拥塞
所以我们可以选择将调用的数据库语句以及结果全部都缓存下来,但当遇到更新时我们需要撤销缓存防止读取之前的数据导致错误
我们首先需要思考,当多线程时,我们应该先清除缓存还是先更新数据:
- 先更新数据库
我们分别给出两种展示图:
- 先清缓存
- 先更新数据库
- 补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询
但是我们为了保险起见,我们采用ReentrantReadWriteLock锁来进行处理:
代码语言:javascript复制class GenericCachedDao<T> {
// HashMap 作为缓存非线程安全, 需要保护
HashMap<SqlPair, T> map = new HashMap<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
GenericDao genericDao = new GenericDao();
// 更新操作,写锁
public int update(String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
int rows = genericDao.update(sql, params);
map.clear();
return rows;
} finally {
lock.writeLock().unlock();
}
}
// 查找操作,读锁,若缓存查找失败,去数据库读取并将数据放入缓存,写锁
public T queryOne(Class<T> beanClass, String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加读锁, 防止其它线程对缓存更改
lock.readLock().lock();
try {
T value = map.get(key);
if (value != null) {
return value;
}
} finally {
lock.readLock().unlock();
}
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
// get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
// 为防止重复查询数据库, 再次验证
T value = map.get(key);
if (value == null) {
// 如果没有, 查询数据库
value = genericDao.queryOne(beanClass, sql, params);
map.put(key, value);
}
return value;
} finally {
lock.writeLock().unlock();
}
}
// 作为 key 保证其是不可变的
class SqlPair {
private String sql;
private Object[] params;
public SqlPair(String sql, Object[] params) {
this.sql = sql;
this.params = params;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlPair sqlPair = (SqlPair) o;
return sql.equals(sqlPair.sql) &&
Arrays.equals(params, sqlPair.params);
}
@Override
public int hashCode() {
int result = Objects.hash(sql);
result = 31 * result Arrays.hashCode(params);
return result;
}
}
}
读写锁原理
读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个
图片解释
首先我们给出四个简单的例子并给出图片解释
t1 w.lock,t2 r.lock
我们给出展示图:
- t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处
- 不同是写锁状态占了 state 的低 16 位,而读锁 使用的是 state 的高 16 位
- t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。
- 如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
- tryAcquireShared 返回值表示
- -1 表示失败
- 0 表示成功,但后继节点不会继续唤醒
- 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
- 这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,
- 不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
- t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
- 如果没有成功,在 doAcquireShared 内 for (;
- 如果没有成功,在 doAcquireShared 内 for (;