Spring5 Webflux
前言
✓ 优质技术好文见专栏 个人公众号,分享一些技术上的文章,以及遇到的坑 当前系列:Spring5 Webflux 系列 源代码 git 仓库 ‘ Reactor代码地址 代码Git 仓库地址 webflux helloworld 代码地址
代码环境 jdk11 里面用到了 java9的 特性
Lambda
这个表达式 其实就是一个新的语法糖,这里Java8主要是对语法做了简化,让我们java的代码更加的简洁
Lambda可以总在哪里呢?
函数式接口 只实现了一个方法的接口,我们就叫函数式接口,这个时候可能会有java的警报
@FunctionalInterface
有这个注解,java就会知道哦 你这个是函数式接口,就不会有警报了
简单的Lambda实战
我们就拿多线程中的 Runnable
接口来做例子
@Test
public void test() {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("你好我是传统线程");
}
}).start();
new Thread(() -> {
System.out.println("你好我是Lambda的第一个线程");
}).start();
}
我们可以根据上边语句的变化来看出 语法的简洁
代码语言:javascript复制() = 代表的是我们的参数列表,Lambda表达式的参数和我们调用方法参数必须一致
-> 尖头标识符 代表我们要使用Lambda
{} 方法体,这里是我们使用表达式的具体操作,也可以用方法引用的方式,用其他包装好点类的方法来做处理
代码语言:javascript复制编写一个自己的函数式接口,并且练习
@FunctionalInterface
public interface MyinterFace {
void method();
}
class test {
void dosth(MyinterFace myinterFace) {
System.out.print("Function A ");
myinterFace.method();
}
public static void main(String[] args) {
//这里我们使用自己的函数式接口 输出语句
test test = new test();
test.dosth(() -> {
System.out.print(" do sth");
});
}
}
可以看到我们用自己的函数式接口作为参数 调用函数方法的 dosth,这个时候我们可以用Lambda表达式来实现我们这个接口里的步骤,这里我们以输出 do sth 为操作。
问题处理
这里时候我们有两个方法,一个使用了 myinterfaceA 一个使用率 myinterFace B 这个时候我们 Lambda表达式没办法去识别,需要我们显示的声明用谁的
代码语言:javascript复制@FunctionalInterface
interface MyinterFaceA {
void method();
}
@FunctionalInterface
interface MyinterFaceB {
void method();
}
class test {
void dosth(MyinterFaceA myinterFace) {
System.out.print("Function A ");
myinterFace.method();
}
void dosth(MyinterFaceB myinterFace) {
System.out.print("Function A ");
myinterFace.method();
}
public static void main(String[] args) {
//这里我们使用自己的函数式接口 输出语句
test test = new test();
test.dosth((MyinterFaceA) () -> {
System.out.print(" do sth");
});
}
}
常用的java函数
提供者接口 : Supplier 没有输入只有输出
消费者接口 : Consumer 没有出只有输入
函数接口 : Function 放入一个对象返回一个新对象
- UnaryOperator 对于 放入和输出类型一致时候的函数借口
- BiFunction接口: 输入两个对象,返回一个新对象
代码语言:javascript复制Coding
/**
* @projectName: Webflux_demo
* @package: Lambda
* @className: JdkFunctionDmo
* @author: 冷环渊 doomwatcher
* @description: TODO
* @date: 2021/12/13 19:04
* @version: 1.0
*/
public class JdkFunctionDmo {
public static void main(String[] args) {
// Supplier 没有输入 只有输出
Supplier<String> supplier = () -> "我是一个 Supplier 方法";
System.out.println(supplier.get());
//Consummer 只有输入 没有输出
Consumer<String> con = i -> System.out.println("我是一个 Conusmer Demo" i);
con.accept(" hello Consumer i am 冷环渊");
//function 放入一个对象生成一个新的对象
Function<Integer, Integer> func = i -> i * i;
Integer apply = func.apply(9);
System.out.println("Function demo out:" apply);
//对于 放入和输出类型一致的哦我们 Function接口里有一个实现 UnaryOperator
UnaryOperator<Integer> unaryOperator = i -> i * i - i;
System.out.println("Function demo out:" unaryOperator.apply(9));
//输入两个对象 返回一个新的对象 BiFunction
BiFunction<Integer, Integer, String> biFunction = (i, e) -> i * e "元";
System.out.println("我一共该交给你多少钱:" biFunction.apply(40, 80));
}
}
到这里我们 Lambda表达式的快速认识就结束了,接下来是Java8的另一个特性,流式编程
Stream
我们通过演示的代码来带入 Stream api 的变成 以及我们做一个小练习
代码语言:javascript复制coding
/**
* @projectName: Webflux_demo
* @package: Stream
* @className: StreamAPITest
* @author: 冷环渊 doomwatcher
* @description: TODO
* @date: 2021/12/13 20:09
* @version: 1.0
*/
public class StreamAPITest {
public static void main(String[] args) {
String[] strarr = {"bo_le", "", "webfulx", "redis", "spring", "mirc_Sercice"};
// 数组 arr 创建 Stream
//Arrays.stream(strarr).forEach(System.out::println);
//2.list
//Arrays.asList(strarr).stream().forEach(System.out::println);
//3.stream.of()
//Stream.of(strarr).forEach(System.out::println);
// 4.迭代器 打印 1-10 元素
//Stream.iterate(1, i -> i 1).limit(10).forEach(System.out::println);
// 5. generate 打印随机数 10以内的随机数
//Stream.generate(() -> new Random().nextInt(10)).limit(10).forEach(System.out::println);
/*
* 现实中的流 变成 完整案例
* 元素的中间操作,元素的终止操作
* 结果依次 输出 abceo
*
* 结果 一次输出 belo
* bo_le --> bole ->字符转换成一个新的流(b o l e)-> sorted->(belo);
*
* PS: 注意事项 在流编程中 终止操作只能有一个,中间操作可以有 0-n个
* */
String[] arr = {"react", "", "spring", "bo_le", "bo_le"};
Stream.of(arr)
.filter(i -> !i.isEmpty())
.distinct()
.sorted()
.limit(1)
.map(i -> i.replace("_", ""))
.flatMap(i -> Stream.of(i.split("")))
.sorted()
.forEach(System.out::println);
}
}
Reactor Project
官网地址 :官方地址
简介
Reacive 异步非阻塞响应式框架 特点: 低延迟,高吞吐
,以下简介均来自spring官方文档。
反应式系统具有一些特性,使其成为低延迟、高吞吐量工作负载的理想选择。Project Reactor 和 Spring 产品组合协同工作,使开发人员能够构建具有响应性、弹性、弹性和消息驱动的企业级反应式系统。
响应式系统和传统的同步阻塞调用模型
- 传统的模型 ,client 不管有多少信息都会一次性发给server,这个时候如果Server性能够,可以能会造成大量的客户端请求无法响应,之后就会拒绝请求和请求失败
- 而响应式的模型有一个东西叫做 背压,需要数据,可以通过背压去控制数量,这样就不会让大量的数据冲垮我们的服务器
什么是响应式?
响应式处理是一种范例,它使开发人员能够构建可以处理背压(流控制)的非阻塞、异步应用程序。
为什么需要响应式
反应式系统更好地利用现代处理器。此外,在反应式编程中包含背压可确保解耦组件之间具有更好的弹性。
有关响应式系统特质的论文
论文地址:https://www.reactivemanifesto.org/zh-CN
Reactor 核心库
Project Reactor 是一个完全无阻塞的基础,包括背压支持。它是 Spring 生态系统中响应式堆栈的基础,并在 Spring WebFlux、Spring Data 和 Spring Cloud Gateway 等项目中具有特色。
与springBoot整合
Spring 产品组合提供了两个并行堆栈。一种是基于带有 Spring MVC 和 Spring Data 构造的 Servlet API。另一个是利用 Spring WebFlux 和 Spring Data 的反应式存储库的完全反应式堆栈。在这两种情况下,Spring Security 都为您提供了对这两个堆栈的本机支持。
可以看到,响应式的技术栈,和我们熟悉的MVC那一套不一样,这里我们的技术基本是换了一套,还没有很好的第三方框架的兼容性
响应式技术组建的关系
我们之后的demo Coding也会跟着从里到外的API 来学习
- ReativeStream
我们来看一下,响应式的流程
订阅者来决定可以接受多少数据,生产者根据背压的规则来传递,这样就不会出现像传统架构一样的问题
下图:就是我们的响应流的运行模型
ReactiveStream(JDK9)编程
coding
ReactiveStream helloworld
- 我们需要 发布者,订阅者,两者绑定,发送消息,关闭流
/**
* @projectName: Webflux_demo
* @package: reactiveStream
* @className: ReactiveStreamDemo
* @author: 冷环渊 doomwatcher
* @description: TODO
* @date: 2021/12/14 0:24
* @version: 1.0
*/
public class ReactiveStreamDemo {
public static void main(String[] args) {
// 1.创建一个 发布者
SubmissionPublisher publisher = new SubmissionPublisher();
// 2.创建一个订阅者
Flow.Subscriber subscriber = new Flow.Subscriber() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("创建订阅关系 ");
subscription.request(1); //第一次需要发送一个 之后的都不需要了
}
@Override
public void onNext(Object item) {
System.out.println("接收数据:" item);
//接收数据 业务处理
subscription.request(10);
}
@Override
public void onError(Throwable throwable) {
System.out.println("发生错误了");
}
@Override
public void onComplete() {
System.out.println("数据发送完成了");
}
};
// 3 建立订阅者
publisher.subscribe(subscriber);
for (int i = 0; i < 100; i ) {
// 4 发送数据
publisher.submit("第" i "条hello reactive stream");
}
publisher.close();
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
代码语言:javascript复制这里我们需要编写一个 Processor 来当做中间处理数据的 我们的发布者先发给Processor之后由Processor发给订阅者,
/**
* @projectName: Webflux_demo
* @package: reactiveStream
* @className: ReactiveProcessor
* @author: 冷环渊 doomwatcher
* @description: TODO
* @date: 2021/12/14 0:25
* @version: 1.0
*/
public class ReactiveProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Processor建立订阅关系");
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Processor接收数据:" item);
//中间处理
//数据发给最终订阅者
this.submit(item.toUpperCase());
//背压的实现核心
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("出现错误了");
}
@Override
public void onComplete() {
System.out.println("数据传输成功");
}
}
代码语言:javascript复制编写有中间处理器 Processor的demo
/**
* @projectName: Webflux_demo
* @package: reactiveStream
* @className: ReactiveStreamDemo2
* @author: 冷环渊 doomwatcher
* @description: TODO
* @date: 2021/12/14 0:25
* @version: 1.0
*/
public class ReactiveStreamDemo2 {
public static void main(String[] args) {
// 1.创建一个 发布者
SubmissionPublisher publisher = new SubmissionPublisher();
// 2.创建一个 Processor
ReactiveProcessor processor = new ReactiveProcessor();
// 3 发布者将消息给processor来做处理之后转发到最终订阅者
publisher.subscribe(processor);
// 4.创建一个最终订阅者
Flow.Subscriber subscriber = new Flow.Subscriber() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("创建订阅关系 ");
subscription.request(1); //第一次需要发送一个 之后的都不需要了
}
@Override
public void onNext(Object item) {
System.out.println("接收数据:" item);
//接收数据 业务处理
subscription.request(10);
}
@Override
public void onError(Throwable throwable) {
System.out.println("发生错误了");
}
@Override
public void onComplete() {
System.out.println("数据发送完成了");
}
};
processor.subscribe(subscriber);
for (int i = 0; i < 100; i ) {
System.out.println("发布数据" i);
// 4 发送数据
publisher.submit("第" i "条hello reactive stream");
}
publisher.close();
try {
Thread.currentThread().join(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
到这里我们基于ReactiveStream的小练习demo就到这里了
Reactor Project(spring)
Flux And Mono 他们都是 Publisher
Flux 0-N 项的异步序列 代表0-多个
AFlux
是一个标准Publisher
,表示 0 到 N 个发出的项目的异步序列,可选地由完成信号或错误终止。如无流规范,这三种类型的信号转换为呼叫到下游用户的onNext
,onComplete
和onError
方法。
具有这种大范围的可能信号,Flux
是通用的反应型。请注意,所有事件,即使是终止事件,都是可选的:没有onNext
事件但 onComplete
事件代表一个空的有限序列,但是删除onComplete
并且您有一个无限的空序列(不是特别有用,除了围绕取消的测试)。同样,无限序列也不一定是空的。
Mono
, 异步 0-1 结果 要么有一个 要么没有
AMono
是一种特殊的Publisher
,它通过onNext
信号最多发出一个项目, 然后以一个onComplete
信号(成功Mono
,有或没有值)终止,或者只发出一个onError
信号(失败Mono
)。
可以使用 aMono
来表示只有完成概念的无值异步进程(类似于 a Runnable
)一个空的 Mono
.
Reactor Coding
Coding之前 我们先把Reactor 需要的Mavern依赖 导入到maven 环境里
代码语言:javascript复制 <dependency>
<groupId>io.projectreactorgroupId>
<artifactId>reactor-coreartifactId>
<version>3.4.6version>
dependency>
代码语言:javascript复制Mono
/**
* @author 冷环渊 Doomwatcher
* @context: 这里是 Mono 创建 0-1个元素序列的测试方法
* @date: 2021/12/14 15:01
* @param
* @return: void
*/
@Test
public void MonoTset() {
// 1. Mono 的创建方式
/*
*创建 空的 Mono 对象 输出 “”
* public final Disposable subscribe(Consumer consumer) {
* Objects.requireNonNull(consumer, "consumer");
* return this.subscribe(consumer, (Consumer)null, (Runnable)null);
* }
* 从源码看出 我们的 subsrcibe参数是 Consumer,也就是说只进 不出
* */
Mono.empty().subscribe(System.out::println);
/*
*创建一个 Mono 输出内容就是我们just()参数的内容
* public static Mono just(T data) {
* return onAssembly(new MonoJust(data));
* }
* */
Mono.just("我的今天就结束 webflux 的学习了 hello Mono").subscribe(System.out::println);
}
代码语言:javascript复制Flux
/**
* @author 冷环渊 Doomwatcher
* @context: 这里是 flux 创建多个 0-n个元素序列 测试方法
* @date: 2021/12/14 15:01
* @param
* @return: void
*/
@Test
public void FluxTset() {
// 创建一个Flux
Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
System.out.println();
//创建多个 集合的形式
Flux.fromIterable(Arrays.asList("a1", "b1", "c1", "d1")).subscribe(System.out::print);
System.out.println();
//创建多个 数组的形式
Flux.fromArray(new String[]{"a1", "b1", "c1", "d1", "e1"}).subscribe(System.out::print);
System.out.println();
//基于流创建
Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).subscribe(System.out::print);
//通过饭未创建
System.out.println();
Flux.range(1, 100).subscribe(System.out::println);
/*
* 小案例
* Flux.generate这里我们以两个参数为例子
* 2的乘法口诀
* 2*0 = 0
* 2*1=1
* 2*2 = 4
* */
Flux.generate(() -> 0, (i, sink) -> {
sink.next("2*" i "=" 2 * i);
if (i == 9) {
sink.complete();
}
return i 1;
}).subscribe(System.out::println);
}
响应式编程需求实战
需求 我们这个需求的案例
给一定随机英文字符串,要求以26个字母的顺序输出排列
- 不能用循循环
- 不要以暴力的方式
解题思路
这里我们写了两种 解题目的方法,一个是基于StreamAPI 一个是基于ReactorAPI
- 思路是这个样子的,创建出一个去掉空格获得的字符数组,之后去重排序即可
/**
* @author 冷环渊 Doomwatcher
* @context: 响应式变成小练习
* 给一定随机英文字符串,要求以26个字母的顺序输出排列
* 小冷没看视频 用Stream流api 编写的
* @date: 2021/12/14 16:44
* @param
* @return: void
*/
@Test
public void StreamDemoTest() {
String[] arr = new String[]{"hello", "guys", "i", "prizev", "abc"};
List<String> list = Arrays.asList(arr);
list.stream()
.filter(i -> !i.isEmpty())
.flatMap(i -> Stream.of(i.split("")))
.distinct()
.sorted()
.forEach(System.out::print);
}
/**
* @author 冷环渊 Doomwatcher
* @context: 这个是根据视频 用 reactor flux api 编写的
* @date: 2021/12/14 16:54
* @param
* @return: void
*/
@Test
public void VedioReactorTest() {
String str = "hello guys i am bole welcome to normal school jdk quick fox prizev ";
Flux.fromArray(str.split(" "))
.flatMap(i -> Flux.fromArray(i.split("")))
.distinct()
.sort()
.subscribe(System.out::print);
}
WebFlux 响应式框架
Spring WebFlux
Spring Framework 中包含的原始 Web 框架 Spring Web MVC 是专门为 Servlet API 和 Servlet 容器构建的。响应式堆栈 Web 框架 Spring WebFlux 是在 5.0 版本中添加的。它是完全非阻塞的,支持 Reactive Streams背压,并在 Netty、Undertow 和 Servlet 3.1 容器等服务器上运行。
这两个 Web 框架都反映了它们的源模块(spring-webmvc和 spring-webflux)的名称,并在 Spring 框架中并排共存。每个模块都是可选的。应用程序可以使用一个或另一个模块,或者在某些情况下,两者都使用——例如,带有响应式WebClient
.
为什么我们需要Webflux 1.我们需要少量的线程来支持更多的处理。Servlet 3.1 确实为非阻塞 I/O 提供了 API。然而,使用它会远离 Servlet API 的其余部分,其中契约是同步 (
Filter
,Servlet
) 或阻塞 (getParameter
,getPart
)。这就是将新的通用 API 用作任何非阻塞运行时的基础的动机。这很重要,因为服务器(例如 Netty)在异步、非阻塞空间中建立良好。 2 是函数式编程。就像 Java 5 中添加注释创造了机会(例如带注释的 REST 控制器或单元测试)一样,Java 8 中添加的 lambda 表达式为 Java 中的函数式 API 创造了机会。这对于允许异步逻辑的声明式组合的非阻塞应用程序和延续式 API(由CompletableFuture
和ReactiveX推广)是一个福音。在编程模型级别,Java 8 使 Spring WebFlux 能够提供功能性 Web 端点以及带注释的控制器。
Spring MVC和spring webflux 的技术场景使用图
Webflux的核心库就是我们的 Reactor API 与MVC区别所在
- 接收但是 Publisher 返回的是 Mono/Flux
- 同时支持注解和函数式编程两种模式
spring-web
模块包含以下对反应式 Web 应用程序的基础支持:
- 对于服务器请求处理,有两个级别的支持。
- HttpHandler:HTTP 请求处理的基本契约,具有非阻塞 I/O 和反应流背压,以及用于 Reactor Netty、Undertow、Tomcat、Jetty 和任何 Servlet 3.1 容器的适配器。
WebHandler
API:用于请求处理的稍高级别的通用 Web API,在其之上构建了具体的编程模型,例如带注释的控制器和功能端点。
- 对于客户端,有一个基本
ClientHttpConnector
合同来执行带有非阻塞 I/O 和响应式流背压的 HTTP 请求,以及用于Reactor Netty、响应式 Jetty HttpClient 和Apache HttpComponents 的适配器 。应用程序中使用的更高级别的WebClient建立在这个基本契约之上。 - 对于客户端和服务器,用于 HTTP 请求和响应内容的序列化和反序列化的编解码器。
理论就到这里,我们来上手实操吧!
WebFlux Coding
编写controller 注解 hello world
代码语言:javascript复制/**
* @projectName: webflux
* @package: com.hyc.webflux.Controller
* @className: ReactorController
* @author: 冷环渊 doomwatcher
* @description: TODO
* @date: 2021/12/14 19:27
* @version: 1.0
*/
@RestController
@RequestMapping("/annotated")
public class ReactorController {
@GetMapping("/greeting")
public Mono<String> greeting() {
return Mono.just(" hello webflux by annotated");
}
}
代码语言:javascript复制 . ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _
( ( )___ | '_ | '_| | '_ / _` |
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |___, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.2)
2021-12-14 19:35:53.017 INFO 15172 --- [ main] com.hyc.webflux.WebfluxApplication : Starting WebfluxApplication using Java 11.0.2 on DESKTOP-OG41IMR with PID 15172 (D:JavaEngineerSpirng5Webfluxwebfluxtargetclasses started by doomwstcher in D:JavaEngineerSpirng5Webfluxwebflux)
2021-12-14 19:35:53.022 INFO 15172 --- [ main] com.hyc.webflux.WebfluxApplication : No active profile set, falling back to default profiles: default
2021-12-14 19:35:54.094 INFO 15172 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080
2021-12-14 19:35:54.104 INFO 15172 --- [ main] com.hyc.webflux.WebfluxApplication : Started WebfluxApplication in 1.501 seconds (JVM running for 2.712)
这里我们查看
这就是我们注解版本的helloworld
代码语言:javascript复制函数式 hello world
//函数式
@Bean
public RouterFunction<ServerResponse> routers() {
return RouterFunctions.route().GET("/func/greeting", serverRequest -> ok().bodyValue("hello webflux by function")).build();
}
结语
这篇文章主要是帮助 想要了解 spring 最新技术特性的小伙伴进行一个 简单的入门,
想要了解更多,可以通过文档,视频等继续深入学习,工程师的路上 学无止境