RxJava源码浅析(三): subscribeOn线程切换和多次切换

2022-03-20 12:59:16 浏览数 (2)

一、subscribeOn

这篇不仅看下subscribeOn线程切换本身,我们还要研究下多次subscribeOn为啥只有第一次有效。

代码语言:javascript复制
//上游-被观察者
Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
        Log.e("subscribe",Thread.currentThread().getId() "");
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();

    }
});
//下游-观察者
Observer myobserver=new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Disposable dd=d;

        Log.e("onSubscribe",Thread.currentThread().getId() "");
    }
    @Override
    public void onNext(Integer integer) {
        Log.e("onNext",Thread.currentThread().getId() "--" integer "");
    }
    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
    }
};

//关联上游和下游
myobservable.subscribeOn(Schedulers.newThread()).subscribeOn(AndroidSchedulers.mainThread()).subscribe(myobserver);

先看下Schedulers.newThread()这个到底是做了什么。

通过查看,我们得知Schedulers.newThread()最终创建了NewThreadScheduler类,看名称和newThread很对应。

(顺便说下,如果切换其他线程,比如subscribeOn(Schedulers.io()),那他最终创建的是IoScheduler类,是不是很好记。)

NewThreadScheduler这个类先放在这,记住就好。

接下来我们看下subscribeOn方法,这个方法最终是创建了ObservableSubscribeOn类,他继承了Observable。

代码语言:javascript复制
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;
        final AtomicReference<Disposable> upstream;
        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<>();
        }
        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        ..........
        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}

有了前面文章的基础,我们应该清楚基本流程了。我们调用subscribe(myobserver)方法,实际调用ObservableSubscribeOn的subscribeActual方法,里面创建了SubscribeOnObserver,包装了myobserver。我们主要看下scheduleDirect方法,看下源码,最终调用的是接口Schedule的scheduleDirect:

代码语言:javascript复制
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

上面我们知道,当前Schedule的具体实现类是NewThreadScheduler,createWorker方法具体实现在NewThreadScheduler中,我们看下createWorker

代码语言:javascript复制
public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}
代码语言:javascript复制
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

createWorker是创建了一个线程池。当前的Runnable是谁?慢慢调试,我们发现是SubscribeTask【我们还发现有一个DisposeTask,这个包装了SubscribeTask,添加了控制方法,控制了SubscribeTask生命周期。具体可以看下DisposeTask类,这里就不细看了】,其run方法执行了source.subscribe(parent);这句话,有了千年两篇文章的基础,我们知道他直接执行了

此时的myobserver是SubscribeOnObserver,接下来我们应该都知道发生了什么。但是在哪个线程执行的呢?我们知道scheduler.scheduleDirect(new SubscribeTask(parent))这句就是创建了一个线程池,里面只有一个线程,执行了SubscribeTask这个Runnable,这个线程中执行了source.subscribe(parent);

所以myobservable.subscribeOn(Schedulers.newThread()).subscribe(myobserver);这句是myobservable和myobserver都在新线程中运行

上面截图我们知道,onSubscribe不在新线程中执行。

我们简单总结下subscribeOn(Schedulers.newThread()),就是在创建新线程中执行订阅分发。

二、多次subscribeOn

我们来个调皮的操作,我们现在多次调用subscribeOn

代码语言:javascript复制
myobservable.subscribeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.newThread()).subscribe(myobserver);

那你说订阅在哪个线程?主线程还是新线程?

没事,我们来看下源码,主要来看ObservableSubscribeOn

我们知道,subscribeOn这个操作符就是将上一层的ObservableSource(就是上一层的Observable)放到一个新的线程去发射元素。上面执行了两次subscribeOn,第一次会把订阅放在新线程中,第二次会把订阅放在主线程中,最终订阅是在主线程中执行。

这里我们先得出一个结论,多次subscribeOn,以第一个subscribeOn为准。

我们现在知道RxJava是逆向向上调用的,那我们就一步一步的调代码看看。

第一次.subscribe(myobserver)的时候

第一个上游

上游是subscribeOn(Schedulers.newThread()),直接看ObservableSubscribeOn的subscribeActual方法

代码语言:javascript复制
@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    observer.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

此时的observer是myobserver,observer.onSubscribe(parent);直接调用myobserver的onSubscribe实现

然后我们知道subscribeOn(Schedulers.newThread()) 是在新线程中执行一个runnble,也就是SubscribeTask。这个里面会调用source.subscribe(parent);,也就是上游的subscribe,此时上游是subscribeOn(AndroidSchedulers.mainThread()),

第二个上游

现在开始走第二个上游,他也是ObservableSubscribeOn。 同样,我们也重点看subscribeActual方法。

此时的observer是下游,也就是subscribeOn(Schedulers.newThread())创建的SubscribeOnObserver(命名为AObserver),那observer.onSubscribe(parent);直接调用AObserver的onSubscribe实现,但是此时SubscribeOnObserver中的onSubscribe

有具体实现,那就执行它。

代码语言:javascript复制
@Override
public void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(this.upstream, d);
}

同样,我们知道subscribeOn(AndroidSchedulers.mainThread()) 是在主线程中执行一个runnble,也就是SubscribeTask。这个里面会调用source.subscribe(parent);,此时上游source是ObservableCreate,那就是ObservableCreate在AndroidSchedulers.mainThread()线程中执行任务,有了前篇讲解,我们以已经了解了Rxjava基础订阅流程,知道了ObservableCreate如何执行任务,只不过我们现在是在指定线程中执行。

最上流ObservableCreate

那此时的Observer是谁?是myobserver吗?不是!是subscribeOn(AndroidSchedulers.mainThread())中创建的SubscribeOnObserver(命名为BObserver)。

代码语言:javascript复制
public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

此时的observer是BObserver,observer.onSubscribe(parent);也就是调用

代码语言:javascript复制
@Override
public void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(this.upstream, d);
}

source.subscribe(parent);就是调用ObservableOnSubscribe的具体实现。执行ObservableOnSubscribe中的onNext方法,继而调用BObserver的onNext方法。BObserver就是SubscribeOnObserver中的SubscribeOnObserver中的myobserver结构。

三、总结

对于OnSubscribe方法而言,不管subscribeOn怎么切换线程,他都不受影响,他是最先开始执行的且只执行一次,只针对最下游有效,对于订阅而言,线程切换只是改变当前observer的所属线程,最后一个更改才算数(写法是第一个,执行流程是最后一个)。

这篇文章一些结论属于个人猜想,有说的不对的我及时纠正。

0 人点赞