一、subscribeOn
这篇不仅看下subscribeOn线程切换本身,我们还要研究下多次subscribeOn为啥只有第一次有效。
代码语言:javascript复制//上游-被观察者
Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
Log.e("subscribe",Thread.currentThread().getId() "");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//下游-观察者
Observer myobserver=new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Disposable dd=d;
Log.e("onSubscribe",Thread.currentThread().getId() "");
}
@Override
public void onNext(Integer integer) {
Log.e("onNext",Thread.currentThread().getId() "--" integer "");
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
//关联上游和下游
myobservable.subscribeOn(Schedulers.newThread()).subscribeOn(AndroidSchedulers.mainThread()).subscribe(myobserver);
先看下Schedulers.newThread()这个到底是做了什么。
通过查看,我们得知Schedulers.newThread()最终创建了NewThreadScheduler类,看名称和newThread很对应。
(顺便说下,如果切换其他线程,比如subscribeOn(Schedulers.io()),那他最终创建的是IoScheduler类,是不是很好记。)
NewThreadScheduler这个类先放在这,记住就好。
接下来我们看下subscribeOn方法,这个方法最终是创建了ObservableSubscribeOn类,他继承了Observable。
代码语言:javascript复制public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
..........
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
有了前面文章的基础,我们应该清楚基本流程了。我们调用subscribe(myobserver)方法,实际调用ObservableSubscribeOn的subscribeActual方法,里面创建了SubscribeOnObserver,包装了myobserver。我们主要看下scheduleDirect方法,看下源码,最终调用的是接口Schedule的scheduleDirect:
代码语言:javascript复制public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
上面我们知道,当前Schedule的具体实现类是NewThreadScheduler,createWorker方法具体实现在NewThreadScheduler中,我们看下createWorker
代码语言:javascript复制public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
代码语言:javascript复制public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
createWorker是创建了一个线程池。当前的Runnable是谁?慢慢调试,我们发现是SubscribeTask【我们还发现有一个DisposeTask,这个包装了SubscribeTask,添加了控制方法,控制了SubscribeTask生命周期。具体可以看下DisposeTask类,这里就不细看了】,其run方法执行了source.subscribe(parent);这句话,有了千年两篇文章的基础,我们知道他直接执行了
此时的myobserver是SubscribeOnObserver,接下来我们应该都知道发生了什么。但是在哪个线程执行的呢?我们知道scheduler.scheduleDirect(new SubscribeTask(parent))这句就是创建了一个线程池,里面只有一个线程,执行了SubscribeTask这个Runnable,这个线程中执行了source.subscribe(parent);
所以myobservable.subscribeOn(Schedulers.newThread()).subscribe(myobserver);这句是myobservable和myobserver都在新线程中运行
上面截图我们知道,onSubscribe不在新线程中执行。
我们简单总结下subscribeOn(Schedulers.newThread()),就是在创建新线程中执行订阅分发。
二、多次subscribeOn
我们来个调皮的操作,我们现在多次调用subscribeOn
代码语言:javascript复制myobservable.subscribeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.newThread()).subscribe(myobserver);
那你说订阅在哪个线程?主线程还是新线程?
没事,我们来看下源码,主要来看ObservableSubscribeOn
我们知道,subscribeOn这个操作符就是将上一层的ObservableSource(就是上一层的Observable)放到一个新的线程去发射元素。上面执行了两次subscribeOn,第一次会把订阅放在新线程中,第二次会把订阅放在主线程中,最终订阅是在主线程中执行。
这里我们先得出一个结论,多次subscribeOn,以第一个subscribeOn为准。
我们现在知道RxJava是逆向向上调用的,那我们就一步一步的调代码看看。
第一次.subscribe(myobserver)的时候,
第一个上游
上游是subscribeOn(Schedulers.newThread()),直接看ObservableSubscribeOn的subscribeActual方法
代码语言:javascript复制@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
此时的observer是myobserver,observer.onSubscribe(parent);直接调用myobserver的onSubscribe实现
然后我们知道subscribeOn(Schedulers.newThread()) 是在新线程中执行一个runnble,也就是SubscribeTask。这个里面会调用source.subscribe(parent);,也就是上游的subscribe,此时上游是subscribeOn(AndroidSchedulers.mainThread()),
第二个上游
现在开始走第二个上游,他也是ObservableSubscribeOn。 同样,我们也重点看subscribeActual方法。
此时的observer是下游,也就是subscribeOn(Schedulers.newThread())创建的SubscribeOnObserver(命名为AObserver),那observer.onSubscribe(parent);直接调用AObserver的onSubscribe实现,但是此时SubscribeOnObserver中的onSubscribe
有具体实现,那就执行它。
代码语言:javascript复制@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
同样,我们知道subscribeOn(AndroidSchedulers.mainThread()) 是在主线程中执行一个runnble,也就是SubscribeTask。这个里面会调用source.subscribe(parent);,此时上游source是ObservableCreate,那就是ObservableCreate在AndroidSchedulers.mainThread()线程中执行任务,有了前篇讲解,我们以已经了解了Rxjava基础订阅流程,知道了ObservableCreate如何执行任务,只不过我们现在是在指定线程中执行。
最上流ObservableCreate
那此时的Observer是谁?是myobserver吗?不是!是subscribeOn(AndroidSchedulers.mainThread())中创建的SubscribeOnObserver(命名为BObserver)。
代码语言:javascript复制public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
此时的observer是BObserver,observer.onSubscribe(parent);也就是调用
代码语言:javascript复制@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
source.subscribe(parent);就是调用ObservableOnSubscribe的具体实现。执行ObservableOnSubscribe中的onNext方法,继而调用BObserver的onNext方法。BObserver就是SubscribeOnObserver中的SubscribeOnObserver中的myobserver结构。
三、总结
对于OnSubscribe方法而言,不管subscribeOn怎么切换线程,他都不受影响,他是最先开始执行的且只执行一次,只针对最下游有效,对于订阅而言,线程切换只是改变当前observer的所属线程,最后一个更改才算数(写法是第一个,执行流程是最后一个)。
这篇文章一些结论属于个人猜想,有说的不对的我及时纠正。