概述
介绍
上文我们可知:CompletableFuture
是 Java 8 引入用于支持异步编程和非阻塞操作的类。对于没有使用过CompletableFuture
通过它这么长的名字就感觉到一头雾水,那么现在我们来一起解读一下它的名字。
- Completable:可完成
- Future:未来/将来
这两个单词体现了它设计的目的:提供一种可完成的异步计算。
身世
接下来我将详细介绍CompletableFuture
的实现。
Future
接口
CompletableFuture
实现自JDK 5出现的Future
接口,该接口属于java.util.concurrent
包,这个包提供了用于并发编程的一些基础设施,其中就包括 Future
接口。Future
接口的目的是表示异步计算的结果,它允许你提交一个任务给一个 Executor(执行器),并在稍后获取任务的结果。尽管 Future 提供了一种机制来检查任务是否完成、等待任务完成,并获取其结果,但它的设计也有一些局限性,比如无法取消任务、无法组合多个任务的结果等。
Future
接口为CompletableFuture
提供了以下功能:
- 异步任务的提交:通过
Future
的接口,可以提交异步任务,并在稍后获取任务的结果,这是 Future 接口最基本的功能之一。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
- 检查任务完成状态: 使用 isDone 方法可以检查任务是否已经完成。
boolean isDone = future.isDone();
- 等待任 务完成: 通过get方法,阻塞当前线程,直到异步任务完成并获取其结果。
System.out.println("main Thread");
//开启异步线程
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
//阻塞异步线程执行完成
String result = future.get();
- 取消任务: 通过
cancel
方法,你可以尝试取消异步任务的执行。这是Future
接口的一项功能,但在实际使用中,由于限制和不确定性,这个方法并不总是能够成功取消任务。
boolean canceled = future.cancel(true);
CompletionStage接口
CompletableFuture
同时也实现自CompletionStage
接口,CompletionStage
接口是 Java 8 中引入的,在CompletableFuture
中用于表示一个步骤,这个步骤可能是由另外一个CompletionStage触发的,随当前步骤的完成,可以触发其他CompletionStage的执行。CompletableFuture
类实现了 CompletionStage
接口,因此继承了这些功能。以下是 CompletionStage
为 CompletableFuture
提供的一些关键功能:
- 链式操作:CompletionStage 定义了一系列方法,如 thenApply, thenAccept, thenRun,允许你在一个异步操作完成后,基于其结果进行进一步的操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Integer> lengthFuture = future.thenApply(String::length);
- 组合多个阶段:
CompletionStage
提供了thenCombine
,thenCompose
,thenAcceptBoth
等方法,用于组合多个阶段的结果,形成新的CompletionStage
。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 " " s2);
- 异常处理:
CompletionStage
提供了一系列处理异常的方法,如exceptionally
,handle
,用于在异步计算过程中处理异常情况。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 抛出异常
throw new RuntimeException("Some error");
});
CompletableFuture<String> resultFuture = future.exceptionally(ex -> "Handled Exception: " ex.getMessage());
- 顺序执行:
thenApply
,thenAccept
,thenRun
等方法可以用于在上一个阶段完成后执行下一个阶段,形成顺序执行的链式操作。
图片来源于美团技术
CompletableFuture原理与实践-外卖商家端API的异步化
CompletableFuture-tryFire
tryFire
方法是 CompletableFuture
内部的一个关键方法,用于尝试触发异步操作链中的下一个阶段。这个方法的主要作用是在合适的时机执行异步操作链中的后续阶段,将计算结果传递给下一个阶段。
为什么先介绍这个方法呢?因为这个方法的大部分API都是基于该方法的基础上实现的。
代码语言:javascript复制abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
volatile Completion next;
final void tryFire(int mode) {
// ... (其他逻辑)
// 触发下一个阶段
Completion n;
if ((n = next) != null)
n.tryFire(SYNC);
}
// ... (其他方法)
}
- 触发方式( mode ):
-
- tryFire 方法接收一个 mode 参数,表示触发的方式。常见的触发方式包括同步触发(SYNC)、异步触发(ASYNC)以及嵌套触发(NESTED)。
- 触发下一个阶段:
-
- 在 tryFire 方法中,通过 next 字段获取下一个阶段的引用,然后调用下一个阶段的 tryFire 方法,将当前阶段的计算结果传递给下一个阶段。
- 递归触发:
-
- tryFire 方法可能会递归调用下一个阶段的 tryFire 方法,以确保整个异步操作链中的阶段能够依次触发。这个递归调用保证了异步操作链的串联执行。
- 触发逻辑的条件判断:
-
- tryFire 方法中通常还包含一些条件判断,用于确定是否应该触发后续的操作。例如,可能会检查当前阶段的状态,如果满足触发条件,则继续触发。
总体而言,tryFire 方法是 CompletableFuture
异步操作链中触发后续阶段的核心方法。通过递归调用,它实现了异步操作链的顺序执行,确保了各个阶段按照期望的顺序执行,并将计算结果传递给下一个阶段。
CompletableFuture结构
字段和常量定义
字段定义
- result:存储异步计算的结果
- stack:存储观察者链
- NEXT:异步调用链中观察者链的管理
常量定义
代码语言:javascript复制// Modes for Completion.tryFire. Signedness matters.
static final int SYNC = 0;
static final int ASYNC = 1;
static final int NESTED = -1;
这三个变量用于Completion
类中tryFire
方法的标志,表示不同的触发模式。
- SYNC:表示同步触发(默认触发方式),即当前计算完成后直接执行后续的操作。适用于当前计算的结果已经准备好并且可以直接进行下一步操作的情况。
- AYSNC:表示异步触发,当前计算完成后将后续的操作提交到异步线程池中执行。即当前计算完成后将后续的操作提交到异步线程池中执行。适用于需要在不同线程上执行后续操作的情况。
- NESTED:嵌套触发,通常表示当前阶段的触发是由另一个阶段触发的,因此无需再次触发后续操作。在某些情况下,可能会避免重复触发。
内部类定义
CompletableFuture
类包含多个内部类,这些内部类用于为CompletableFuture
提供不同的API而设计的,用于异步编程中的不同阶段和操作。
常用内部类列举:
- UniCompletion、BiCompletion:
-
UniCompletion
和BiCompletion
是用于表示异步操作链中的单一阶段和二元阶段的基础抽象类。它们提供了一些通用的方法和字段,用于处理阶段之间的关系,尤其是观察者链的构建和触发。
- UniApply、UniAccept、UniRun:
-
UniApply
、UniAccept
和UniRun
是UniCompletion
的具体子类,分别用于表示异步操作链中的thenApply
、thenAccept
和thenRun
阶段。它们实现了具体的 tryFire 方法,用于触发阶段的执行。
- BiApply、BiAccept、BiRun:
-
BiApply
、BiAccept
和BiRun
是BiCompletion
的具体子类,分别用于表示异步操作链中的thenCombine
、thenAcceptBoth
和runAfterBoth
阶段。它们同样实现了具体的 tryFire 方法。
- OrApply、OrAccept、OrRun:
-
OrApply
、OrAccept
和OrRun
是BiCompletion
的另一组具体子类,用于表示异步操作链中的applyToEither
、acceptEither
和runAfterEither
阶段。同样,它们实现了具体的 tryFire 方法。
- Async:
-
Async
是CompletableFuture
内部用于表示异步操作的标志类,用于表示某个阶段需要异步执行。例如,在调用supplyAsync
、runAsync
等方法时,会生成一个带有 Async 标志的阶段。
异步编程模型
状态转换
代码语言:javascript复制volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
在CompletableFuture
中定义了两个属性:result、stack,result用于表示执行的结果或异常,stack用于表示执行完当前任务后触发的其他步骤。
CompletableFuture中包含两个字段:result和stack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。
图10 CF基本结构
这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。
- UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
- BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。
引用自美团技术。
在 CompletableFuture
中,Completion
对象表示当前的异步操作,它是被观察者。stack 中存储的是后续的步骤对象,这些对象充当观察者的角色。当当前的异步操作执行完成后,会通知 stack 中的观察者获取执行结果。
这种设计允许异步操作的串联,每个步骤都对应一个 Completion
对象,形成了观察者链。当一个异步操作完成时,它会逐一触发 stack 中的观察者对象执行相应的回调函数,实现了链式的异步操作。这个机制是 CompletableFuture
强大异步编程模型的核心之一。
为印证以上结论,我们来看个例子,追踪下源码:
例子:
代码语言:javascript复制CompletableFuture<String> originalFuture = CompletableFuture.supplyAsync(() -> "Hello");
//thenAccept方法构造Completable
CompletableFuture<Void> thenAcceptFuture = originalFuture.thenAccept(result -> {
System.out.println("Result: " result);
});
以JDK 11为例
源码:
在CompletableFuture
的thenAccept
方法中直接调用了uniAcceptStage
方法,该方法入参是线程池对象和JDK 8出现的函数式接口Consumer,即上文中的result -> {System.out.println("Result: " result);})
,这段代码的作用是获取到上一阶段的计算结果后,将计算结果传递给消费者操作f,在thenAccept
方法中将f转换成一个新的CompletableFuture
,将uniAccept推入观察者链中,来表示一个新的thenAccept
阶段。
private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
Object r;
if ((r = result) != null)
return uniAcceptNow(r, e, f);
CompletableFuture<Void> d = newIncompleteFuture();
unipush(new UniAccept<T>(e, d, this, f));
return d;
}
以下代码是将给定的Completion
对象推入观察者链:
/**
* Pushes the given completion unless it completes while trying.
* Caller should first check that result is null.
*/
final void unipush(Completion c) {
if (c != null) {
//尝试将Completion对象c推入观察者链,如果返回false,
//说明推入的过程中观察者链发生了变化,可能有其他线程正在修改观察者链,
//这种情况下,通过循环尝试
while (!tryPushStack(c)) {
//result对象不为空,表示当前CompletableFuture对象已完成,计算结果已存在
if (result != null) {
NEXT.set(c, null);
break;
}
}
if (result != null)
c.tryFire(SYNC);
}
}
/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
Completion h = stack;
NEXT.set(c, h); // CAS piggyback
return STACK.compareAndSet(this, h, c);
}
前提:判断观察者链是否被其他线程修改是通过被保持线程可见性的类、关键字修饰的。JDK 8使用的是volatile
关键字实现简单的变量的原子性和线程可见性。在JDK 11中的CompletableFuture
使用的是VarHandle
类型定义。
// VarHandle mechanics
private static final VarHandle RESULT;
private static final VarHandle STACK;
private static final VarHandle NEXT;
CompletableFuture线程池
CompletableFuture
类在执行异步操作时,默认使用 ForkJoinPool.commonPool()
作为线程池。这是一个共享的线程池,通常是一个守护线程池,适用于执行异步任务。该线程池的特性包括自动管理线程数量、支持工作窃取(work-stealing)等。
如果你想要使用自定义的线程池,可以通过传递 Executor 对象作为参数来创建 CompletableFuture
实例。
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
默认线程池
代码语言:javascript复制Executor executor = Executors.newFixedThreadPool(10); // 创建一个固定大小为10的线程池
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 异步任务的执行逻辑
}, executor);
以上是使用默认线程池的相关代码逻辑,我们来看一下源码:
代码语言:javascript复制public Executor defaultExecutor() {
return ASYNC_POOL;
}
代码语言:javascript复制 /**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ASYNC_POOL
中使用了默认的ForkJoinPool去开启一个线程池。
自定义线程池
在CompletableFuture
中提供了使用自定义线程池的方法,方法中需要传入一个线程池的接口对象,那么我们就可以传入任何一个实现自Executor
接口的线程池。
以下是基于Spring Framework中的线程池实现的异步操作:
代码语言:javascript复制@Configuration
@EnableAsync
public class AsyncConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(30);
executor.setThreadNamePrefix("Async-");
executor.initialize();
return executor;
}
}
代码语言:javascript复制@Service
public class MyAsyncService {
@Autowired
private TaskExecutor taskExecutor;
@Async
public CompletableFuture<String> performAsyncTask() {
// 异步任务的执行逻辑
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Async Task Completed";
}, taskExecutor);
}
}
并发控制
CompletableFuture
默认使用共享线程池: ForkJoinPool.commonPool()
作为线程池,通过工作窃取算法提高了任务的并行度,同时使用VarHandle
、volatile
来保证线程间的可见性和原子操作,以上保证了线程安全和高可用。