异步任务编排神器CompletableFuture

2024-08-15 09:18:57 浏览数 (1)

异步任务编排神器CompletableFuture

当需要获取异步任务的结果时,通常可以通过Future接口的get方法来获取结果

但是当异步任务繁多并且复杂,任务间可能存在依赖关系时,Future接口变得不太好用

比如任务A完成后串行执行任务B,等到B、C任务都完成后执行D任务,等到D、E、F任务都完成后汇总结果返回

当遇到复杂的异步任务编排时,Future不太好用,但是在JDK8中并发包推出的CompletableFuture能够很方便的处理这种异步编排任务

image.pngimage.png

比如在一个页面需要查询多个服务的数据,如果同步查询会导致性能太慢

异步查询多个服务的数据再汇总返回,则能提高更多的性能

API

这里的API只作简单说明,大概分下类,各个分类下具体API的功能可自行查看文档(或者用到时再自行查看文档)

CompletableFuture提供的API大概分为几个大类:

同步与异步、串行、AND、OR、

同步与异步

**API携带Async则说明是异步,并且可以设置线程池**

一般业务开发,CompletableFuture用于处理IO任务,最好使用异步,并且指定线程池

代码语言:java复制
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {

            System.out.println("task a run");

            return "a";

});

串行

串行执行指的是任务需要同步执行,如图中的A、B任务,需要A任务执行完才能执行B任务

**串行API通常以then开头**,如:thenRunAsync、thenAccpetAsync、thenApplyAsync

代码语言:java复制
CompletableFuture<String> taskB = taskA.thenApply((s) -> {

    System.out.println("task b run");

    return s   "b";

})

AND

AND指的是需要两个任务都完成,才能继续执行后续的任务,比如图中的B、C任务,要都完成才能执行D任务

**AND相关API通常以Combine、Both有关**,如:thenCombineAsync、thenAcceptBothAsync、runAfterBothAsync

代码语言:java复制
CompletableFuture<String> taskD = taskB.thenCombineAsync(taskC, (b, c) -> {

    System.out.println("task d run");

    return b   c;

})

如果依赖多个任务同时完成,可以使用allOf(如图中的D、F、E任务)

代码语言:java复制
CompletableFuture.allOf(taskF,taskE,taskD);

OR

OR指的是两个任务中其中一个完成,就可以继续执行后续任务

OR相关API通常以Either有关:applyToEitherAsync、acceptEitherAsync、runAfterEitherAsync

如果依赖多个任务的OR时使用:CompletableFuture.anyOf

异常处理

任务执行过程中可能出现异常,可以通过exceptionally 、whenComplete、handler等API对异常进行处理

代码语言:java复制
CompletableFuture<String> taskF = CompletableFuture.supplyAsync(() -> {

    System.out.println("task f run");

    return "a";

}).exceptionally(e -> {

    System.out.println("出现异常");

    throw new RuntimeException("error");

});

注意事项

使用CompletableFuture时需要注意,如果不了解原理容易踩坑:

比如:任务出了异常怎么办?任务如何选择线程池的?线程又是如何执行的?

带着这一系列问题,我们往下看

出了异常怎么办?

使用CompletableFuture进行异步编排任务时,任务可能出现异常,因此**必须使用API进行处理**

**CompletableFuture遇到异常时,可能会使用CompletionException或ExecutionException包装异常**

代码语言:java复制
public static void exception() {

    CompletableFuture<Void> taskException = CompletableFuture.supplyAsync(() -> {

        System.out.println("begin");

        return null;

    });



    taskException

            .thenApply(result -> {

                int i = 1 / 0;

                return i;

            })

            .exceptionally(err -> {

                //java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

                System.out.println(err);



                //java.lang.ArithmeticException: / by zero

                System.out.println(err.getCause());



                //java.lang.ArithmeticException: / by zero

                //使用工具处理异常

                System.out.println(getException(err));

                return 0;

            });

    

}

因为异常会被包装,因此处理异常时,**最好使用工具类获取异常**

代码语言:java复制
public static Throwable getException(Throwable throwable) {

    //异常为CompletionException或ExecutionException,并且Cause不为空时解析

    if ((throwable instanceof CompletionException或 || throwable instanceof ExecutionException)

            && Objects.nonNull(throwable.getCause())) {

        return throwable.getCause();

    }

    return throwable;

}
如何选择线程池?

CompletableFuture中选择线程池有三种情况:

  1. **使用方法时指定线程池**
  2. **未指定线程池时,使用ForkJoin的公共线程池 ForkJoinPool.commonPool() (适合CPU任务,最大线程数量 = CPU - 1)**
  3. **未指定线程池时,使用 ThreadPerTaskExecutor 每次执行任务时创建一个线程执行 (适合周期长的任务,创建/销毁线程开销大)**

当未指定线程池时,可能使用ForkJoin的线程池也可能使用ThreadPerTaskExecutor,在没有查看源码的情况下会容易踩坑

并且 ThreadPerTaskExecutorForkJoinPool.commonPool() 都不适合IO任务

接下来一步步查看源码,分析CompletableFuture什么情况下会选择哪种线程池

CompletableFuture.supplyAsync

代码语言:java复制
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {

    return asyncSupplyStage(asyncPool, supplier);

}

当我们使用未指定线程池的方法时,会直接使用asyncPool作为线程池

代码语言:java复制
private static final Executor asyncPool = useCommonPool ?

    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

**asyncPool根据useCommonPool来判断是使用 ForkJoinPool.commonPool() 还是使用 ThreadPerTaskExecutor**

那么useCommonPool是如何确定的呢?我们继续往下查看

代码语言:java复制
private static final boolean useCommonPool =

    (ForkJoinPool.getCommonPoolParallelism() > 1);

能否使用useCommonPool,由 ForkJoinPool.getCommonPoolParallelism() 决定,当它大于1时则使用 ForkJoinPool.commonPool() 否则使用 ThreadPerTaskExecutor

ForkJoinPool.getCommonPoolParallelism() 返回字段 commonParallelism

代码语言:java复制
static final int commonParallelism;

commonParallelism 用于表示ForkJoinPool的并行粒度,在ForkJoinPool静态代码块中赋值初始化

ForkJoinPool.static

代码语言:java复制
static {

    //其他略...

    

    //创建公共池

    common = java.security.AccessController.doPrivileged

        (new java.security.PrivilegedAction<ForkJoinPool>() {

            public ForkJoinPool run() { return makeCommonPool(); }});

    

    //计算并行粒度

    int par = common.config & SMASK; // report 1 even if threads disabled

    commonParallelism = par > 0 ? par : 1;

}

commonParallelism 并发粒度的字段由par决定,而par = common.config & SMASK

其中SMASK为65535(十进制),其二进制为全1,因此由 common 的字段 config 决定

(在创建公共池的过程会设置config字段)

ForkJoinPool.makeCommonPool

在创建公共池的代码中主要观察变量 parallelism 它为并发粒度

如果不携带参数,**默认情况下并发粒度为CPU核数-1**

代码语言:java复制
private static ForkJoinPool makeCommonPool() {



    final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =

            new CommonPoolForkJoinWorkerThreadFactory();

    //初始化并发粒度为-1

    int parallelism = -1;

    ForkJoinWorkerThreadFactory factory = null;

    UncaughtExceptionHandler handler = null;

    try {  // ignore exceptions in accessing/parsing properties

        String pp = System.getProperty

            ("java.util.concurrent.ForkJoinPool.common.parallelism");

        String fp = System.getProperty

            ("java.util.concurrent.ForkJoinPool.common.threadFactory");

        String hp = System.getProperty

            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");

        if (pp != null)

            //如果携带启动参数则设置为对应的并发粒度

            parallelism = Integer.parseInt(pp);

        if (fp != null)

            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.

                       getSystemClassLoader().loadClass(fp).newInstance());

        if (hp != null)

            handler = ((UncaughtExceptionHandler)ClassLoader.

                       getSystemClassLoader().loadClass(hp).newInstance());

    } catch (Exception ignore) {

    }

    if (factory == null) {

        if (System.getSecurityManager() == null)

            factory = commonPoolForkJoinWorkerThreadFactory;

        else // use security-managed default

            factory = new InnocuousForkJoinWorkerThreadFactory();

    }

    if (parallelism < 0 && // default 1 less than #cores

        //默认情况下并发粒度 = CPU核数 - 1

        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)

        parallelism = 1;

    if (parallelism > MAX_CAP)

        parallelism = MAX_CAP;

    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,

                            "ForkJoinPool.commonPool-worker-");

}

在构建对象时,config字段 this.config = (parallelism & SMASK) | mode

其中SMASK为全1,mode为0,得到的结果是不变的,因此config的值就是parallelism并发粒度

至此我们可以得出结论:**默认情况下,如果不指定线程池,当CPU核数-1超过1则会使用ForkJoin公共池(最大线程数量 = CPU核数 - 1),否则使用ThreadPerTaskExecutor(每次执行都创建线程执行)**

代码语言:java复制
static final class ThreadPerTaskExecutor implements Executor {

    public void execute(Runnable r) { new Thread(r).start(); }

}

ThreadPerTaskExecutor只适合执行周期长的任务,如果任务周期短,并且多的情况下,创建线程也会是很大一笔开销

**使用CompletableFuture时务必指定线程池,线程池最好根据业务做好隔离**

**如果不指定线程池会根据CPU核数选择ForkJoinCommonPool或ThreadPerTaskExecutor,它们并不适合IO任务**

线程如何执行?

在同步与异步的API中线程如何执行?

**在异步的API中,如果指定线程池则交给线程池中的工作线程执行,否则选择Common Pool或ThreadPerTaskExecutor**

**在同步的API中,通常是当前线程进行执行任务,但如果任务B依赖的任务A未完成则由任务A的回调线程执行,任务A如果是异步则由线程池来执行**

代码语言:java复制
public static void testSync() {

        CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {

//            try {

//                Thread.sleep(5000);

//            } catch (InterruptedException e) {

//                throw new RuntimeException(e);

//            }

            return "ok";

        }, threadPool);





        CompletableFuture<Void> taskB = taskA.thenAccept(s -> {

            //任务A执行完(不睡时)由当前线程执行

            //任务A未执行完(睡眠时)由线程池的工作线程执行

            System.out.println(s);

            System.out.println(s);

        });



        taskB.join();

}

总结

**CompletableFuture提供串行、AND、OR、异常捕获、结果聚合等多种API,通过这些API能够更方便、快捷的实现异步任务的编排**

**使用CompletableFuture时务必对任务进行异常处理,并且它会使用CompletionException或ExecutionException包装异常,再打印异常时记得使用工具类处理,避免打印到包装的异常**

**CompletableFuture异步任务中如果指定线程池则直接使用指定的线程池**

**如果未指定线程池,当前服务器CPU数量小于等于2(并发粒度低)时使用ThreadPerTaskExecutor,其他情况(并发粒度高)使用ForkJoin框架的common pool(并发粒度 = CPU数量 - 1)**

**未指定线程池时使用的线程池适合CPU任务,并不适合IO任务,使用异步时务必指定线程池**

**当使用异步API时,由线程池的工作线程执行;使用同步API时,如果当前任务依赖的任务未完成,则有依赖、未完成的任务的线程来执行**

0 人点赞