深入浅出CountDownLatch
CountDownLatch的基本使用
基本概念和案例
CountDownLatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行。从命名可以解读到 CountDownLatch 是倒数的意思,类似于我们倒计时的概念。
代码语言:javascript复制public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
// 倒计时 类似 Thread.join() 的作用
new Thread(countDownLatch::countDown).start(); // 3 - 1
new Thread(countDownLatch::countDown).start(); // 2 - 1
new Thread(countDownLatch::countDown).start(); // 1 - 1
// = 0 后才会唤醒
countDownLatch.await(); // 阻塞主线程
System.out.println("线程执行完毕");
}
}
执行流程图
代码语言:javascript复制TypeError: Cannot read properties of undefined (reading 'v')
并发场景模拟
代码语言:javascript复制public class CountDownLatchDemo implements Runnable {
static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) {
for (int i = 0; i < 1000; i ) {
// 模拟 1000 个线程的并发
new Thread(new CountDownLatchDemo()).start();
}
countDownLatch.countDown(); // = 0 同时唤醒所有处于阻塞状态下的线程
}
@Override
public void run() {
try {
countDownLatch.await(); // 让每个线程先处于阻塞状态
// TODO
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 异步连接操作
}
通过 countDownLatch.await()
可以实现阻塞请求操作,等到服务器返回结果后再使用 countDownLatch.countDown()
唤醒线程。
CountDownLatch源码分析
类关系图
构造方法
代码语言:javascript复制// Sync 内部类 继承自 AQS
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // state 计数器存在 AQS 中
}
}
// new COuntDownLatch(3) -> state 存储为 3
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
await()方法
代码语言:javascript复制// CountDownLatch.java
private static final class Sync extends AbstractQueuedSynchronizer {
// 实现 tryAcquireShared 尝试获取 state
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
}
public void await() throws InterruptedException {
// 调用 AQS 的方法
sync.acquireSharedInterruptibly(1);
}
//////////////////////////////////////////////////////
// AbstractQueuedSynchronized.java
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 判断线程是否中断,state 是否 < 0,aquire 操作是否成功
if (Thread.interrupted() || (tryAcquireShared(arg) < 0 && acquire(null, arg, true, true, false, 0L) < 0))
// 否则抛出线程中断异常
throw new InterruptedException();
}
acquireSharedInterruptibly()
方法中和 AQS 核心方法一样构造双向链表来存储线程的节点通过自旋的操作实现线程的不断排队、阻塞和唤醒。
countDown()方法
代码语言:javascript复制// jdk 17 和 jdk 8 的方法不一样
// 修改 state = state - 1 通过 CAS 设置到 state 这个字段上
public void countDown() {
sync.releaseShared(1);
}
//////////////////////////////////////////////////////
// AbstractQueuedSynchronized.java
abstract static class Node {
final int getAndUnsetStatus(int v) { // for signalling
// 颚化符对 i 按位取反
return U.getAndBitwiseAndInt(this, STATUS, ~v);
}
}
public final boolean releaseShared(int arg) {
// 通过自旋得到 state 并尝试通过 CAS 修改 state
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
//
s.getAndUnsetStatus(WAITING);
// 释放线程取消阻塞状态
LockSupport.unpark(s.waiter);
}
}
通过共享锁不断传递、唤醒下一个节点,直到头节点和尾节点相同则说明已经没有需要唤醒的线程了,完成所有线程的唤醒。