mono subscribe源码分析

2022-03-28 20:17:38 浏览数 (4)

https://blog.csdn.net/john1337/article/details/101028908这篇文章已经对整个project reactor工作流程做了概述,本文对mono的subscribe源码进行更细的描述:

本文将以实际代码来分析下project reactor常见的subscribe工作原理

代码语言:javascript复制
        Mono.just("hello")
         .filter(t->t.startsWith("h"))
         .map(String::toUpperCase)
         .subscribe(System.out::println);

上面是一个简单的反应式编程的代码,mono.just创建数据源,然后经过filter经过过滤处理,然后经过map进行处理,熟悉jdk stream的对map这个操作一定不会陌生,map及其以前的操作仅仅是创建了一个publisher,上面仅仅是声明阶段,并没有产生实际效果,只有经过了subscribe之后才开始工作,下面就用上面的代码来分析下整个工作流程。

代码语言:javascript复制
    /***
    **该方法会根据声明部分创建完整发布、订阅关系链
    *本例子中涉及到下面几个订阅者类:LambdaMonoSubscriber、MapFuseableSubscriber以及 
    *FilterFuseableSubscriber
    ***/
	public final void subscribe(Subscriber<? super T> actual) {
		CorePublisher publisher = Operators.onLastAssembly(this);
		CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

        //本例中的filter、map节点都是实现了OptimizableOperator
		if (publisher instanceof OptimizableOperator) {
			OptimizableOperator operator = (OptimizableOperator) publisher;
            //下面创建发布、订阅链,创建过程跟声明阶段相比,即map-->filter
			while (true) {
				subscriber = operator.subscribeOrReturn(subscriber);
				if (subscriber == null) {
					// null means "I will subscribe myself", returning...
					return;
				}
                
                //map上一节点为filter,filter不是数据源,filter节点的上一节点
                //为MonoJust,为数据源,所以filter节点nextOptimizableSource返回null
				OptimizableOperator newSource = operator.nextOptimizableSource();
				if (newSource == null) {
                    //最后在这里更新发布者为最终的发布者,例如该例子中的MonoJust
					publisher = operator.source();
					break;
				}
				operator = newSource;
			}
		}
        //对于本文来说,publisher为MonoJust对象,subscriber为FilterFuseableSubscriber
        //MonoJust对象调用subscribe方法会调用其subscriber(即FilterFuseableSubscriber)的onSubscribe方法,然后进入onSubscribe阶段,onSubscribe调用顺序跟声明阶段相同,当onSubscribe传到subscribe方法的订阅者时将进入request阶段,request阶段执行顺序跟声明阶段相反,当request阶段执行到数据源端又会触发调用阶段的执行,常见的为subscriber.onNext方法
		publisher.subscribe(subscriber);
	}

1 人点赞