前言
本篇博文是《从0到1学习 Netty》中入门系列的第四篇博文,主要内容是介绍 Netty 中 Future 与 Promise 的使用,通过使用异步的方式提高程序的性能和响应速度,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
为什么要使用异步?
使用异步编程模式可以提高程序的性能和响应速度。具体来说,使用异步可以将一部分耗时较长的操作(如网络请求或文件读写)放入后台线程中执行,同时不会阻塞主线程,使得主线程可以处理其他任务,从而提高整个应用的吞吐量。
下面举一个实际的例子来说明:
假设我们正在开发一个Web应用,其中有一个页面需要加载大量图片。如果我们使用同步方式加载这些图片,那么当用户访问该页面时,应用会一直等待所有图片加载完成后才能显示完整页面,这样就会导致页面响应时间较长,用户体验不佳。
代码语言:javascript复制t=0s:用户请求打开网页
t=0s:应用开始加载页面
t=0.1s:开始加载第一张图片
t=0.5s:第一张图片加载完成
t=0.6s:开始加载第二张图片
t=1.0s:第二张图片加载完成
t=1.1s:开始加载第三张图片
t=1.5s:第三张图片加载完成
t=1.6s:开始加载第四张图片
t=2.0s:第四张图片加载完成
t=2.1s:开始加载第五张图片
t=2.5s:第五张图片加载完成
t=2.6s:开始加载第六张图片
t=3.0s:第六张图片加载完成
t=3.1s:所有图片加载完成,页面完整显示
相反,如果我们使用异步方式加载这些图片,那么页面可以先显示出来,并在后台线程中处理图片加载的操作。这样用户可以先看到基本页面内容,而不必等到所有图片都加载完成后才能看到页面。这样可以提升用户体验并减少页面响应时间。
代码语言:javascript复制t=0s:用户请求打开网页
t=0s:应用开始加载页面
t=0.1s:开始加载第一张图片
t=0.2s:开始加载第二张图片
t=0.3s:开始加载第三张图片
t=0.5s:第一张图片加载完成,显示在页面上
t=0.6s:开始加载第四张图片
t=0.7s:第二张图片加载完成,显示在页面上
t=1.0s:第三张图片加载完成,显示在页面上
t=1.1s:开始加载第五张图片
t=1.3s:第四张图片加载完成,显示在页面上
t=1.5s:第五张图片加载完成,显示在页面上
t=1.6s:开始加载第六张图片
t=2.0s:第六张图片加载完成,显示在页面上
t=2.1s:所有图片加载完成,页面完整显示
因此,使用异步编程模式可以提高程序的性能和响应速度,而且还可以提升用户体验。但是需要注意,异步编程也会增加代码的复杂度,需要仔细考虑线程安全和数据同步等问题。
概述
在异步处理时,经常会用到 Future
与 Promise
两个接口,其中包含了 JDK Future,Netty Future 和 Netty Promise。
需要注意的是:
JDK Future
只能同步等待任务结束(或成功、或失败)才能得到结果;Netty Future
可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;Netty Promise
不仅有 Netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器;
JDK Future
jdk Future
是 Java 标准库中提供的异步编程接口,它提供了一个表示异步操作结果的抽象类。在异步任务完成后,可以通过 Future.get()
方法获得任务结果,或者通过 Future.cancel()
方法取消任务。
源码如下所示:
代码语言:javascript复制package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
每个方法解释如下:
-
boolean cancel(boolean mayInterruptIfRunning)
: 尝试取消该 Future 运行。如果任务已经完成、或者已经被取消,则返回 false。如果任务还没有开始运行,则返回 true 并尝试取消任务。如果任务已经开始运行,则根据 mayInterruptIfRunning 参数的值来决定是否中断任务。如果任务被成功取消,则返回 true,否则返回 false。 -
boolean isCancelled()
: 判断该 Future 是否已经被取消。 -
boolean isDone()
: 判断该 Future 是否已经完成,无论是成功还是失败。 -
V get() throws InterruptedException, ExecutionException
: 获取该 Future 的结果,阻塞等待直到任务完成。如果任务被成功执行,则返回执行结果;如果任务抛出异常,则在该方法中重新抛出该异常。如果当前线程被中断,则抛出 InterruptedException 异常。
1、先创建一个线程池:
代码语言:javascript复制ExecutorService service = Executors.newFixedThreadPool(2);
2、提交任务:
代码语言:javascript复制Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算...");
Thread.sleep(1000);
return 21;
}
});
3、获取结果:
代码语言:javascript复制log.debug("结果是 {}", future.get());
4、完整代码:
代码语言:javascript复制@Slf4j
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(2);
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算...");
Thread.sleep(1000);
return 21;
}
});
log.debug("等待结果...");
log.debug("结果是 {}", future.get());
}
}
5、最终结果:
代码语言:javascript复制22:18:47 [DEBUG] [main] c.s.n.c.TestJdkFuture - 等待结果...
22:18:47 [DEBUG] [pool-1-thread-1] c.s.n.c.TestJdkFuture - 执行计算...
22:18:48 [DEBUG] [main] c.s.n.c.TestJdkFuture - 结果是 21
Netty Future
Netty Future
是 Netty 框架中提供的异步编程接口。与 jdk Future
类似,但是 Netty Future
提供了更多的功能,比如添加监听器、等待结果、检查是否完成等。
在 Netty 的异步模型中,当我们向远程服务发送请求时,通常不会立即得到响应。相反,Netty 会立即返回一个 Netty Future 对象,表示该操作的未来结果。然后我们可以在后续的代码中等待这个 Future 对象的完成,并获取异步操作的实际结果。
源码如下所示:
代码语言:javascript复制package io.netty.util.concurrent;
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess();
boolean isCancellable();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> sync() throws InterruptedException;
Future<V> syncUninterruptibly();
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
V getNow();
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
每个方法解释如下:
-
boolean isSuccess()
: 判断 Future 是否已经成功完成。 -
boolean isCancellable()
: 判断 Future 是否可以被取消。 -
Throwable cause()
: 返回导致 Future 失败的原因,如果 Future 没有失败,则返回null
。 -
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener)
: 添加一个GenericFutureListener
用于在 Future 完成时通知该 Future 的状态。 -
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners)
: 添加多个GenericFutureListener
。 -
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener)
: 移除一个GenericFutureListener
。 -
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners)
: 移除多个GenericFutureListener
。 -
Future<V> sync() throws InterruptedException
: 等待 Future 完成并返回结果。如果当前线程被中断,则抛出InterruptedException
异常。 -
Future<V> syncUninterruptibly()
: 等待 Future 完成并返回结果。与sync()
方法不同的是,该方法不会抛出InterruptedException
。 -
Future<V> await() throws InterruptedException
: 等待 Future 完成,也就是等待其状态变成已完成。如果当前线程被中断,则抛出InterruptedException
异常。 -
Future<V> awaitUninterruptibly()
: 等待 Future 完成,也就是等待其状态变成已完成。与await()
方法不同的是,该方法不会抛出InterruptedException
。 -
boolean await(long timeout, TimeUnit unit) throws InterruptedException
: 等待 Future 完成,但最多等待指定时间。如果在指定时间内 Future 仍未完成,则返回false
。如果当前线程被中断,则抛出InterruptedException
异常。 -
boolean awaitUninterruptibly(long timeout, TimeUnit unit)
: 等待 Future 完成,但最多等待指定时间。如果在指定时间内 Future 仍未完成,则返回false
。与await(long timeout, TimeUnit unit)
方法不同的是,该方法不会抛出InterruptedException
异常。 -
V getNow()
: 如果 Future 已经完成,则返回该 Future 的结果,否则返回 null。 -
@Override boolean cancel(boolean mayInterruptIfRunning)
: 尝试取消该 Future 运行。如果任务已经完成、或者已经被取消,则返回false
。如果任务还没有开始运行,则返回true
并尝试取消任务。如果任务已经开始运行,则根据mayInterruptIfRunning
参数的值来决定是否中断任务。如果任务被成功取消,则返回true
,否则返回false
。
1、获取一个 EventLoop
:
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
2、提交任务:
代码语言:javascript复制Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算...");
Thread.sleep(1000);
return 21;
}
});
3、同步方式获取结果的完整代码:
代码语言:javascript复制@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算...");
Thread.sleep(1000);
return 21;
}
});
log.debug("等待结果...");
log.debug("结果是 {}", future.get());
}
4、异步方式获取结果的完整代码:
代码语言:javascript复制@Slf4j
public class TestNettyFuture {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算...");
Thread.sleep(1000);
return 21;
}
});
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("结果是 {}", future.getNow());
}
});
}
}
5、运行结果:
代码语言:javascript复制# 同步方式
23:11:28 [DEBUG] [main] c.s.n.c.TestNettyFuture - 等待结果...
23:11:28 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestNettyFuture - 执行计算...
23:11:29 [DEBUG] [main] c.s.n.c.TestNettyFuture - 结果是 21
# 异步方式
23:12:23 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestNettyFuture - 执行计算...
23:12:24 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestNettyFuture - 结果是 21
Netty Promise
Netty Promise
是一种实现了 Netty Future
接口的具体类,它表示一个异步操作的未来结果,与 Java 中的 Future 类似。通过 Promise 对象,可以在异步操作完成后获取其结果或者添加监听器,以便在异步操作完成时被通知。Promise 还提供了一些方法,可以判断异步操作是否完成、等待异步操作完成并返回结果、取消异步操作等。
在 Netty 中,当向远程服务器发送请求时,可以创建一个 Promise 对象,并将该对象作为参数传递给对应的 Channel。当远程服务器响应请求时,Promise 对象会被更新状态,并触发注册的监听器,从而实现异步回调。
源码如下所示:
代码语言:javascript复制package io.netty.util.concurrent;
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable();
@Override
...
}
每个方法解释如下:
-
setSuccess(V result)
:设置异步操作成功的结果,并标记该 Promise 已经完成。 -
trySuccess(V result)
:尝试将 Promise 标记为成功状态,并设置结果值。如果 Promise 已经完成或者已经被取消,则返回false
,否则返回true
。 -
setFailure(Throwable cause)
:设置异步操作失败的原因,并标记该 Promise 已经完成。 -
tryFailure(Throwable cause)
:尝试将 Promise 标记为失败状态,并设置原因。如果 Promise 已经完成或者已经被取消,则返回false
,否则返回true
。 -
setUncancellable()
:将 Promise 标记为不可取消。一旦 Promise 被标记为不可取消,就无法再次标记为可取消。
1、获取一个 EventLoop
:
EventLoop eventLoop = new NioEventLoopGroup().next();
2、创建一个 Promise
:
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
3、创建线程执行计算,并向 Promise 填充成功结果:
代码语言:javascript复制new Thread(() -> {
log.debug("执行计算...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
promise.setSuccess(21);
}).start();
4、向 Promise 填充失败结果:
代码语言:javascript复制new Thread(() -> {
log.debug("执行计算...");
try {
int i = 1 / 0;
Thread.sleep(1000);
promise.setSuccess(21);
} catch (Exception e) {
e.printStackTrace();
promise.setFailure(e);
}
}).start();
5、获取结果:
代码语言:javascript复制log.debug("结果是 {}", promise.get());
代码语言:javascript复制# 成功结果
23:49:39 [DEBUG] [main] c.s.n.c.TestNettyPromise - 等待结果...
23:49:39 [DEBUG] [Thread-0] c.s.n.c.TestNettyPromise - 执行计算...
23:49:40 [DEBUG] [main] c.s.n.c.TestNettyPromise - 结果是 21
# 失败结果
23:48:45 [DEBUG] [main] c.s.n.c.TestNettyPromise - 等待结果...
23:48:45 [DEBUG] [Thread-0] c.s.n.c.TestNettyPromise - 执行计算...
java.lang.ArithmeticException: / by zero
at com.sidiot.netty.c3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:19)
at java.base/java.lang.Thread.run(Thread.java:832)
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)
at com.sidiot.netty.c3.TestNettyPromise.main(TestNettyPromise.java:30)
Caused by: java.lang.ArithmeticException: / by zero
at com.sidiot.netty.c3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:19)
at java.base/java.lang.Thread.run(Thread.java:832)
后记
通过使用异步编程模型,我们可以避免阻塞主线程并提高系统的性能和响应能力。
本文概述了异步编程的重要性以及几个与异步相关的关键工具:JDK Future、Netty Future和Netty Promise,这些工具为我们提供了强大的异步编程功能。通过深入理解和灵活运用这些概念,我们能够构建出更加高效、可靠的系统,并满足用户对性能和响应能力的不断增长的需求。
以上就是 异步编程模型:利用 Future 和 Promise 提高性能与响应能力 的所有内容了,希望本篇博文对大家有所帮助!
参考:
- Netty API reference;
- 黑马程序员Netty全套教程 ;