https://blog.csdn.net/john1337/article/details/101028908这篇文章已经对整个project reactor工作流程做了概述,本文对mono的subscribe源码进行更细的描述:
本文将以实际代码来分析下project reactor常见的subscribe工作原理
代码语言:javascript复制 Mono.just("hello")
.filter(t->t.startsWith("h"))
.map(String::toUpperCase)
.subscribe(System.out::println);
上面是一个简单的反应式编程的代码,mono.just创建数据源,然后经过filter经过过滤处理,然后经过map进行处理,熟悉jdk stream的对map这个操作一定不会陌生,map及其以前的操作仅仅是创建了一个publisher,上面仅仅是声明阶段,并没有产生实际效果,只有经过了subscribe之后才开始工作,下面就用上面的代码来分析下整个工作流程。
代码语言:javascript复制 /***
**该方法会根据声明部分创建完整发布、订阅关系链
*本例子中涉及到下面几个订阅者类:LambdaMonoSubscriber、MapFuseableSubscriber以及
*FilterFuseableSubscriber
***/
public final void subscribe(Subscriber<? super T> actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
//本例中的filter、map节点都是实现了OptimizableOperator
if (publisher instanceof OptimizableOperator) {
OptimizableOperator operator = (OptimizableOperator) publisher;
//下面创建发布、订阅链,创建过程跟声明阶段相比,即map-->filter
while (true) {
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
// null means "I will subscribe myself", returning...
return;
}
//map上一节点为filter,filter不是数据源,filter节点的上一节点
//为MonoJust,为数据源,所以filter节点nextOptimizableSource返回null
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
//最后在这里更新发布者为最终的发布者,例如该例子中的MonoJust
publisher = operator.source();
break;
}
operator = newSource;
}
}
//对于本文来说,publisher为MonoJust对象,subscriber为FilterFuseableSubscriber
//MonoJust对象调用subscribe方法会调用其subscriber(即FilterFuseableSubscriber)的onSubscribe方法,然后进入onSubscribe阶段,onSubscribe调用顺序跟声明阶段相同,当onSubscribe传到subscribe方法的订阅者时将进入request阶段,request阶段执行顺序跟声明阶段相反,当request阶段执行到数据源端又会触发调用阶段的执行,常见的为subscriber.onNext方法
publisher.subscribe(subscriber);
}