多线程(五) | 聊聊ComplatableFuture

2023-12-18 13:04:00 浏览数 (2)

Hello,大家好,欢迎大家来到【一缕82年的清风】频道,共同探讨Java。

我们继续来聊多线程的知识,今天我们来学习一个新的工具类,也是非常重要的一个类:CompletableFuture。

我们先来回顾一下Future: Future是我们上篇文章中讲解的内容,主要就是与Callable联合使用,用来描述Callable线程任务的返回结果。我们需要通过调用get方法来获取线程的返回值。但是get() 方法有一个弊端,就是这个方式是阻塞的,在Callable线程任务没有执行完毕的时候,get()方法只能等待,直到得到返回结果,这个过程中,其实对于我们的系统资源是比较浪费的。所以从JDK1.8开始,提供了一个叫做CompletableFuture的类,提供了更多在多线程场景下的使用方式。

一、 基本功能

CompletableFuture 其实也是属于Future的子类,本身实现了Future 和 CompletionStage 两个接口。既然是Future的子类,那么说明这个类是可以用来表示有返回值线程任务的执行结果的,也是可以通过调用get方法或有有返回值的线程任务的执行结果。

CompletableFuture提供了4个静态方法,可以用于执行多线程的线程任务,其实两个方法用于执行无返回值的线程任务,另外两个方法用于执行有返回值的线程任务。

1.1 执行无返回值的线程任务

执行无返回值的线程任务,就类似于我们之前的Runnable。主要有如下两个方法。

代码语言:javascript复制
public static CompletableFuture<Void> runAsync(Runnable runnable);

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

可以看出来这两个方法的区别就是参数不同,第一个方法只需要传入线程任务Runnable, 另外一个除了传Runnable之外,还可以传入我们自己指定的线程池。我们分别演示一下。

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @className: CompletableFutureDemo
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-11 13:46
 */
public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 直接执行无返回值的线程任务,这里的Runnable我们使用lambda表达式
        CompletableFuture f = CompletableFuture.runAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()   "正在执行线程任务");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 这里要调用get方法,防止主线程立即结束。
        System.out.println(f.get());

        // 执行无返回值的线程任务,并指定线程池。
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()   "正在执行线程任务2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, Executors.newFixedThreadPool(3));

        // 这里也可以使用join方法,防止主线程立即结束。
                f2.join();
//        System.out.println(f2.get());
        System.out.println("111");

    }
}

ForkJoinPool.commonPool-worker-9正在执行线程任务 null pool-1-thread-1正在执行线程任务2 null

上面的代码中,我们使用CompletableFuture.runAsync() 的方法,直接执行了返回值的线程任务。通过上面打印的线程名称我们也可以看出, 不指定线程池的时候,默认使用的ForkJoinPool, 如果指定了线程池,则使用给我们自己指定的线程池来执行。 Runnable返回的future为null。

1.2 执行有返回值的线程任务

除了可以执行无返回值的线程任务,CompletableFuture类中还提供了可以提供有返回值线程任务的方法:

代码语言:javascript复制
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier
                                                                    ,Executor executor)

这两个方法和上面的两个方法是非常相似的,方法名称由run开头改为了supply开头,同时参数中的Runnable替换成了 Supplier, Supplier 这个接口如果熟悉JDK8中lambda表达式的同学一定不会陌生,他就是一个代表无参数,有返回值的函数式接口。我们可以在程序中通过lambda表达式来简化我们的写法。

我们通过程序来演示一下:

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.*;

/**
 * @className: CompletableFutureDemo2
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-11 15:34
 */
public class CompletableFutureDemo2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 直接执行有返回值的线程任务,这里的Supplier我们使用lambda表达式
        CompletableFuture<Boolean> f = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()   "正在执行线程任务1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return true;
        });
        System.out.println(f.get());

        // 直接执行有返回值的线程任务,指定线程池:也可以使用自定义的
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()   "正在执行线程任务2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "success";
        }, new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()));
        System.out.println(f2.get());

    }
}

执行结果:

ForkJoinPool.commonPool-worker-9正在执行线程任务1(2s后开始执行) true pool-1-thread-1正在执行线程任务2(2s后开始执行) success

1.3 结果二次操作

当我们在使用CompletableFuture执行多线层任务中,如果执行成功后想执行一些代码,或者出现异常后执行一些代码,CompletableFuture为我们提供了相应的回调函数,可以方便我们对线程任务的结果进行二次处理。

代码语言:javascript复制
// 任务执行成功后的回调, 使用结果执行一些无返回值的操作
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);

// 任务执行成功后的回调, 使用结果执行一些有返回值的操作
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
 
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);


// 任务异常情况的回调,参数为异常对象
 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);

thenAccept方法和 thenAcceptAsync 两个方法区别就在于谁去执行这个任务:

thenAcceptAsync:执行的线程是从 ForkJoinPool.commonPool() ,(当然我们也可以自己指定自己的线程池)中获取不同的线程进行执行。 而 thenAccept 分两种情况:

  • 如果supplyAsync 执行速度特别快,那么thenAccept 就由主线程进行执行。
  • 如果supplyAsync 执行特别慢的话,就由thenAcceptAsync 的线程来执行。

同理:thenApply 和 thenApplyAsync 的区别也是如此。后续所有加Async的都是这个意思

而thenAccept 和 thenApply 的区别就在于一个有返回值,一个没有返回值。其实就是lambda表达式中Accept和Apply的区别 。相当于我们把上一次的执行任务结果进一步加工,没有返回值的就是thenAccept。 thenApply是把上次的结果再次封装成另外的结果。

我们通过程序来验证一下:

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.*;

/**
 * @className: CompletableFutureDemo3
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-11 16:43
 */
public class CompletableFutureDemo3 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()   "正在执行线程任务1");
            return "success";
        }).thenApply(result ->{
            // 由于上面的方法执行的很快,所以使用主线程来执行
            System.out.println(Thread.currentThread().getName()   "获取到线程的执行结果为:"   result);
            // 使用该结果继续一些任务
            return result   ",....";
        });
        System.out.println(f.get());
    }
}

ForkJoinPool.commonPool-worker-9正在执行线程任务1 main获取到线程的执行结果为:success success,....

我们改为supplyAsync,看看执行结果:

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.*;

/**
 * @className: CompletableFutureDemo3
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-11 16:43
 */
public class CompletableFutureDemo3 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()   "正在执行线程任务1");
            return "success";
        }).thenApplyAsync(result ->{
            // 由于上面的方法执行的很快,所以使用主线程来执行
            System.out.println(Thread.currentThread().getName()   "获取到线程的执行结果为:"   result);
            // 使用该结果继续一些任务
            return result   ",....";
        });
        System.out.println(f.get());
    }
}

ForkJoinPool.commonPool-worker-9正在执行线程任务1 ForkJoinPool.commonPool-worker-9获取到线程的执行结果为:success success,....

执行线程不是主线程了,是从线程池中获取的,只不过获取的和前面执行任务的是同一个线程。

我们再来看看异常的情况,由于返回值相同,我们可以直接链式书写。

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.*;

/**
 * @className: CompletableFutureDemo3
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-11 16:43
 */
public class CompletableFutureDemo3 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()   "正在执行线程任务1");
            throw new RuntimeException("系统异常");
//            return "success";
        }).thenApplyAsync(result ->{
            // 由于上面的方法执行的很快,所以使用主线程来执行
            System.out.println(Thread.currentThread().getName()   "获取到线程的执行结果为:"   result);
            // 使用该结果继续一些任务
            return result   ",....";
        }).exceptionally(e->{
            System.out.println(Thread.currentThread().getName()   "异常为:"   e.getMessage() );
            return "failed";
        });
        System.out.println(f.get());
    }
}

这种方式的主要目的一是帮我们来执行获取到异步线程结果后的后续操作,二是把对于出现异常的情况单独提供了处理接口,方便我们直接在代码中对于异常进行处理。

我们再来看看 thenAccept 和 thenApply的区别:

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @className: CompletableFutureDemo4
 * @description:
 * @author: sh.Liu
 * @date: 2022-04-12 10:23
 */
public class CompletableFutureDemo4 {
    // thenAccept 和 thenApply 的区别 演示
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> f = CompletableFuture.supplyAsync(()->{
            return "123";
        }).thenAccept(s->{
            // 将结果直接消费,没有返回值
            System.out.println("上一步获取到的结果:"   s);
        }
        );
        System.out.println(f.get());

        System.out.println("-----------------");
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(()->{
            return "123";
        }).thenApply(s->{
            // 将执行的结果,进一步加工,转为int
                return Integer.valueOf(s);
            }
        );
        System.out.println(f2.get());
    }
}

二、其他功能

上面我们已经简单了解了CompletableFuture 类中的一些使用方式,这个类其实就是一个工具类,里面的方法都是静态的方法,我们可以直接使用类名调用。类中总共有50多个方法,接写来我们再了解一些其他的用法。

2.1 whenComplete

熟悉了上面的方法,我们再来了看下whenComplate

代码语言:javascript复制
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

whenComplete: 也是分为了带Async和不带Async,这个上面我们已经介绍过了,区别就是使用那个线程来执行。带Async的都会从线程池中获取线程来执行。另外一个带Async的,并且可以传入指定的线程池

这个方法的意思是:无论前置方法是否异常,均调用此方法,入参为前置结果和异常信息,返回结果为前置方法的返回结果

这个方法和前面的thenAccept比较相似,只不过thenAccept方法只会在前置任务执行成功的时候执行,而这个方法不管前置任务成功还是异常都会执行,并且他的参数是一个BiConsumer,是两个参数,一个就代表前置任务成功是结果,一个代表的是异常对象。如果前置任务执行成功,那么第一个参数有值,第二个参数为空。如果前置成为抛出异常,那么第一参数为空,异常对象有值。该方法可以和exceptionally组合使用,代表一旦出现异常,返回一个预设值。上程序:

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @className: CompletableFutureDemo5
 * @description: whenComplete方法演示
 * @author: sh.Liu
 * @date: 2022-04-12 10:23
 */
public class CompletableFutureDemo5 {
    // whenComplete方法演示
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Double> f = CompletableFuture.supplyAsync(()->{
            double random = Math.random();
            if (random <0.5) {
                return random;
            } else {
                throw new RuntimeException("这个数大于0.5了");
            }
        }).whenComplete((result, exception)->{
            if (result != null) {
                System.out.println("获取到结果了:"   result);
            }
            if (exception != null) {
                System.out.println("获取到异常了:"   exception.getMessage());
            }
        }).exceptionally(e->{
            // 如果有异常,则返回0.0
            return 0.0;
        });
        
        System.out.println(f.get());
    }
}

出现异常时的结果:

获取到异常了:java.lang.RuntimeException: 这个数大于0.5了 0.0

未出现异常的结果:

获取到结果了:0.07667343166542318 0.07667343166542318

2.2 handle

handle的功能也比较类似: 无论前置方法是否异常,均调用此方法,入参为前置结果和异常信息,返回结果为后续方法的返回结果

代码语言:javascript复制
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
 
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
        
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

这个方法和whenComplete的区别就是有返回值,whenComplete是没有返回值的。其实本质上也就是Consumer和 Function的区别。

我们来看一下代码:

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @className: CompletableFutureDemo5
 * @description: handle方法演示
 * @author: sh.Liu
 * @date: 2022-04-12 10:23
 */
public class CompletableFutureDemo6 {
    // handle方法演示
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Double> f = CompletableFuture.supplyAsync(()->{
            double random = Math.random();
            if (random <0.5) {
                return random;
            } else {
                throw new RuntimeException("这个数大于0.5了");
            }
        }).handle((result, exception)->{
            if (result != null) {
                System.out.println("获取到结果了:"   result);
                return result;
            }
            if (exception != null) {
                System.out.println("获取到异常了:"   exception.getMessage());
                return 0.0;
            }
            return null;
        });

        System.out.println(f.get());
    }
}

区别就是返回值的区别。

2.3 两个任务都完成的操作

CompletableFuture中还提供了针对两个任务的都完成之后的组合操作API, 我们来看一下:

两个任务必须都完成,触发该任务。

  • thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
  • thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
  • runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。
代码语言:javascript复制
public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn);
        
public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)
        
public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) 

方法的形式都是一样的。

代码语言:javascript复制
public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action);
         
public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action);
        
public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor);
代码语言:javascript复制
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action);
                                                
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action);
                                                     
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action,
                                                     Executor executor);

我们通过案例来看一下:

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;

/**
 * @className: CompletableFutureDemo5
 * @description: 两个任务的组合
 * - thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
 * - thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
 * - runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。
 *
 *
 * @author: sh.Liu
 * @date: 2022-04-12 10:23
 */
public class CompletableFutureDemo7 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 先定义两个CompletableFuture 任务
        CompletableFuture<Double> f = CompletableFuture.supplyAsync(()->{
                    double random = Math.random();
                    return random;
        });
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            return "hello";
        });

        // thenCombine 组合两个任务的结果
        CompletableFuture<String> f3 = f.thenCombine(f2, new BiFunction<Double, String, String>() {
            @Override
            public String apply(Double aDouble, String s) {
                return aDouble   s;
            }
        });
        System.out.println(f3.get());

        // thenAcceptBoth ,根据两个结果,执行一些操作,无返回值
        CompletableFuture<Void> voidResult = f.thenAcceptBoth(f2, (a, b) -> {
            System.out.println("任务1的结果为:"   a);
            System.out.println("任务2的结果为:"   b);
        });
        // null, 无返回值
        System.out.println(voidResult.get());

        // runAfterBoth 得不到任务的结果,只是在两个任务结束后,执行一些操作:
        f.runAfterBoth(f2, ()->{
            System.out.println("两个任务都执行完了,然而我并不知道他们的结果");
        });

    }
}

还有一个和thenCombine 比较相似的方法叫做: thenCompose

代码语言:javascript复制
public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn)
        
public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn);
        
public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor);

这个方法和thenCombine 还是很相似的,但是参数不一样,

  • thenCompose :参数是一个Funtion,代表上一个方法的返回值和一个新的任务,新的任务根据上一个方法的返回值进行操作会返回新的CompletableFuture。
  • thenCombine: 参数是一个任务,和一个Function,他是代表把两个任务都执行完的结果,在进行一番操作,返回的是一个新的结果。
代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;

/**
 * @className: CompletableFutureDemo5
 * @description: 两个任务的组合
 * - thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
 * - thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
 * - runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。
 *
 *
 * @author: sh.Liu
 * @date: 2022-04-12 10:23
 */
public class CompletableFutureDemo7 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Double> f = CompletableFuture.supplyAsync(()->{
                    double random = Math.random();
                    return random;
        });
       
        CompletableFuture<String> f3 = f.thenCompose((result) -> CompletableFuture.supplyAsync(() -> {
            return String.valueOf(result).concat("还不错");
        }));

    }
}

代码很重要,大家一定要仔细阅读:

thenCombine: 根据两个任务结果,返回新的结果

thenAcceptBoth: 根据两个任务结果,执行一些操作,无返回

runAfterBoth:得不到两个任务的结果,只是可以在两个任务执行结束后,执行一个Runnable任务操作。

thenCompose: 根据上一个任务的执行结果,返回新的CompletableFuture

2.4 两个任务有一个完成的操作

上面讲的是两个任务需要都完成才执行的一些操作。本类中还提供了一些API用于两个任务中有一个任务完成就可以执行的操作。

  • applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
  • acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
  • runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

同理这三个方法也都有对应的Async方法,这里就不写了,都是一样的。

代码语言:javascript复制
public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn);
        
public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action);
        
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action);

我们还是通过程序看效果。

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @className: CompletableFutureDemo8
 * @description: 两个任务的组合
 * - applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
 * - acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
 * - runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
 *
 *
 * @author: sh.Liu
 * @date: 2022-04-12 10:23
 */
public class CompletableFutureDemo8 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 先定义两个CompletableFuture 任务
        CompletableFuture<String> f = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "word";
        });
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        });

        // applyToEither,使用率先得到的结果,封装成新的CompletableFuture
        CompletableFuture<String> f3 = f.applyToEither(f2, a -> {
            return a   "hahah";
        });
        System.out.println(f3.get());

        //acceptEither: 使用率先获取的结果,进行消费,无需返回
        CompletableFuture<Void> voidCompletableFuture = f.acceptEither(f2, a -> {
            System.out.println(a);
        });

        // runAfterEither, 当有一个任务执行完,就执行新的任务,得不到上一步任务的结果
        f.runAfterEither(f2,()->{
            System.out.println("已经有任务执行完毕");
        });
    }
}

wordhahah word 已经有任务执行完毕

这里又有f2,率先返回结果,所以结果为hellohaha

如果我们把睡眠时间调换一下,结果也会发生变化,这就证明了只要有一个率先执行完毕,就会继续执行下一步的任务,并封装成一个新的CompletableFuture。

2.5 多个任务组合

讲完了两个任务的组合方式,接下来我们来说说多个任务的组合,主要有这么两个API。

allOf:等待所有任务完成,

anyOf:只要有一个任务完成

代码语言:javascript复制
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

我们先来看下allof方法:

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @className: CompletableFutureDemo9
 * @description: 多个任务的组合
 *
 *      allOf:等待所有任务完成,此方法获取不到所有任务的返回值。
 *      anyOf:只要有一个任务完成
 *
 * @author: sh.Liu
 * @date: 2022-04-14 09:23
 */
public class CompletableFutureDemo9 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 先定义两个CompletableFuture 任务
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "word";
        });
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        });

        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Java";
        });

        CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
        all.join();
        System.out.println("所有任务均已完成");
    }
}

allof方法的返回值也是一个CompletableFuture,一般通过join方法,进行阻塞,当所有任务都执行完毕后,就会执行后面的内容。该方法中无法获取到任务的返回值,如果想得到单个任务的返回结果,可以使用单个任务的get方法得到。

anyof方法演示:

代码语言:javascript复制
package com.lsqingfeng.action.knowledge.multithread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @className: CompletableFutureDemo9
 * @description: 多个任务的组合
 *
 *      allOf:等待所有任务完成,此方法获取不到所有任务的返回值。
 *      anyOf:只要有一个任务完成
 *
 * @author: sh.Liu
 * @date: 2022-04-14 09:23
 */
public class CompletableFutureDemo9 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 先定义两个CompletableFuture 任务
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "word";
        });
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        });

        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Java";
        });

        CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
        System.out.println("已经有一个任务完成,结果为:"   any.get());


    }
}

结果:

已经有一个任务完成,结果为:word(1s后)

anyof只要有一个任务完成,就会返回,通过get方法可以得到已经完成的任务的返回结果。

三、总结

关于CompletableFuture中的常用方法我们就先讲这么多。CompletableFuture类主要作用就是提供了大量的工具方法,简化我们对于多个多线程程序的编排,梳理及返回结果的封装。通过这个类我们可以按照不同的组合方式来实现任务的编写,并且我们也可以通过函数式编程的方式,简化我们的开发代码。

0 人点赞