多线程同步必学:CountDownLatch的核心原理与应用

2024-04-18 16:07:06 浏览数 (2)

前言

CountDownLatch 是 Android 平台中常用的线程同步工具类,它可以让一个或多个线程等待其他线程完成某个任务后再继续执行。它通过一个计数器来实现,计数器的初始值可以设置为一个正整数,每当一个线程完成任务后,计数器的值会递减 1。当计数器的值递减到 0 时,等待的线程才会被唤醒,继续执行后续的操作。

CountDownLatch 经常用于以下场景:

  • 等待多个子线程完成任务后再执行主线程任务
  • 确保资源在使用前被初始化完成
  • 实现线程间的同步和协调

CountDownLatch 原理

CountDownLatch 的实现原理基于 AQS(AbstractQueuedSynchronizer)同步器。AQS 是 Java 并发编程中常用的同步器框架,它提供了很多用于实现线程同步的机制,例如锁、信号量、屏障等。

CountDownLatch 内部维护了一个计数器变量 state,以及一个 AQS 队列。当 CountDownLatch 对象被创建时,state 变量会被初始化为指定的值。调用 countDown() 方法会使 state 变量的值递减 1。调用 await() 方法的线程会尝试获取 AQS 队列的锁,如果 state 变量的值为 0,则表示所有等待的线程都已经完成任务,AQS 队列会释放锁,唤醒所有等待的线程。如果 state 变量的值不为 0,则调用 await() 方法的线程会进入 AQS 队列等待。

下面,我们将深入其源码,分析其核心实现细节。

核心变量

CountDownLatch 使用了一个名为 Sync 的内部类来继承 AbstractQueuedSynchronizer(AQS),这是处理锁和同步器的一个框架。Sync 用一个单一的非负整数来表示状态,这个状态就是剩余需要等待的事件数量。

代码语言:javascript复制
private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }
}

在这段代码中,构造函数接受初始计数作为参数,并通过 setState() 方法设置 AQS 的状态值。

减少计数

countDown() 方法在调用时减少计数器的值。当计数器达到零时,释放所有等待的线程。

代码语言:javascript复制
public void countDown() {
    sync.releaseShared(1);
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

这里,releaseShared() 方法调用导致 tryReleaseShared() 被执行。tryReleaseShared() 尝试原子地减少状态值,并在计数到达零时返回 true,这会导致等待在 CountDownLatch 上的线程被唤醒。

等待计数到零

线程调用 await() 方法等待计数器达到零。这是通过 AQS 来实现阻塞和后续的唤醒。

代码语言:javascript复制
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

在这段代码中,await() 调用了 acquireSharedInterruptibly(),这会导致 tryAcquireShared() 的执行。如果 getState() 返回 0,表示没有更多的事件需要等待,方法返回 1,允许线程继续执行。如果不是 0,则返回 -1,表示线程应该被阻塞。

由于 CountDownLatch 的实现依赖于高效的 AQS 框架,其性能通常很高。但是,CountDownLatch 是一次性的,计数器达到零后不能被重置。如果需要一个可重置的版本,可以考虑使用 CyclicBarrierSemaphore

CountDownLatch 特别适用于一种情况:一个线程必须等待其他几个线程完成某些操作后,才能继续执行,例如主线程等待初始化线程加载完毕后继续执行。

这种机制在多线程编程中非常有用,尤其是在处理复杂的依赖关系和执行顺序时。

AQS的核心概念

AbstractQueuedSynchronizer(AQS)使用了一个内部的 FIFO 队列来管理所有等待获取资源的线程,并通过一个单一的整型变量来表示同步状态。

AQS 的核心在于管理同步状态(state)和管理线程之间的排队等待。它提供了两种模式的同步机制:

  1. 独占模式:这种模式下,每次只能有一个线程持有同步状态。典型的使用例子是 ReentrantLock
  2. 共享模式:在这种模式下,多个线程可同时获取同步状态。例如,SemaphoreCountDownLatch 就是采用的共享模式。

同步队列

AQS 内部使用一个名为 Node 的静态内部类来实现同步队列,每个节点(Node)可能代表一个正在等待获取资源的线程。节点会根据线程的等待状态被构造为不同的类型(独占、共享、条件等)。

代码语言:javascript复制
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled. */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking. */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition. */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate.
     */
    static final int PROPAGATE = -3;

    ...
}

同步状态

同步状态是通过一个整型的变量来管理的,具体的含义由实现 AQS 的同步器来定义。例如,在 ReentrantLock 中,它表示锁的持有计数;而在 Semaphore 中,它表示当前可用的许可数。

代码语言:javascript复制
/**
 * The synchronization state.
 */
private volatile int state;

获取同步状态

使用 CAS(Compare-And-Swap)操作来修改状态,这是一种无锁的同步机制,能有效地减少锁的开销。例如:

代码语言:javascript复制
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

节点的阻塞和唤醒

CountDownLatchawait() 方法可以在指定的时间内等待,直到计数器减到零。当使用带有超时功能的 await() 时,内部实际上调用的是 AQSdoAcquireSharedNanos 方法。这个方法是 AQS 提供的共享模式下的获取方式,允许线程以纳秒为单位等待直到获取成功,或者直到超时发生。

代码语言:javascript复制
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime()   nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
  1. 时间跟踪:方法开始时记录当前时间,并在每次循环中更新,以计算剩余的等待时间。这种方式确保即使中途发生一些延迟,也能准确计算超时时间。
  2. 节点加入队列:调用 addWaiter(Node.SHARED) 将当前线程包装成一个 Node(共享模式)并加入到等待队列中。
  3. 循环尝试获取资源:进入无限循环,每次循环检查当前节点的前驱节点是否是头节点(这意味着当前节点可能是队列中的第一个等待节点)。如果是,尝试通过 tryAcquireShared 方法获取资源。
  4. 成功获取:如果 tryAcquireShared 返回一个非负值,说明成功获取了共享资源。随后通过 setHeadAndPropagate 将当前节点设置为头节点,并向后传播(可能唤醒后续等待的节点),然后退出循环。
  5. 超时检查:每次循环检查剩余的等待时间,如果小于等于0,表示已经超时,退出循环并返回 false
  6. 线程挂起:如果当前线程的前驱节点不是头节点,或者尝试获取失败,那么线程将会被挂起一段时间(纳秒级)。使用 LockSupport.parkNanos 来挂起线程。
  7. 异常处理:如果在获取过程中发生异常或线程在获取资源前中断,将通过 cancelAcquire 方法取消该节点的获取请求。

doAcquireSharedNanos 方法体现了 AQS 的设计精髓:将线程以节点形式组织在一个双向队列中,通过细粒度的锁(这里是共享锁)和高效的线程调度(挂起和唤醒)机制来实现同步控制。此外,这种方法还兼顾了超时机制,使得线程不会无限期地等待资源。在 CountDownLatch 中,这一机制用于确保线程可以在指定时间内等待其他操作的完成。

CountDownLatch使用方法

CountDownLatch 的主要使用方法如下:

  • 创建 CountDownLatch 对象:
代码语言:javascript复制
CountDownLatch latch = new CountDownLatch(count);

其中 count 表示需要等待的线程数。

  • 等待其他线程完成任务:
代码语言:javascript复制
latch.await();

调用 await() 方法的线程会阻塞,直到 state 变量的值递减到 0。

  • 通知一个线程完成任务:
代码语言:javascript复制
latch.countDown();

调用 countDown() 方法会使 state 变量的值递减 1。

CountDownLatch 使用示例

以下示例演示了如何使用 CountDownLatch 等待多个子线程完成任务后再执行主线程任务:

代码语言:javascript复制
import java.util.concurrent.CountDownLatch

class AppInitializer {

    fun initializeApplication() {
        // 假设我们有三个数据源需要加载
        val latch = CountDownLatch(3)

        // 模拟从网络加载数据
        Thread {
            try {
                loadFromNetwork()
                println("Network data loaded")
            } finally {
                latch.countDown()
            }
        }.start()

        // 模拟从数据库加载数据
        Thread {
            try {
                loadFromDatabase()
                println("Database data loaded")
            } finally {
                latch.countDown()
            }
        }.start()

        // 模拟从文件系统加载数据
        Thread {
            try {
                loadFromFileSystem()
                println("File system data loaded")
            } finally {
                latch.countDown()
            }
        }.start()

        // 等待所有数据源加载完成
        latch.await()
        println("All data sources have been loaded. Application is ready to be used.")
    }

    private fun loadFromNetwork() {
        // 模拟网络延迟
        Thread.sleep(2000)
    }

    private fun loadFromDatabase() {
        // 模拟数据库访问延迟
        Thread.sleep(1500)
    }

    private fun loadFromFileSystem() {
        // 模拟文件读取延迟
        Thread.sleep(1000)
    }
}

总结

本文详细介绍了CountDownLatch的原理和使用方法,并提供了一个使用示例和一些应用场景。希望本文能够帮助你更好地理解和使用CountDownLatch。

0 人点赞