【Netty】「萌新入门」(四)异步编程模型:利用 Future 和 Promise 提高性能与响应能力

2023-08-30 15:19:42 浏览数 (1)

前言

本篇博文是《从0到1学习 Netty》中入门系列的第四篇博文,主要内容是介绍 Netty 中 Future 与 Promise 的使用,通过使用异步的方式提高程序的性能和响应速度,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

为什么要使用异步?

使用异步编程模式可以提高程序的性能和响应速度。具体来说,使用异步可以将一部分耗时较长的操作(如网络请求或文件读写)放入后台线程中执行,同时不会阻塞主线程,使得主线程可以处理其他任务,从而提高整个应用的吞吐量。

下面举一个实际的例子来说明:

假设我们正在开发一个Web应用,其中有一个页面需要加载大量图片。如果我们使用同步方式加载这些图片,那么当用户访问该页面时,应用会一直等待所有图片加载完成后才能显示完整页面,这样就会导致页面响应时间较长,用户体验不佳。

代码语言:javascript复制
t=0s:用户请求打开网页
t=0s:应用开始加载页面
t=0.1s:开始加载第一张图片
t=0.5s:第一张图片加载完成
t=0.6s:开始加载第二张图片
t=1.0s:第二张图片加载完成
t=1.1s:开始加载第三张图片
t=1.5s:第三张图片加载完成
t=1.6s:开始加载第四张图片
t=2.0s:第四张图片加载完成
t=2.1s:开始加载第五张图片
t=2.5s:第五张图片加载完成
t=2.6s:开始加载第六张图片
t=3.0s:第六张图片加载完成
t=3.1s:所有图片加载完成,页面完整显示

相反,如果我们使用异步方式加载这些图片,那么页面可以先显示出来,并在后台线程中处理图片加载的操作。这样用户可以先看到基本页面内容,而不必等到所有图片都加载完成后才能看到页面。这样可以提升用户体验并减少页面响应时间。

代码语言:javascript复制
t=0s:用户请求打开网页
t=0s:应用开始加载页面
t=0.1s:开始加载第一张图片
t=0.2s:开始加载第二张图片
t=0.3s:开始加载第三张图片
t=0.5s:第一张图片加载完成,显示在页面上
t=0.6s:开始加载第四张图片
t=0.7s:第二张图片加载完成,显示在页面上
t=1.0s:第三张图片加载完成,显示在页面上
t=1.1s:开始加载第五张图片
t=1.3s:第四张图片加载完成,显示在页面上
t=1.5s:第五张图片加载完成,显示在页面上
t=1.6s:开始加载第六张图片
t=2.0s:第六张图片加载完成,显示在页面上
t=2.1s:所有图片加载完成,页面完整显示

因此,使用异步编程模式可以提高程序的性能和响应速度,而且还可以提升用户体验。但是需要注意,异步编程也会增加代码的复杂度,需要仔细考虑线程安全和数据同步等问题。

概述

在异步处理时,经常会用到 FuturePromise 两个接口,其中包含了 JDK Future,Netty Future 和 Netty Promise。

需要注意的是:

  • JDK Future 只能同步等待任务结束(或成功、或失败)才能得到结果;
  • Netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
  • Netty Promise 不仅有 Netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

JDK Future

jdk Future 是 Java 标准库中提供的异步编程接口,它提供了一个表示异步操作结果的抽象类。在异步任务完成后,可以通过 Future.get() 方法获得任务结果,或者通过 Future.cancel() 方法取消任务。

源码如下所示:

代码语言:javascript复制
package java.util.concurrent;

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;
    
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

每个方法解释如下:

  1. boolean cancel(boolean mayInterruptIfRunning): 尝试取消该 Future 运行。如果任务已经完成、或者已经被取消,则返回 false。如果任务还没有开始运行,则返回 true 并尝试取消任务。如果任务已经开始运行,则根据 mayInterruptIfRunning 参数的值来决定是否中断任务。如果任务被成功取消,则返回 true,否则返回 false。
  2. boolean isCancelled(): 判断该 Future 是否已经被取消。
  3. boolean isDone(): 判断该 Future 是否已经完成,无论是成功还是失败。
  4. V get() throws InterruptedException, ExecutionException: 获取该 Future 的结果,阻塞等待直到任务完成。如果任务被成功执行,则返回执行结果;如果任务抛出异常,则在该方法中重新抛出该异常。如果当前线程被中断,则抛出 InterruptedException 异常。

1、先创建一个线程池:

代码语言:javascript复制
ExecutorService service = Executors.newFixedThreadPool(2);

2、提交任务:

代码语言:javascript复制
Future<Integer> future = service.submit(new Callable<Integer>() {  
    @Override  
    public Integer call() throws Exception {  
        log.debug("执行计算...");  
        Thread.sleep(1000);  
        return 21;  
    }  
});

3、获取结果:

代码语言:javascript复制
log.debug("结果是 {}", future.get());

4、完整代码:

代码语言:javascript复制
@Slf4j
public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);

        Future<Integer> future = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算...");
                Thread.sleep(1000);
                return 21;
            }
        });

        log.debug("等待结果...");
        log.debug("结果是 {}", future.get());
    }
}

5、最终结果:

代码语言:javascript复制
22:18:47 [DEBUG] [main] c.s.n.c.TestJdkFuture - 等待结果...
22:18:47 [DEBUG] [pool-1-thread-1] c.s.n.c.TestJdkFuture - 执行计算...
22:18:48 [DEBUG] [main] c.s.n.c.TestJdkFuture - 结果是 21

Netty Future

Netty Future 是 Netty 框架中提供的异步编程接口。与 jdk Future 类似,但是 Netty Future 提供了更多的功能,比如添加监听器、等待结果、检查是否完成等。

在 Netty 的异步模型中,当我们向远程服务发送请求时,通常不会立即得到响应。相反,Netty 会立即返回一个 Netty Future 对象,表示该操作的未来结果。然后我们可以在后续的代码中等待这个 Future 对象的完成,并获取异步操作的实际结果。

源码如下所示:

代码语言:javascript复制
package io.netty.util.concurrent;

public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isSuccess();  
    boolean isCancellable();  
    Throwable cause();  
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);  
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);  
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);  
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);  
    Future<V> sync() throws InterruptedException;  
    Future<V> syncUninterruptibly();  
    Future<V> await() throws InterruptedException;  
    Future<V> awaitUninterruptibly();  
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;  
    boolean await(long timeoutMillis) throws InterruptedException;  
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);  
    boolean awaitUninterruptibly(long timeoutMillis);  
    V getNow();  
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

每个方法解释如下:

  1. boolean isSuccess(): 判断 Future 是否已经成功完成。
  2. boolean isCancellable(): 判断 Future 是否可以被取消。
  3. Throwable cause(): 返回导致 Future 失败的原因,如果 Future 没有失败,则返回 null
  4. Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener): 添加一个 GenericFutureListener 用于在 Future 完成时通知该 Future 的状态。
  5. Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners): 添加多个 GenericFutureListener
  6. Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener): 移除一个 GenericFutureListener
  7. Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners): 移除多个 GenericFutureListener
  8. Future<V> sync() throws InterruptedException: 等待 Future 完成并返回结果。如果当前线程被中断,则抛出 InterruptedException 异常。
  9. Future<V> syncUninterruptibly(): 等待 Future 完成并返回结果。与 sync() 方法不同的是,该方法不会抛出 InterruptedException
  10. Future<V> await() throws InterruptedException: 等待 Future 完成,也就是等待其状态变成已完成。如果当前线程被中断,则抛出 InterruptedException 异常。
  11. Future<V> awaitUninterruptibly(): 等待 Future 完成,也就是等待其状态变成已完成。与 await() 方法不同的是,该方法不会抛出 InterruptedException
  12. boolean await(long timeout, TimeUnit unit) throws InterruptedException: 等待 Future 完成,但最多等待指定时间。如果在指定时间内 Future 仍未完成,则返回 false。如果当前线程被中断,则抛出 InterruptedException 异常。
  13. boolean awaitUninterruptibly(long timeout, TimeUnit unit): 等待 Future 完成,但最多等待指定时间。如果在指定时间内 Future 仍未完成,则返回 false。与 await(long timeout, TimeUnit unit) 方法不同的是,该方法不会抛出 InterruptedException 异常。
  14. V getNow(): 如果 Future 已经完成,则返回该 Future 的结果,否则返回 null。
  15. @Override boolean cancel(boolean mayInterruptIfRunning): 尝试取消该 Future 运行。如果任务已经完成、或者已经被取消,则返回 false。如果任务还没有开始运行,则返回 true 并尝试取消任务。如果任务已经开始运行,则根据 mayInterruptIfRunning 参数的值来决定是否中断任务。如果任务被成功取消,则返回 true,否则返回 false

1、获取一个 EventLoop

代码语言:javascript复制
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();

2、提交任务:

代码语言:javascript复制
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        log.debug("执行计算...");
        Thread.sleep(1000);
        return 21;
    }
});

3、同步方式获取结果的完整代码:

代码语言:javascript复制
@Slf4j
public class TestNettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算...");
                Thread.sleep(1000);
                return 21;
            }
        });
        
        log.debug("等待结果...");
        log.debug("结果是 {}", future.get());
}

4、异步方式获取结果的完整代码:

代码语言:javascript复制
@Slf4j
public class TestNettyFuture {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算...");
                Thread.sleep(1000);
                return 21;
            }
        });

        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                log.debug("结果是 {}", future.getNow());
            }
        });
    }
}

5、运行结果:

代码语言:javascript复制
# 同步方式
23:11:28 [DEBUG] [main] c.s.n.c.TestNettyFuture - 等待结果...
23:11:28 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestNettyFuture - 执行计算...
23:11:29 [DEBUG] [main] c.s.n.c.TestNettyFuture - 结果是 21

# 异步方式
23:12:23 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestNettyFuture - 执行计算...
23:12:24 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestNettyFuture - 结果是 21

Netty Promise

Netty Promise 是一种实现了 Netty Future 接口的具体类,它表示一个异步操作的未来结果,与 Java 中的 Future 类似。通过 Promise 对象,可以在异步操作完成后获取其结果或者添加监听器,以便在异步操作完成时被通知。Promise 还提供了一些方法,可以判断异步操作是否完成、等待异步操作完成并返回结果、取消异步操作等。

在 Netty 中,当向远程服务器发送请求时,可以创建一个 Promise 对象,并将该对象作为参数传递给对应的 Channel。当远程服务器响应请求时,Promise 对象会被更新状态,并触发注册的监听器,从而实现异步回调。

源码如下所示:

代码语言:javascript复制
package io.netty.util.concurrent;

public interface Promise<V> extends Future<V> {

    Promise<V> setSuccess(V result);  
    
    boolean trySuccess(V result);  
    
    Promise<V> setFailure(Throwable cause);  
    
    boolean tryFailure(Throwable cause);  
    
    boolean setUncancellable();
    
    @Override
    ...
}

每个方法解释如下:

  1. setSuccess(V result):设置异步操作成功的结果,并标记该 Promise 已经完成。
  2. trySuccess(V result):尝试将 Promise 标记为成功状态,并设置结果值。如果 Promise 已经完成或者已经被取消,则返回 false,否则返回 true
  3. setFailure(Throwable cause):设置异步操作失败的原因,并标记该 Promise 已经完成。
  4. tryFailure(Throwable cause):尝试将 Promise 标记为失败状态,并设置原因。如果 Promise 已经完成或者已经被取消,则返回 false,否则返回 true
  5. setUncancellable():将 Promise 标记为不可取消。一旦 Promise 被标记为不可取消,就无法再次标记为可取消。

1、获取一个 EventLoop

代码语言:javascript复制
EventLoop eventLoop = new NioEventLoopGroup().next();

2、创建一个 Promise

代码语言:javascript复制
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

3、创建线程执行计算,并向 Promise 填充成功结果:

代码语言:javascript复制
new Thread(() -> {  
    log.debug("执行计算...");  
    try {  
        Thread.sleep(1000);  
    } catch (InterruptedException e) {  
        throw new RuntimeException(e);  
    }  
    promise.setSuccess(21);  
}).start();

4、向 Promise 填充失败结果:

代码语言:javascript复制
new Thread(() -> {  
    log.debug("执行计算...");  
    try {  
        int i = 1 / 0;  
        Thread.sleep(1000);  
        promise.setSuccess(21);  
    } catch (Exception e) {  
        e.printStackTrace();  
        promise.setFailure(e);  
    }  
}).start();

5、获取结果:

代码语言:javascript复制
log.debug("结果是 {}", promise.get());
代码语言:javascript复制
# 成功结果
23:49:39 [DEBUG] [main] c.s.n.c.TestNettyPromise - 等待结果...
23:49:39 [DEBUG] [Thread-0] c.s.n.c.TestNettyPromise - 执行计算...
23:49:40 [DEBUG] [main] c.s.n.c.TestNettyPromise - 结果是 21

# 失败结果
23:48:45 [DEBUG] [main] c.s.n.c.TestNettyPromise - 等待结果...
23:48:45 [DEBUG] [Thread-0] c.s.n.c.TestNettyPromise - 执行计算...
java.lang.ArithmeticException: / by zero
	at com.sidiot.netty.c3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:19)
	at java.base/java.lang.Thread.run(Thread.java:832)
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)
	at com.sidiot.netty.c3.TestNettyPromise.main(TestNettyPromise.java:30)
Caused by: java.lang.ArithmeticException: / by zero
	at com.sidiot.netty.c3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:19)
	at java.base/java.lang.Thread.run(Thread.java:832)

后记

通过使用异步编程模型,我们可以避免阻塞主线程并提高系统的性能和响应能力。

本文概述了异步编程的重要性以及几个与异步相关的关键工具:JDK Future、Netty Future和Netty Promise,这些工具为我们提供了强大的异步编程功能。通过深入理解和灵活运用这些概念,我们能够构建出更加高效、可靠的系统,并满足用户对性能和响应能力的不断增长的需求。

以上就是 异步编程模型:利用 Future 和 Promise 提高性能与响应能力 的所有内容了,希望本篇博文对大家有所帮助!

参考:

  • Netty API reference;
  • 黑马程序员Netty全套教程 ;

0 人点赞