利用 CompletableFuture 实现并发短路

2024-10-03 09:11:32 浏览数 (5)

原文地址:Java多线程:逻辑表达式的短路运算 原文进阶:Java多线程:复杂逻辑表达式的短路运算

一、问题背景

在复杂的业务逻辑处理中,我们经常需要同时处理多个并发任务,比如检查多个条件是否满足,例如:A && B && C && ...。如果一个条件不满足(例如 A 返回了 false,则可以得出最终结果为 false),就没有必要继续等待其他线程执行结束了。问题是,如何在这种情况下快速返回想要的结果?

通过 Java 提供的 CompletableFuture 工具,我们可以实现这一目标。本文将介绍如何利用 CompletableFutureanyOfallOf 方法,快速处理并发任务,满足我们的需求。

二、CompletableFuture 简介

CompletableFuture 是 Java 8 引入的一个强大异步编程工具,它提供了丰富的组合操作,支持多任务并发处理和异步计算。

在某些场景下,我们希望任务在结果不满足某个条件时能尽早终止,而不是等待所有任务结束。然而,CompletableFuture.allOf() 默认要求所有任务都执行完成,无法处理提前返回的情况,而 anyOf() 则会在第一个任务完成后立即返回结果(无论 true 或者 false)。这就要求我们结合两者,设计出更灵活的解决方案。

补充:本文用到了CompletableFuture 在 Java 9 中引入了 orTimeout() 方法,可以方便地处理异步任务的超时问题。如果你还在使用 Java 8,可以自行封装类似的超时机制。

三、方案设计

目标是并发处理多个任务,当任意一个任务返回 false 时,立即终止其他任务并返回结果。我们可以通过 allOf 来并行处理多个任务,同时结合 anyOf 来在短路条件满足时立刻返回结果。

代码结构设计

  • 组合 anyOf 和 allOf:使用 allOf 并发执行所有任务,一旦某个任务返回 false,通过终止线程 terminationFuture 触发 anyOf 机制,提前结束未完成的任务。如果所有任务都返回 true,则正常结束。
  • 原子类 AtomicBoolean:一个线程安全的布尔值,用来存储逻辑表达式的最终结果。
  • 终止线程 terminationFuture:当任意任务返回 false 时,调用 complete() 方法,提前触发 anyOf 机制。

在这个实现中,一旦某个任务返回 false,其他任务将被忽略并提前返回,避免了不必要的资源消耗。以下是代码示例:

代码语言:javascript复制
// 超时时间
public static final long TIMEOUT = 1000L;
// 条件 A 、B 、C ... 实现
List<Predicate<ExpressionFacts>> expressionList = new ArrayList<>();
// 为了方便演示,使用 newFixedThreadPool 方法,进行线程池创建
ExecutorService executor = Executors.newFixedThreadPool(10);

/**
 * 实现 A && B && C && ... 快速失败
 *
 * @param facts 方法入参
 * @return A && B && C && ... 的最终结果
 */
public boolean evaluateAnd(ExpressionFacts facts) {
    // result 为该方法返回的最终结果,使用 Atomic 以保证变量的原子性操作
    AtomicBoolean result = new AtomicBoolean(true);
    // 终止线程,用来根据不同任务的执行结果决定是否要提前返回
    CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    CompletableFuture.anyOf(terminationFuture, CompletableFuture.allOf(expressionList.stream()
            .map(expression -> CompletableFuture.runAsync(() -> {
                        if (!result.get()) {
                            return; // 如果已经有规则不符合,直接返回
                        }
                        boolean ruleResult = expression.test(facts); // 不同任务执行的结果
                        if (!ruleResult && result.compareAndSet(true, false)) {
                            terminationFuture.complete(null); // 完成终止线程,提前返回多线程结果
                        }
                    }, executor)
                    .orTimeout(TIMEOUT, TimeUnit.MILLISECONDS) // JDK 9 引入的异步超时处理方法
                    .exceptionally(ex -> {
                        // 捕获异常,结果算作 false
                        if (result.compareAndSet(true, false)) {
                            terminationFuture.complete(null);
                        }
                        return null;
                    })).toArray(CompletableFuture[]::new))).join();

    return result.get();
}

/**
 * 实现 A || B || C || ... 快速成功
 *
 * @param facts 方法入参
 * @return A || B || C || ... 的最终结果
 */
public boolean evaluateOr(ExpressionFacts facts) {
    AtomicBoolean result = new AtomicBoolean(false);
    CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    CompletableFuture.anyOf(terminationFuture, CompletableFuture.allOf(expressionList.stream()
            .map(expression -> CompletableFuture.runAsync(() -> {
                        if (result.get()) {
                            return; // 如果已经有规则符合,直接返回
                        }
                        boolean ruleResult = expression.test(facts);
                        if (ruleResult && result.compareAndSet(false, true)) {
                            terminationFuture.complete(null); // 完成终止线程
                        }
                    }, executor)
                    .orTimeout(TIMEOUT, TimeUnit.MILLISECONDS)
                    .exceptionally(ex -> {
                        return null;
                    })).toArray(CompletableFuture[]::new))).join();

    return result.get();
}

四、优缺点分析

优点:

  • 高效性:通过并发处理和短路机制,一旦某个任务不满足条件,可以立即返回结果,避免了等待其他任务执行完毕,从而提高了系统的响应效率。
  • 任务不中断:即使短路发生,未完成的任务仍会继续执行,避免了线程被突然中断带来的问题,保持了系统的健壮性。

缺点:

  • 线程资源开销:由于引入了额外的终止线程,这会占用一些系统资源。

五、总结

本文通过结合 CompletableFuture.anyOfallOf 方法,展示了如何处理多任务并发,并通过短路机制实现高效的逻辑求值。例如:A && B && C && ... 以及 A || B || C || ...,并且在满足所有条件的前提下快速响应任务失败/成功。

六、展望

对于更复杂的逻辑表达式,例如 A && (B || C) && !D,虽然我们可以通过 Java 的逻辑运算符来实现短路,但这种操作是顺序执行的,并不能最大化利用多线程的优势。假如 D 条件先执行完毕并且结果为 true,仍需等待 A && (B || C) 执行完,这种方式显然无法充分利用并行计算的潜力。以下是代码示例:

代码语言:javascript复制
public static void main(String[] args) {
    boolean A = true;  // 可以是条件的执行逻辑,这里简单用结果 true/false 代替
    boolean B = false;
    boolean C = true;
    boolean D = false;

    boolean result = A && (B || C) && !D;
    System.out.println("结果: "   result);
}

是否可以通过多线程并发处理,可以将复杂表达式拆解为多个独立任务并行执行,同时利用短路机制在条件满足时快速返回。

下一章 :我们将深入探讨如何处理更加复杂的逻辑表达式,例如 A && (B || C) && !D 等,并进一步结合多线程优化处理流程,实现更高效的计算逻辑。

0 人点赞