Mono的使用

2024-08-01 22:59:42 浏览数 (1)

Mono的使用

一、介绍

最近在看gateway,发现里面是响应式编程,一看里面的代码

发现了Mono的使用,以前怎么没有注意,一下子看到还真的不认识

那么简单看看这是一个什么类,有什么用

Java中,Mono 类是Spring Reactor框架中的一个核心组件,它是Reactive Streams规范的一个实现,主要用于处理包含零个或一个元素的异步序列。Mono可以代表未来某个时刻可能出现的单一值,或者表示没有值(即空值)。这种类型的反应式类型非常适合那些你期望返回单个结果(比如查询数据库得到的单个实体)的情况。

简单的来说,类似与Optional的一个包装类,对一个对象进行包装,然后进行处理

那直接来看看,如何进行使用

二、使用

1)初解使用

代码语言:java复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void beginner() {
        // 生成一个Mono包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        Mono<String> empty = Mono.empty();
        // 订阅使用
        helloWorld.subscribe(System.out::println);
        empty.subscribe(System.out::println);
    }
​
}

2)异步源

代码语言:java复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
import java.util.concurrent.CompletableFuture;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void async() {
        // 还可以使用这种异步源创建Mono包装对象
        Mono<String> fromCallable = Mono.fromCallable(() -> "hello world");
        Mono<String> fromCompletionStage = Mono.fromCompletionStage(CompletableFuture.supplyAsync(() -> "hello world"));
        Mono<String> fromFuture = Mono.fromFuture(CompletableFuture.completedFuture("hello world"));
        Mono<String> create = Mono.create(monoSink -> monoSink.success("hello world"));
        // 订阅使用
        fromCallable.subscribe(System.out::println);
        fromCompletionStage.subscribe(System.out::println);
        fromFuture.subscribe(System.out::println);
        create.subscribe(System.out::println);
    }
​
}

3)其他订阅处理

代码语言:java复制
package com.banmoon.mono;
​
import com.banmoon.business.exception.BanmoonException;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void otherSubscribe() {
        // 生成一个Mono包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        Mono<?> banmoonException = Mono.error(new BanmoonException("自定义异常"));
        Mono<String> subscribeMono = Mono.just("subscribe");
        // 订阅使用
        helloWorld.subscribe(System.out::println, System.err::println, () -> System.out.println("complete"));
        banmoonException.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("complete"));
        // 使用Subscriber入参,高度自定义订阅;通常情况下,我们使用上面的重载方法即可
        subscribeMono.subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                // 当订阅开始时,请求最大数量的数据
                subscription.request(Integer.MAX_VALUE);
            }
​
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
​
            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }
​
            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        });
    }
​
}

4)map映射转换

代码语言:java复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void mapConvert() {
        // 一个Mono包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        // map转换
        helloWorld = helloWorld.map(String::toUpperCase);
        // 订阅输出
        helloWorld.subscribe(System.out::println);
​
        // 一个Mono包装对象
        Mono<String> helloWorld1 = Mono.just("hello world");
        // flatMap转换
        helloWorld1 = helloWorld1.flatMap(s -> Mono.just(s.toUpperCase()));
        // 订阅输出
        helloWorld1.subscribe(System.out::println);
    }
​
}

5)filter过滤

代码语言:java复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void filter() {
        // 一个Mono包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        // filter
        Mono<String> banmoon = helloWorld.filter(s -> s.contains("banmoon"));
        // 订阅输出
        banmoon.subscribe(System.out::println);
    }
​
}

6)异常的处理

代码语言:java复制
package com.banmoon.mono;
​
import com.banmoon.business.exception.BanmoonException;
import org.junit.Test;
import reactor.core.publisher.Mono;
​
import java.security.InvalidParameterException;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void error() {
        // 模拟在处理过程中,可能会出现异常
        Mono<String> fromCallable = Mono.fromCallable(() -> {
            String str = "hello world";
            throw new BanmoonException("自定义异常");
        });
        // 可以在订阅前面,提前处理这个异常,异常处理,提供一个Mono包装对象
        Mono<String> fromCallable1 = fromCallable.onErrorResume(throwable -> Mono.just(throwable.getMessage()));
        fromCallable1.subscribe(System.out::println);
​
        // 异常处理,提供一个值
        Mono<String> fromCallable2 = fromCallable.onErrorReturn(throwable -> throwable instanceof BanmoonException, "banmoon自定义异常");
        fromCallable2.subscribe(System.out::println);
​
        // 异常处理,将error转换成另一个
        Mono<String> fromCallable3 = fromCallable.onErrorMap(throwable -> new InvalidParameterException("自定义异常"));
        fromCallable3.subscribe(System.err::println);
​
        // 异常处理,对异常进行处理,没有返回值,还是原本的fromCallable
        Mono<String> fromCallable4 = fromCallable.doOnError(throwable -> System.err.println(throwable.getMessage()));
        fromCallable4.subscribe();
    }
​
}

7)延迟

代码语言:java复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
import java.time.Duration;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void delay() {
        // 创建一个包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        // 延迟3秒
        Mono<String> delayElement = helloWorld.delayElement(Duration.ofSeconds(3));
        // 订阅输出
        delayElement.subscribe(System.out::println);
        // 由于 Mono 是非阻塞的,为了确保主线程等待 Mono 完成,
        // 我们需要在这里阻塞主线程,否则程序会立即退出
        // 注意:在实际应用中,你通常不需要这样做,因为 Mono 通常是在事件循环或异步上下文中使用的
        delayElement.block();
​
        // 另一种方式
        Mono<String> helloWorld1 = Mono.just("hello world").delaySubscription(Duration.ofSeconds(3));
        helloWorld1.subscribe(System.out::println);
        // 避免退出
        helloWorld1.block();
    }
​
}

8)多对象包装Flux

代码语言:java复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void toFluxOrMono() {
        // 创建一个包装对象
        Mono<String> helloWorld = Mono.just("Hello World");
        // 转换为Flux包装对象
        Flux<String> flux = helloWorld.flux();
        // 订阅输出
        flux.subscribe(System.out::println);
​
        // 创建一个Flux包装对象
        Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4, 5);
        // 转换为Mono对象
        Mono<Integer> integerMono = integerFlux.next();
        Mono<Integer> integerMono1 = integerFlux.last();
        // 订阅输出
        integerMono.subscribe(System.out::println);
        integerMono1.subscribe(System.out::println);
    }
​
}

9)链式调用

代码语言:java复制
package com.banmoon.mono;
​
import com.banmoon.business.exception.BanmoonException;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void chain() {
        // 链式调用
        Flux.just(1, 2, 3, 4, 5)
                .filter(i -> i > 3)
                .mapNotNull(i -> i * 2)
                .next()
                .subscribe(System.out::println);
        // 链式调用,正常情况
        Mono.just(1)
                .subscribe(System.out::println, throwable -> System.out.println(throwable.getMessage()), () -> System.out.println("完成"));
        // 链式调用,异常情况
        Mono.fromCallable(() -> {
            throw new BanmoonException("异常");
        }).subscribe(System.out::println, throwable -> System.out.println(throwable.getMessage()));
    }
​
}

三、最后

MonoFlux这都是响应式中必会的,不然你都看不懂写的啥,多看看就行

0 人点赞