RxJava源码浅析(四): observeOn线程切换和多次切换

2022-03-20 18:49:41 浏览数 (2)

上篇文章RxJava源码浅析(三): subscribeOn线程切换和多次切换 我们清楚了subscribeOn线程切换,对于Rxjava线程切换原理有了大致的理解。subscribeOn线程切换,是整个订阅流程线程切换,而observeOn只是针对下游线程切换。

这篇我们来看下observeOn切换线程以及他多次切换的影响。

一、observeOn

先来个demo

代码语言:javascript复制
 //上游-被观察者
    Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
            Log.e("subscribe",Thread.currentThread().getName() "");
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();

        }
    });
    //下游-观察者
    Observer myobserver=new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Disposable dd=d;

            Log.e("onSubscribe",Thread.currentThread().getName() "");
        }
        @Override
        public void onNext(Integer integer) {
            Log.e("onNext",Thread.currentThread().getName() "--" integer "");
        }
        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    };
    //关联上游和下游
    myobservable.observeOn(Schedulers.newThread()).subscribe(myobserver);

有了前面文章的基础,Schedulers.newThread()是创建了一个线程池,我们直接看observeOn。经过不断跳转我们知道这个方法最终是创建了ObservableObserveOn这个类,也是个Observable。我们直接来看subscribeActual吧。

代码语言:javascript复制
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
    }
}

scheduler.createWorker()就是创建了一个线程,当前是Schedulers.newThread(),接着调用source.subscribe(newObserveOnObserver<>(observer, w, delayError, bufferSize));这句话我们读出信息:订阅流程的上游没有切换线程,下游ObserveOnObserver切换了线程。

继续看下ObserveOnObserver源码

我们主要看下onNext方法,他调用了schedule,继续调用了worker.schedule(this);启动线程任务。此时的this就是当前ObserveOnObserver,他是个Runnable,那我们就直接看他的run方法。

代码语言:javascript复制
@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

继续看drainNormal();

代码语言:javascript复制
void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

这里主要两个方法a.onNext(v)这个a,就是下游myobserver。所以我们知道下游的onNext在新线程中执行。

我们发现ObserveOnObserver中的onComplete、onError也是调用schedule();他主要是看checkTerminated,这个方法根据error、done来处理调用onComplete还是onError,还是其他。

observeOn就讲这么多了,有了前几篇文章,我们就很快能理解observeOn是怎么做的了。

总结下,observeOn就是把下游切换线程,相比subscribeOn好理解些。

二、多次observeOn

如果我们多次调用observeOn呢?是以哪个为准呢?假如两次调用observeOn,第一次是线程1,第二次是线程2。

上游

一开始,调用subscribe(myobserver)会调用上游ObservableObserveOn中的subscribeActual方法,进行订阅,创建新线程2,创建ObserveOnObserver(命名为AObserver),AObserver onNext、onComplete等运行在线程2中。

上上游

调用source.subscribe(newObserveOnObserver<>(observer, w, delayError, bufferSize));则调用上上游。同样,会调用ObservableObserveOn中的subscribeActual方法,进行订阅,创建新线程1,创建ObserveOnObserver(命名为BObserver,此时BObserver中的downstream是下游的ObserveOnObserver,也就是AObserver),AObserver onNext、onComplete等运行在线程1中。

最上游

在上上游中调用source.subscribe(newObserveOnObserver<>(observer, w, delayError, bufferSize));会调用ObservableCreate

中的subscribeActual方法,此时的observer是谁?对是BObserver,ObservableCreate的源头下发消息执行onNext的时候会调用BObserver的onNext方法。源码我们看过,BObserver中的onNext会开启新线程执行他的onNext方法,同时我们也发现,这个时候onNext方法会调用下游也就是downstream的onNext,也就是AObserver的onNext,同样AObserver的onNext也会调用myobserver的onNext,最终是执行了最后一次observeOn对应线程中的myobserver的onNext方法。

所有,不管多少次observeOn,都是调用最后一次observeOn。

我们发现RxJava是逆向向上调用的,然后不断向下一级一级的下发消息,最后一个observer来处理消息。

0 人点赞