异步任务编排神器CompletableFuture
当需要获取异步任务的结果时,通常可以通过Future接口的get方法来获取结果
但是当异步任务繁多并且复杂,任务间可能存在依赖关系时,Future接口变得不太好用
比如任务A完成后串行执行任务B,等到B、C任务都完成后执行D任务,等到D、E、F任务都完成后汇总结果返回
当遇到复杂的异步任务编排时,Future不太好用,但是在JDK8中并发包推出的CompletableFuture能够很方便的处理这种异步编排任务
比如在一个页面需要查询多个服务的数据,如果同步查询会导致性能太慢
异步查询多个服务的数据再汇总返回,则能提高更多的性能
API
这里的API只作简单说明,大概分下类,各个分类下具体API的功能可自行查看文档(或者用到时再自行查看文档)
CompletableFuture提供的API大概分为几个大类:
同步与异步、串行、AND、OR、
同步与异步
**API携带Async则说明是异步,并且可以设置线程池**
一般业务开发,CompletableFuture用于处理IO任务,最好使用异步,并且指定线程池
代码语言:java复制CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
System.out.println("task a run");
return "a";
});
串行
串行执行指的是任务需要同步执行,如图中的A、B任务,需要A任务执行完才能执行B任务
**串行API通常以then开头**,如:thenRunAsync、thenAccpetAsync、thenApplyAsync
代码语言:java复制CompletableFuture<String> taskB = taskA.thenApply((s) -> {
System.out.println("task b run");
return s "b";
})
AND
AND指的是需要两个任务都完成,才能继续执行后续的任务,比如图中的B、C任务,要都完成才能执行D任务
**AND相关API通常以Combine、Both有关**,如:thenCombineAsync、thenAcceptBothAsync、runAfterBothAsync
代码语言:java复制CompletableFuture<String> taskD = taskB.thenCombineAsync(taskC, (b, c) -> {
System.out.println("task d run");
return b c;
})
如果依赖多个任务同时完成,可以使用allOf(如图中的D、F、E任务)
代码语言:java复制CompletableFuture.allOf(taskF,taskE,taskD);
OR
OR指的是两个任务中其中一个完成,就可以继续执行后续任务
OR相关API通常以Either有关:applyToEitherAsync、acceptEitherAsync、runAfterEitherAsync
如果依赖多个任务的OR时使用:CompletableFuture.anyOf
异常处理
任务执行过程中可能出现异常,可以通过exceptionally 、whenComplete、handler等API对异常进行处理
代码语言:java复制CompletableFuture<String> taskF = CompletableFuture.supplyAsync(() -> {
System.out.println("task f run");
return "a";
}).exceptionally(e -> {
System.out.println("出现异常");
throw new RuntimeException("error");
});
注意事项
使用CompletableFuture时需要注意,如果不了解原理容易踩坑:
比如:任务出了异常怎么办?任务如何选择线程池的?线程又是如何执行的?
带着这一系列问题,我们往下看
出了异常怎么办?
使用CompletableFuture进行异步编排任务时,任务可能出现异常,因此**必须使用API进行处理**
**CompletableFuture遇到异常时,可能会使用CompletionException或ExecutionException包装异常**
代码语言:java复制public static void exception() {
CompletableFuture<Void> taskException = CompletableFuture.supplyAsync(() -> {
System.out.println("begin");
return null;
});
taskException
.thenApply(result -> {
int i = 1 / 0;
return i;
})
.exceptionally(err -> {
//java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
System.out.println(err);
//java.lang.ArithmeticException: / by zero
System.out.println(err.getCause());
//java.lang.ArithmeticException: / by zero
//使用工具处理异常
System.out.println(getException(err));
return 0;
});
}
因为异常会被包装,因此处理异常时,**最好使用工具类获取异常**
代码语言:java复制public static Throwable getException(Throwable throwable) {
//异常为CompletionException或ExecutionException,并且Cause不为空时解析
if ((throwable instanceof CompletionException或 || throwable instanceof ExecutionException)
&& Objects.nonNull(throwable.getCause())) {
return throwable.getCause();
}
return throwable;
}
如何选择线程池?
CompletableFuture中选择线程池有三种情况:
- **使用方法时指定线程池**
- **未指定线程池时,使用ForkJoin的公共线程池
ForkJoinPool.commonPool()
(适合CPU任务,最大线程数量 = CPU - 1)** - **未指定线程池时,使用
ThreadPerTaskExecutor
每次执行任务时创建一个线程执行 (适合周期长的任务,创建/销毁线程开销大)**
当未指定线程池时,可能使用ForkJoin的线程池也可能使用ThreadPerTaskExecutor,在没有查看源码的情况下会容易踩坑
并且 ThreadPerTaskExecutor
和 ForkJoinPool.commonPool()
都不适合IO任务
接下来一步步查看源码,分析CompletableFuture什么情况下会选择哪种线程池
代码语言:java复制CompletableFuture.supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
当我们使用未指定线程池的方法时,会直接使用asyncPool作为线程池
代码语言:java复制private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
**asyncPool根据useCommonPool来判断是使用 ForkJoinPool.commonPool()
还是使用 ThreadPerTaskExecutor
**
那么useCommonPool是如何确定的呢?我们继续往下查看
代码语言:java复制private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
能否使用useCommonPool,由 ForkJoinPool.getCommonPoolParallelism()
决定,当它大于1时则使用 ForkJoinPool.commonPool()
否则使用 ThreadPerTaskExecutor
ForkJoinPool.getCommonPoolParallelism()
返回字段 commonParallelism
static final int commonParallelism;
commonParallelism
用于表示ForkJoinPool的并行粒度,在ForkJoinPool静态代码块中赋值初始化
代码语言:java复制ForkJoinPool.static
static {
//其他略...
//创建公共池
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
//计算并行粒度
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
commonParallelism
并发粒度的字段由par决定,而par = common.config & SMASK
其中SMASK为65535(十进制),其二进制为全1,因此由 common
的字段 config
决定
(在创建公共池的过程会设置config字段)
ForkJoinPool.makeCommonPool
在创建公共池的代码中主要观察变量 parallelism
它为并发粒度
如果不携带参数,**默认情况下并发粒度为CPU核数-1**
代码语言:java复制private static ForkJoinPool makeCommonPool() {
final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
new CommonPoolForkJoinWorkerThreadFactory();
//初始化并发粒度为-1
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
//如果携带启动参数则设置为对应的并发粒度
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = commonPoolForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
//默认情况下并发粒度 = CPU核数 - 1
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
在构建对象时,config字段 this.config = (parallelism & SMASK) | mode
其中SMASK为全1,mode为0,得到的结果是不变的,因此config的值就是parallelism并发粒度
至此我们可以得出结论:**默认情况下,如果不指定线程池,当CPU核数-1超过1则会使用ForkJoin公共池(最大线程数量 = CPU核数 - 1),否则使用ThreadPerTaskExecutor(每次执行都创建线程执行)**
代码语言:java复制static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
ThreadPerTaskExecutor只适合执行周期长的任务,如果任务周期短,并且多的情况下,创建线程也会是很大一笔开销
**使用CompletableFuture时务必指定线程池,线程池最好根据业务做好隔离**
**如果不指定线程池会根据CPU核数选择ForkJoinCommonPool或ThreadPerTaskExecutor,它们并不适合IO任务**
线程如何执行?
在同步与异步的API中线程如何执行?
**在异步的API中,如果指定线程池则交给线程池中的工作线程执行,否则选择Common Pool或ThreadPerTaskExecutor**
**在同步的API中,通常是当前线程进行执行任务,但如果任务B依赖的任务A未完成则由任务A的回调线程执行,任务A如果是异步则由线程池来执行**
代码语言:java复制public static void testSync() {
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
return "ok";
}, threadPool);
CompletableFuture<Void> taskB = taskA.thenAccept(s -> {
//任务A执行完(不睡时)由当前线程执行
//任务A未执行完(睡眠时)由线程池的工作线程执行
System.out.println(s);
System.out.println(s);
});
taskB.join();
}
总结
**CompletableFuture提供串行、AND、OR、异常捕获、结果聚合等多种API,通过这些API能够更方便、快捷的实现异步任务的编排**
**使用CompletableFuture时务必对任务进行异常处理,并且它会使用CompletionException或ExecutionException包装异常,再打印异常时记得使用工具类处理,避免打印到包装的异常**
**CompletableFuture异步任务中如果指定线程池则直接使用指定的线程池**
**如果未指定线程池,当前服务器CPU数量小于等于2(并发粒度低)时使用ThreadPerTaskExecutor,其他情况(并发粒度高)使用ForkJoin框架的common pool(并发粒度 = CPU数量 - 1)**
**未指定线程池时使用的线程池适合CPU任务,并不适合IO任务,使用异步时务必指定线程池**
**当使用异步API时,由线程池的工作线程执行;使用同步API时,如果当前任务依赖的任务未完成,则有依赖、未完成的任务的线程来执行**