简介
代码语言:javascript复制import java.util.concurrent.CompletableFuture;
一个Future类是显示的完成,而且能被用作一个完成等级,通过它的完成触发支持的依赖函数和行为。当两个或多个线程要执行完成或取消操作时,只有一个能够成功。
CompletableFuture首先是一个Future,它拥有Future所有的功能,包括获取异步执行结果,取消正在执行的任务等。
代码语言:javascript复制public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
既然CompletableFuture类实现了CompletionStage接口。它代表了一个特定的计算的阶段,可以同步或者异步的被完成。你可以把它看成是一个计算流水线上的一个单元,最终会产生一个最终结果,这意味着几个CompletionStage可以串联起来,一个完成的阶段可以触发下一阶段的执行,接着触发下一次。
除了实现CompletionStage接口,CompletableFuture也实现了future接口, 代表一个未完成的异步事件。CompletableFuture提供了方法,能够显式地完成这个future,所以它叫CompletableFuture。
CompletableFuture
使用Future
获得异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture
,它针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
代码语言:javascript复制加Async的是指CompletableFuture会把下边的代码看成两个独立的任务来执行,也就是会有两个子线程执行。
public class Main {
public static void main(String[] args) throws Exception {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
// 如果执行成功:
cf.thenAccept((result) -> {
System.out.println("price: " result);
});
// 如果执行异常:
cf.exceptionally((e) -> {
e.printStackTrace();
return null;
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(200);
}
static Double fetchPrice() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
return 5 Math.random() * 20;
}
}
runAsync&thenRunAsync(开启任务)
描述:runAsync方法异步且不支持返回值。
代码语言:javascript复制public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
代码语言:javascript复制/**
* runAsync方法不支持返回值
*/
@Test
public void test2() throws Exception{
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
return;
});
System.out.println(future.get());
}
supplyAsync(开启任务)
描述:supplyAsync方法异步且支持返回值。
代码语言:javascript复制public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
代码语言:javascript复制/**
* supplyAsync方法支持返回值
*/
@Test
public void test3() throws Exception{
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
return System.currentTimeMillis();
});
long time = future.get();
System.out.println("time = " time);
}
代码语言:javascript复制public class CompletableFutureTest2 {
static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
}catch (Exception e) {
e.printStackTrace();
}
}
static void printTimeAndThread(String tag) {
String result = new StringJoiner("t|t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
@Test
public void test() {
printTimeAndThread("小白进入餐厅");
printTimeAndThread("小白点了 番茄炒蛋 一碗米饭");
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("厨师炒菜");
sleepMillis(200);
printTimeAndThread("厨师打饭");
sleepMillis(100);
return "番茄炒蛋 米饭 做好了";
});
printTimeAndThread("小白打王者");
printTimeAndThread(String.format("%s, 小白开吃", completableFuture.join()));
}
}
1634367666078 | 1 | main | 小白进入餐厅
1634367666078 | 1 | main | 小白点了 番茄炒蛋 一碗米饭
1634367666117 | 1 | main | 小白打王者
1634367666117 | 11 | ForkJoinPool.commonPool-worker-1 | 厨师炒菜
1634367666322 | 11 | ForkJoinPool.commonPool-worker-1 | 厨师打饭
1634367666423 | 1 | main | 番茄炒蛋 米饭 做好了, 小白开吃
thenAccept
描述:接收任务的处理结果,并消费处理,无返回结果。
代码语言:javascript复制public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
代码语言:javascript复制@Test
public void test4() {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return new Random().nextInt(10);
}
}).thenAccept(integer -> {
System.out.println(integer);
});
}
thenRun
描述:跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun 。
代码语言:javascript复制public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
代码语言:javascript复制public static void thenRun() throws Exception{
CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return new Random().nextInt(10);
}
}).thenRun(() -> {
System.out.println("thenRun ...");
});
future.get();
}
该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理玩任务后,执行 thenRun 的后续操作。
thenApply&thenApplyAsync(任务后置处理)
描述:当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
代码语言:javascript复制public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
代码语言:javascript复制import org.junit.Test;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest2 {
static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
}catch (Exception e) {
e.printStackTrace();
}
}
static void printTimeAndThread(String tag) {
String result = new StringJoiner("t|t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
@Test
public void test5() {
printTimeAndThread("小白进入餐厅");
printTimeAndThread("小白 结账 要求开发票");
CompletableFuture invoice = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("服务员收款 500元");
sleepMillis(100);
return "500";
}).thenApply(money -> {
printTimeAndThread(String.format("服务员开发票 面额%s", money));
sleepMillis(200);
return String.format("%s元发票", money);
});
printTimeAndThread("小白 接朋友的电话, 想打一把游戏");
printTimeAndThread(String.format("小白拿到%s, 准备回家", invoice.join()));
}
}
1634372234205 | 1 | main | 小白进入餐厅
1634372234205 | 1 | main | 小白 结账 要求开发票
1634372234247 | 11 | ForkJoinPool.commonPool-worker-1 | 服务员收款 500元
1634372234247 | 1 | main | 小白 接朋友的电话, 想打一把游戏
1634372234349 | 11 | ForkJoinPool.commonPool-worker-1 | 服务员开发票 面额500
1634372234550 | 1 | main | 小白拿到500元发票, 准备回家
代码语言:javascript复制private static void thenApply() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long result = new Random().nextInt(100);
System.out.println("result1 = " result);
return result;
}
}).thenApply(new Function<Long, Long>() {
@Override
public Long apply(Long t) {
long result = t * 5;
System.out.println("result2= " result);
return result;
}
});
long result = future.get();
System.out.println(result);
}
输出:第二个任务依赖第一个任务的结果
result1 = 11
result2= 55
55
thenCombine(合并任务)
描述:thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
代码语言:javascript复制public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
代码语言:javascript复制import org.junit.Test;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest2 {
static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
}catch (Exception e) {
e.printStackTrace();
}
}
static void printTimeAndThread(String tag) {
String result = new StringJoiner("t|t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
@Test
public void test3() {
printTimeAndThread("小白进入餐厅");
printTimeAndThread("小白点了 番茄炒蛋 一碗米饭");
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("厨师炒菜");
sleepMillis(200);
return "番茄炒蛋";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
printTimeAndThread("厨师打饭");
sleepMillis(100);
return "米饭";
}), (dish, rice) -> {
printTimeAndThread("服务员打饭");
sleepMillis(100);
return String.format("%s %s 好了", dish, rice);
});
printTimeAndThread("小白打王者");
printTimeAndThread(String.format("%s, 小白开吃", completableFuture.join()));
}
}
代码语言:javascript复制private static void thenCombine() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello";
}
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello";
}
});
CompletableFuture<String> result = future1.thenCombine(future2, new BiFunction<String, String, String>() {
@Override
public String apply(String t, String u) {
return t " " u;
}
});
System.out.println(result.get());
}
输出:
hello hello
thenAcceptBoth
描述:当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗。
代码语言:javascript复制public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
代码语言:javascript复制private static void thenAcceptBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" t);
return t;
}
});
f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {
@Override
public void accept(Integer t, Integer u) {
System.out.println("f1=" t ";f2=" u ";");
}
});
}
runAfterBoth
描述:不关心前两个的结果,并没有返回值。
applyToEither(获取最先完成任务)
描述:两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。
代码语言:javascript复制public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
代码语言:javascript复制@Test
public void test7() {
printTimeAndThread("小白走出餐厅,来到公交站");
printTimeAndThread("等待700路 或者607路公交的到来");
CompletableFuture invoice = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("700路公交正在赶来");
sleepMillis(100);
return "700路来了";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
printTimeAndThread("607路公交正在赶来");
sleepMillis(200);
return "607路来了";
}), firstComeBus -> firstComeBus);
printTimeAndThread(String.format("%s, 小白坐车回家", invoice.join()));
}
1634373145822 | 1 | main | 小白走出餐厅,来到公交站
1634373145822 | 1 | main | 等待700路 或者607路公交的到来
1634373145863 | 11 | ForkJoinPool.commonPool-worker-1 | 700路公交正在赶来
1634373145863 | 12 | ForkJoinPool.commonPool-worker-2 | 607路公交正在赶来
1634373145967 | 1 | main | 700路来了, 小白坐车回家
代码语言:javascript复制private static void applyToEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" t);
return t;
}
});
CompletableFuture<Integer> result = f1.applyToEither(f2, new Function<Integer, Integer>() {
@Override
public Integer apply(Integer t) {
System.out.println(t);
return t * 2;
}
});
System.out.println(result.get());
}
acceptEither
描述:两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
代码语言:javascript复制public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
代码语言:javascript复制private static void acceptEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" t);
return t;
}
});
f1.acceptEither(f2, new Consumer<Integer>() {
@Override
public void accept(Integer t) {
System.out.println(t);
}
});
}
runAfterEither
描述:两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable),没有返回值。
代码语言:javascript复制public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
代码语言:javascript复制private static void runAfterEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" t);
return t;
}
});
f1.runAfterEither(f2, new Runnable() {
@Override
public void run() {
System.out.println("上面有一个已经完成了。");
}
});
}
runAfterBoth
描述:两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)。
代码语言:javascript复制public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
代码语言:javascript复制private static void runAfterBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" t);
return t;
}
});
f1.runAfterBoth(f2, new Runnable() {
@Override
public void run() {
System.out.println("上面两个任务都执行完成了。");
}
});
}
thenCompose&thenComposeAsync(连接任务)
描述:thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
代码语言:javascript复制public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
代码语言:javascript复制public class CompletableFutureTest2 {
static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
}catch (Exception e) {
e.printStackTrace();
}
}
static void printTimeAndThread(String tag) {
String result = new StringJoiner("t|t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
@Test
public void test2() {
printTimeAndThread("小白进入餐厅");
printTimeAndThread("小白点了 番茄炒蛋 一碗米饭");
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("厨师炒菜");
sleepMillis(200);
return "番茄炒蛋";
}).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {
printTimeAndThread("厨师打饭");
sleepMillis(100);
return dish " 米饭 做好了";
}));
printTimeAndThread("小白打王者");
printTimeAndThread(String.format("%s, 小白开吃", completableFuture.join()));
}
@Test
public void test2_2() {
printTimeAndThread("小白进入餐厅");
printTimeAndThread("小白点了 番茄炒蛋 一碗米饭");
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("厨师炒菜");
sleepMillis(200);
return "番茄炒蛋";
}).thenCompose(dish -> {
printTimeAndThread("服务员A准备打饭, 但被领导叫走了,打饭任务交给了服务员B");
return CompletableFuture.supplyAsync(() -> {
printTimeAndThread("服务员B打饭");
sleepMillis(100);
return dish " 米饭";
});
});
printTimeAndThread("小白打王者");
printTimeAndThread(String.format("%s, 小白开吃", completableFuture.join()));
}
}
1634370935904 | 1 | main | 小白进入餐厅
1634370935904 | 1 | main | 小白点了 番茄炒蛋 一碗米饭
1634370935943 | 11 | ForkJoinPool.commonPool-worker-1 | 厨师炒菜
1634370935943 | 1 | main | 小白打王者
1634370936146 | 12 | ForkJoinPool.commonPool-worker-2 | 厨师打饭
1634370936250 | 1 | main | 番茄炒蛋 米饭 做好了, 小白开吃
1634374851354 | 1 | main | 小白进入餐厅
1634374851354 | 1 | main | 小白点了 番茄炒蛋 一碗米饭
1634374851397 | 11 | ForkJoinPool.commonPool-worker-1 | 厨师炒菜
1634374851397 | 1 | main | 小白打王者
1634374851601 | 11 | ForkJoinPool.commonPool-worker-1 | 服务员A准备打饭, 但被领导叫走了,打饭任务交给了服务员B
1634374851602 | 12 | ForkJoinPool.commonPool-worker-2 | 服务员B打饭
1634374851705 | 1 | main | 番茄炒蛋 米饭, 小白开吃
代码语言:javascript复制private static void thenCompose() throws Exception {
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
System.out.println("t1=" t);
return t;
}
}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
@Override
public CompletionStage<Integer> apply(Integer param) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = param *2;
System.out.println("t2=" t);
return t;
}
});
}
});
System.out.println("thenCompose result : " f.get());
}
exceptionally(处理异常)
描述:处理CompletableFuture抛出异常。
代码语言:javascript复制@Test
public void test8() {
printTimeAndThread("小白走出餐厅,来到公交站");
printTimeAndThread("等待700路 或者607路公交的到来");
CompletableFuture invoice = CompletableFuture.supplyAsync(() -> {
printTimeAndThread("700路公交正在赶来");
sleepMillis(100);
return "700路来了";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
printTimeAndThread("607路公交正在赶来");
sleepMillis(200);
return "607路来了";
}), firstComeBus -> {
printTimeAndThread(firstComeBus);
if (firstComeBus.startsWith("700")) {
throw new RuntimeException("撞树上了....");
}
return firstComeBus;
}).exceptionally(e -> {
printTimeAndThread(e.getMessage());
printTimeAndThread("小白叫出租车");
return "出租车 叫到了";
});
printTimeAndThread(String.format("%s, 小白坐车回家", invoice.join()));
}
1634373499268 | 1 | main | 小白走出餐厅,来到公交站
1634373499268 | 1 | main | 等待700路 或者607路公交的到来
1634373499309 | 11 | ForkJoinPool.commonPool-worker-1 | 700路公交正在赶来
1634373499309 | 12 | ForkJoinPool.commonPool-worker-2 | 607路公交正在赶来
1634373499411 | 11 | ForkJoinPool.commonPool-worker-1 | 700路来了
1634373499411 | 11 | ForkJoinPool.commonPool-worker-1 | java.lang.RuntimeException: 撞树上了....
1634373499411 | 11 | ForkJoinPool.commonPool-worker-1 | 小白叫出租车
1634373499411 | 1 | main | 出租车 叫到了, 小白坐车回家
handle
描述:如果前边的城区正常执行,那么就会接收正常结果;如果前面的程序发生异常,那么就会接收到异常,但是无论前边是正常还是异常,handle都会返回一个结果,让程序继续运行。
代码语言:javascript复制public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
代码语言:javascript复制public static void handle() throws Exception{
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int i = 10 / 0;
return new Random().nextInt(10);
}
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer param, Throwable throwable) {
int result = -1;
if(throwable == null){
result = param * 2;
}else{
System.out.println(throwable.getMessage());
}
return result;
}
});
System.out.println(future.get());
}
输出:
java.lang.ArithmeticException: / by zero
-1
从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。
whenComplete
描述:跟handle类似,但是没有返回结果。
代码语言:javascript复制public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
代码语言:javascript复制@Test
public void test5() throws Exception{
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if(new Random().nextInt() % 2 >= 0) {
int i = 12/0;
}
System.out.println("run end ...");
});
future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void t, Throwable action) {
System.out.println("执行完成!");
}
});
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out.println("执行失败!" t.getMessage());
return null;
}
});
TimeUnit.SECONDS.sleep(2);
}
执行失败!java.lang.ArithmeticException: / by zero
执行完成!