使用 CountDownLatch 实现多线程协作

2023-11-08 09:46:30 浏览数 (2)

目录

前言

在多线程编程中,经常需要实现一种机制来协调多个线程的执行,以确保某些操作在所有线程完成后再进行。CountDownLatch 就是 Java 并发包中提供的一种同步工具,它能够让一个或多个线程等待其他线程完成操作。

了解 CountDownLatch

概括

CountDownLatch 是Java 1.5版本推出的一个同步辅助类,在构造时需要指定一个计数值,该计数值表示需要等待的事件数量。每当一个事件完成时,计数值就会减一,当计数值减至零时,等待的线程就会被唤醒继续执行。

CountDownLatch 的应用场景

CountDownLatch 可以被广泛应用于各种多线程协作的场景,例如:

  • 主线程等待多个子线程完成后再执行下一步操作。
  • 多个子任务并行执行,最后合并结果。
  • 并行计算中,等待所有计算任务完成后进行统一汇总。
使用案例

让我们通过一个示例代码来理解 CountDownLatch 的使用。假设有一个任务需要被分配给多个子线程来完成,并且主线程需要等待所有子线程执行完毕后才能继续执行。

代码语言:javascript复制
//任务分割的线程数
private static final int THREAD_TOTAL = 10;

//子线程执行的超时时间
private static final int countDownLatchTimeout = 5;

public static void main(String[] args) {
    //创建CountDownLatch并设置计数值,该count值可以根据线程数的需要设置
    CountDownLatch countDownLatch = new CountDownLatch(THREAD_TOTAL);

    //创建线程池,开启、创建异步线程执行任务
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0; i < THREAD_TOTAL; i  ) {
        cachedThreadPool.execute(() -> {
            try {
                Thread.sleep(5000);
                System.out.println(Thread.currentThread().getName()   " do something!");
            } catch (Exception e) {
                System.out.println("Exception: do something exception");
            } finally {
                //该线程执行完毕-1
                countDownLatch.countDown();
            }
        });
    }

    //回到主线程中
    System.out.println("Back main thread do something");
    try {
        //主线程等待线程池中完成(子线程执行超时时间)
        boolean await = countDownLatch.await(countDownLatchTimeout, TimeUnit.MINUTES);
        System.out.println(await);
    } catch (InterruptedException e) {
        System.out.println("Exception: await interrupted exception");
    } finally {
        System.out.println("countDownLatch: "   countDownLatch);
    }
    System.out.println("main thread do something-2");
}

CountDownLatch 的优缺点分析

优点
  • 简单易用:CountDownLatch 的使用非常简单,通过 await 和 countDown 方法即可实现多线程的协作。
  • 灵活性:可以根据具体场景指定等待的计数值,可以灵活控制多个线程的协作关系。
  • 高效性:底层使用了 AQS(AbstractQueuedSynchronizer)来实现同步,能够保证高效地协调多个线程的执行顺序。
缺点
  • 一次性:CountDownLatch 的计数值只能减少,无法重置。一旦计数值减至零,就不能再次使用。
  • 无法中途取消:一旦等待开始,就无法中途取消等待,除非等待超时或者发生中断。

如果您学有余力或手头没有着急的需求,请继续往下看,让我们简单从源码层面分析下CountDownLatch的实现。

从源码层面分析CountDownLatch的实现

实现

我截取了CountDownLatch内部关键实现逻辑来分析其实现原理:

CountDownLatch的功能主要通过内部类Sync实现,在内部类中,Sync继承自AbstractQueuedSynchronizer来实现同步操作,AbstractQueuedSynchronizer提供了同步器实现的基础框架,通过该类,开发者可以相对容易地实现自定义的同步器,例如独占锁、共享锁、信号量等。

代码语言:javascript复制
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;


public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}


public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
  • Sync :定义了一个名为 Sync 的静态内部类,它继承自 AbstractQueuedSynchronizer 类,这个类通常被用于实现锁和相关的同步器。
  • count:定义了一个序列化版本号,用于在对象序列化和反序列化时进行版本控制。同时count在CountDownLatch的构造方法中用于设置当前状态,即:编码人员传入的计数值。
  • getCount:获取当前状态值,即剩余的计数值。
  • tryAcquireShared:尝试获取共享资源,如果当前状态为0,则返回1表示成功获取资源,否则返回-1表示获取资源失败。
tryReleaseShared
代码语言:javascript复制
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

tryReleaseShared 方法尝试释放共享资源,首先通过一个无限循环不断尝试,在循环中获取当前状态值,如果状态值已经为0,则直接返回false;否则将状态值减1,并尝试原子性地设置状态值,如果设置成功,则返回是否状态值变为0,否则继续循环。

总的来说,这段代码实现了一个简单的 CountDownLatch 功能,通过 tryAcquireShared 方法尝试获取共享资源,通过 tryReleaseShared 方法尝试释放共享资源。当共享资源的状态值为0时,表示所有等待的线程都已被释放。

扩展

CompletableFuture简述

在JDK 1.8后,java.util.concurrent包提供了CompletableFuture类用于支持异步编程和异步任务的处理,相较于CountDownLatch,它提供了更丰富的API,就个人而言,我更喜欢CompletableFuture,因为它扩展性强,更适合JDK 8提供的函数式编程特性,代码更加优雅。

CompletableFuture 的优缺点
优点
  • 功能强大:CompletableFuture 提供了丰富的方法和组合操作,可以实现复杂的异步编程逻辑。
  • 支持异常处理:可以通过 exceptionally 或 handle 方法方便地处理异步操作中的异常情况。
  • 支持组合操作:可以通过 thenCompose、thenCombine 等方法方便地进行多个 CompletableFuture 的组合操作。
缺点
  • 学习曲线较陡:相对于 CountDownLatch,CompletableFuture 的使用可能需要更多的学习和理解异步编程的概念。
  • 复杂度较高:在复杂的业务场景下,可能会出现嵌套回调、异常处理困难等问题,增加了代码的复杂度。

总结

CountDownLatch 和 CompletableFuture 都是 Java 中用于多线程协作的工具,它们各自适用于不同的场景。CountDownLatch 更适合简单的多线程协作,而 CompletableFuture 则更适合复杂的异步编程场景。在实际应用中,我们可以根据具体的需求选择合适的工具来实现多线程协作和异步编程,以达到更好的开发效率和代码质量。

关于我

0 人点赞