提高效率,实现异步编程,我用CompletableFuture(上)

2024-03-01 06:38:45 浏览数 (2)

提高效率,实现异步编程,我用CompletableFuture(上)

大家好,我是小高先生,这篇文章我将和大家一起学习Java并发编程中很重要的一个类-CompletableFuture。

在Java的并发编程领域,Future接口一直扮演着关键的角色,它定义了一组与异步任务执行相关的方法,包括获取异步任务的结果、取消任务执行以及检查任务是否已完成等。然而,随着业务场景的复杂化,Future逐渐暴露出一些局限性,无法满足所有并发处理的需求。

为了应对这些挑战,Java引入了CompletableFuture,这是对Future接口的一次重大改进。CompletableFuture不仅包含了Future的所有功能,还提供了更强大的能力,如回调通知、组合多个异步计算、异常处理等。通过CompletableFuture,我们可以更加简洁和高效地处理复杂的异步任务。

本文将重点介绍Future的作用和CompletableFuture的优势,无论大家是Java初学者还是有经验的开发者,相信本文都能提供一些实质性的帮助。

  • Future和FutureTask是什么?
  • FutureTask应用及优缺点
  • 全面升级——CompletableFuture
  • 总结

Future和FutureTask是什么?

在多线程编程中,我们经常会遇到一些耗时的任务,这些任务如果由主线程直接执行,会导致主线程的阻塞,进而影响整体程序的响应效率。为了解决这个问题,我们引入了Future接口。

Future接口的设计初衷,就是为了解决一个核心问题:如何有效地处理异步任务?在没有Future的情况下,如果我们的主线程正在执行一项任务,突然有一项耗时的任务需要处理,那么我们的主线程就不得不暂停当前的工任务,转而去执行这个耗时的任务。完成耗时任务后,主线程还需要回到原来的任务上,继续执行。

这种情况下,主线程的耗时会显著增加,效率低下。然而,有了Future之后,情况就大为不同了。我们可以创建一个子线程,让子线程去执行耗时的任务。在启动子线程开始执行任务后,主线程就可以去做其他事情,不必等待耗时任务的完成。当主线程忙完之后,再回来询问耗时任务是否已经完成,并获取任务的执行结果。

通过这种方式,我们实现了耗时任务的异步处理,大大提升了主线程的处理效率,也使得程序的整体响应性能得到提升。这就是Future接口存在的意义和价值。

让我们通过一个生活化的例子来解释。假设你正在和你的女朋友享受一天的休闲时光,突然,你的快递到了。这时,你是否应该立即回家取快递呢?如果你选择回家,那么你们的快乐时光就会被中断,只能等取完快递后才能继续。这无疑会降低你们的娱乐体验,甚至可能引发你女朋友的不满。为了解决这个问题,你可以请你的朋友小董帮忙(要问我小董是谁,是我的单身兄弟),让他去取快递。这样,你就不需要亲自回家,也不会影响你们的娱乐活动。在编程中,这个场景可以这样理解:主线程(你)正在进行一项任务(玩),而另一项耗时的任务(取快递)可以通过创建一个子线程(小董)来执行。然后,主线程可以询问子线程是否完成任务并获取结果。这个由子线程执行的任务就是所谓的异步任务,这也是Future接口存在的主要意义。

Future接口提供了一种异步计算的能力,允许主线程为耗时的复杂业务创建一个新的执行路径。当然,有了接口就需要有实现类,FutureTask就是Future的一个实现类

要使用异步任务,我们需要创建子线程。我们的目标是让子线程执行任务并提供返回值。这个目标的特点非常明显,分别是异步性、多线程性和返回值性。对于如何创建线程,相信大多数人都已经非常熟悉。我们可以将这些特点整合在一起,进一步分析其特性。

接口

特点

Runnable

无返回值

Callable

有返回值

Future

异步任务

当我们在Java中创建新的线程时,通常会使用Runnable接口作为任务的表示形式。然而,如果我们的任务需要返回一个结果,并且还需要支持查询任务状态或停止任务的功能,那么我们需要找到一个能够同时满足这些需求的接口。幸运的是,RunnableFuture接口正好满足了这些要求,它继承了Runnable和Future两个接口,其实现类是FutureTask

FutureTask实现了Runnable接口,这意味着它可以被用作线程的任务。同时,它也实现了Future接口,因此我们可以查询任务的状态,或者在任务完成之前尝试停止任务。此外,通过查看FutureTask的源代码,我们还会发现,FutureTask还支持构造函数注入,允许我们传入一个Callable对象作为任务,这样任务就有返回值了。因此,FutureTask完美地符合了我们对于异步任务的所有需求。

Future具有五种有关操作异步任务的方法,包括获取异步任务的结果、取消任务执行以及检查任务是否已完成等,FutureTask实现了这五种方法。

下面是一个创建FutureTask的简单案例:

代码语言:java复制
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());

        Thread t1 = new Thread(futureTask,"t1");
        t1.start();

        System.out.println(futureTask.get());

    }
}

class MyThread implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println("hello juejin");
        return "hello xiaogao";
    }
}

总结一下这个神奇的FutureTask类。通过注入构造函数的方式,FutureTask与Callable接口建立了紧密的联系,使得它能够处理有返回值的任务。同时,它还实现了Runnable、Future和RunnableFuture三个接口,这使得它既可以作为线程任务运行,又可以查询任务状态或在需要时停止任务。FutureTask以其独特的设计,优雅地满足了异步编程中的多种需求,体现了Java并发编程的强大和灵活。

FutureTask应用及优缺点

在应用中,Future通常和线程池相结合,能显著提高程序的执行效率。下面看几个例子,感受一下FutureTask的实际应用。

考虑这样一个场景,假设我们有三项任务需要完成,如果全部交给主线程(main线程)来处理,那么这些任务将会串行执行,总耗时大约为1秒多。这种串行处理方式虽然简单,但在处理大量任务时效率较低。为了提高处理效率,我们可以采用多线程异步处理的方式。然而,频繁创建和销毁线程会带来一定的开销,因此我们通常会使用线程池来管理线程资源。通过使用线程池,我们可以复用已经创建的线程,避免了频繁创建和销毁线程所带来的开销。同时,线程池还可以根据任务的数量动态调整线程数量,以适应不同的负载情况。

代码语言:java复制
public static void main(String[] args) throws ExecutionException, InterruptedException {
    //3个线程,目前只有一个main来处理,耗时多久
    long startTime = System.currentTimeMillis();

    try {
        TimeUnit.MILLISECONDS.sleep(500);
    }catch (InterruptedException e){
        e.printStackTrace();
    }
    try {
        TimeUnit.MILLISECONDS.sleep(500);
    }catch (InterruptedException e){
        e.printStackTrace();
    }
    try {
        TimeUnit.MILLISECONDS.sleep(500);
    }catch (InterruptedException e){
        e.printStackTrace();
    }

    long endTime = System.currentTimeMillis();
    System.out.println("----costTime: "   (endTime - startTime)   "毫秒");

    System.out.println(Thread.currentThread().getName()   "t ----end");

}
代码语言:java复制
public static void main(String[] args) throws ExecutionException, InterruptedException {
    //开启多个异步线程处理
    ExecutorService executorService = Executors.newFixedThreadPool(3);

    long startTime = System.currentTimeMillis();

    FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return "task1 over";
    });
    executorService.submit(futureTask1);

    FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return "task2 over";
    });
    executorService.submit(futureTask2);

    System.out.println(futureTask1.get());
    System.out.println(futureTask2.get());

    try {
        TimeUnit.MILLISECONDS.sleep(300);
    }catch (InterruptedException e){
        e.printStackTrace();
    }

    long endTime = System.currentTimeMillis();
    System.out.println("----costTime: "   (endTime - startTime)   " 毫秒");

    executorService.shutdown();

}

通过上述案例,我们可以发现利用Future和线程池可以提升程序执行的效率,这是一个非常好的方法。然而,Future也有一些缺点。其中一个缺点就是调用get()方法时可能会被阻塞。下面代码就是一个正常的使用FutureTask,但是注意一下我们是在最后调用的get(),这并没有什么问题。

代码语言:java复制
public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            System.out.println(Thread.currentThread().getName()   "t -----come in");
            try {
                TimeUnit.SECONDS.sleep(5);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        System.out.println(Thread.currentThread().getName()   "t ---忙其他任务");
        System.out.println(futureTask.get());
        
    }
}

现在我们把get()方法向上调整一下位置,不放在最后,也就是主线程任务还没做完呢就调用get()方法。代码运行之后,很明显程序会阻塞在get()的位置。问题也就很明显了,get()非要等到结果才会离开,容易造成程序阻塞,一般建议放在程序最后。

代码语言:java复制
public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            System.out.println(Thread.currentThread().getName()   "t -----come in");
            try {
                TimeUnit.SECONDS.sleep(5);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        System.out.println(futureTask.get());
        System.out.println(Thread.currentThread().getName()   "t ---忙其他任务");
    }
}

一种解决办法就是调用带有超时时间的get(),等待超时会结束程序并抛出异常,这种方式只能说是饮鸩止渴,并不是好办法。我们就是想不让线程阻塞,让它干点事情,可以借助isDone(),通过轮询的方式判断异步任务是否结束,并在阻塞的过程中让CPU执行其他任务。不过这种方法也有问题,这种无意义的轮询会导致CPU空转,消耗资源。

代码语言:java复制
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
    FutureTask<String> futureTask = new FutureTask<String>(() -> {
        System.out.println(Thread.currentThread().getName()   "t ----come in");
        TimeUnit.SECONDS.sleep(5);
        return "task over";
    });
    Thread t1 = new Thread(futureTask,"t1");
    t1.start();

    System.out.println(Thread.currentThread().getName()  "t ---忙其他任务了");
    while (true){
        if(futureTask.isDone()){
            System.out.println(futureTask.get());
            break;
        }else{
            TimeUnit.MILLISECONDS.sleep(500);
            System.out.println("不要再催了");
        }
    }
    
}

通过上面的几个例子可以看出,Future对于结果的获取并不友好,只能通过阻塞或轮询的方式得到任务结果。

在探索Future的过程中,我们采用了多种方式逐步深化对其理解,并在遇到问题时寻求解决方案,即使这些方案并非最优解,但这正是迭代思想的魅力所在。然而,我们也能发现,在实际应用中,对于简单的业务场景,使用Future可能没有太大问题,但在复杂的场景下,其效果确实不尽如人意,仿佛身体被掏空。因此,为了解决这些问题,Java引入了Future的升级版——CompletableFuture。

引入CompletableFuture之前,可以想一下归纳一下我们的小诉求,首先要解决get()和isDone()的问题,其次还要加一些新功能。

1. 当Future完成时,可以通知我,也就是回调通知,之前通过轮询的方式去判断任务是否完成的方式非常占CPU并且代码也不美观。

2. 之前我们的案例是用线程池创建多个线程去处理多个异步任务,现在想将多个异步任务的计算结果组合起来合成一个异步计算,几个异步计算相互独立,同时后面的这个又依赖于前一个处理结果。就像烤肉一样,第一步买肉,第二步腌制,第三步下锅,这是一条完整的链路。

3. 选出计算速度最快的任务,当多个异步任务有一个最快结束时,返回第一个处理完成的结果。

基于以上的需求,如果还用Future,那妥妥送人头,干脆一不做二不休,搞一个更狠的装备,使用CompletableFuture。Future能干的,CompletableFuture也能干,或者说你不干,有的是CompletableFuture干。

全面升级——CompletableFuture

通过前面的介绍,我们丝滑的引入了CompletableFuture。CompletableFuture提供了一种机制,可以让任务执行完成后通知监听的一方。 CompletableFuture类实现了两个接口,分别是Future和CompletionStage,因此Future具有的方法,CompletableFuture也都实现了,主要看一下CompletionStage的作用。CompletionStage代表异步计算过程中的某一个阶段,就跟它名字一样,一个阶段完成后可能会触发另一个阶段,并提供了相当多的方法。总的来说,CompletableFuture是Future的扩展,可能代表一个明确完成的Future,也可能代表一个完成的阶段(CompletionStage),其作用是简化异步编程的复杂性,可以通过回调的方式处理计算结果,也提供转换和组合CompletableFuture的方法。

创建CompletableFuture时,可以通过无参构造的方式创建,但官方并不推荐这种方式。通常,我们使用静态方法来获取CompletableFuture对象。官方提供了两组四个静态方法来创建CompletableFuture,这两组方法的区别在于是否有返回值。传入Runnable参数的方法没有返回值,而传入Supplier参数的方法则有返回值。在实际工作中,我们通常使用supplyAsync方法,因为它有返回值,适合进行交互操作。在这两组方法中,两个方法的区别是是否传入Executor参数。如果没有指定Executor,那么将使用默认的ForkJoinPool.commonPool()作为其线程池执行异步代码。总之,通过使用CompletableFuture的静态方法,我们可以方便地创建具有返回值或无返回值的异步任务,并根据需要选择是否指定Executor参数。

代码语言:java复制
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->{
        System.out.println(Thread.currentThread().getName()   "t"   "-----come in");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello supplyAsync";
    });
    System.out.println(future.get());
}

总结

本文首先介绍了基础接口Future,它代表了一个异步计算的结果,允许我们在不阻塞主线程的情况下执行耗时操作,并在操作完成后获取结果。然而,尽管Future在某些场景下足够使用,但在面对更复杂的业务需求时,它可能显得力不从心。为了更好地应对并发编程的挑战,我们需要更强大的武器。在本篇文章中,我们将通过几个实战案例,循序渐进地将基础装备Future升级为更为强大的神装——CompletableFuture。同时,也详细介绍了CompletableFuture的创建方式,以及如何使用静态方法来获取CompletableFuture对象。

0 人点赞