RxJava 中observeOn()与subscribeOn()

2020-11-23 11:34:39 浏览数 (2)

放松吧

还是那个味,冬至日来首音乐, 听说吃饺子,听音乐和技术更配哦!

RxJava

observeOn和subscribeOn都是对observable的一种操作,区别就是subscribeOn改变了observable本身产生事件的schedule以及发出事件后相关处理事件的程序所在的schedule,而obseveron仅仅是改变了对发出事件后相关处理事件的程序所在的schedule。

或许你会问,这有多大的区别吗?的确是有的,比如说产生observable事件是一件费时可能会卡主线程的操作(比如说获取网络数据),那么subscribeOn就是你的选择,这样可以避免卡住主线程。

两者最主要的差别是影响的范围不同,observeOn is more limited,但是却是可以多次调用,多次改变不同的接受者所在的schedule,在调用这个函数之后的observable造成影响。而subscribeOn则是一次性的,无论在什么地方调用,总是从改变最原始的observable开始影响整个observable的处理。

subscribeOn()和observeOn()的区别

  • subscribeOn()主要改变的是订阅的线程,即call()执行的线程;
  • observeOn()主要改变的是发送的线程,即onNext()执行的线程。

subscribeOn

我们先看一个例子。

代码语言:javascript复制

运行如下:

代码语言:javascript复制

我们看一下subscribeOn()中,都干了什么

代码语言:javascript复制

很明显,会走if之外的方法。

在这里我们可以看到,我们又创建了一个Observable对象,但创建时传入的参数为OperatorSubscribeOn(this,scheduler),我们看一下此对象以及其对应的构造方法后可以看到,OperatorSubscribeOn实现Onsubscribe,并且由其构造方法可知,他保存了上一个Observable对象,并保存了Scheduler对象。

这里做个总结。

把Observable.create()创建的称之为Observable_1,OnSubscribe_1。把subscribeOn()创建的称之为Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。 那么,前两步就是创建了两个的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的应用,即source。

调用Observable_2.subscribe()方法会调用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。

下面分析下call()方法。

  • inner.schedule()改变了线程,此时Action的call()在我们指定的线程中运行。
  • Subscriber被包装了一层。
  • source.unsafeSubscribe(s);,注意source是Observable_1对象。

unsafeSubscribe方法代码:

代码语言:javascript复制

代码很多,关键代码:

hook.onSubscribeStart(this, onSubscribe).call(subscriber);

该方法即调用了OnSubscribe_1.call()方法。注意,此时的call()方法在我们指定的线程中运行。那么就起到了改变线程的作用。

对于以上线程,我们可以总结,其有如下流程:

  • Observable.create() : 创建了Observable_1和OnSubscribe_1;
  • subscribeOn(): 创建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同时OperatorSubscribeOn保存了Observable_1的引用。
  • observable_2.subscribe(Observer):
    • 调用OperatorSubscribeOn的call()。call()改变了线程的运行,并且调用了Observable_1.unsafeSubscribe(s);
    • Observable_1.unsafeSubscribe(s);,该方法的实现中调用了OnSubscribe_1的call()。 从这个可以了解,无论我们的subscribeOn()放在哪里,他改变的是subscribe()的过程,而不是onNext()的过程。

那么如果有多个subscribeOn(),那么线程会怎样执行呢。如果按照我们的逻辑,有以下程序

代码语言:javascript复制

那么,我们根据之前的源码分析其执行逻辑。

  • Observable.just(“ss”),创建Observable_1,OnSubscribe_1
  • Observable_1.subscribeOn(Schedulers.io()):创建observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。
  • observable_2.subscribeOn(Schedulers.newThread()):创建Observable_3,OperatorSubscribeOn_3并保存Observable_2的引用。
  • Observable_3.subscribe():
    • 调用OperatorSubscribeOn_3.call(),改变线程为Schedulers.newThread()。
    • 调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.io()。
    • 调用OnSubscribe_1.call(),此时call()运行在Schedulers.io()。 根据以上逻辑分析,会按照1的线程进行执行。

subscribeOn如何工作,关键代码其实就是一行代码:

代码语言:javascript复制

注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:

subscribeOn的调用,改变了调用前序列所运行的线程。

observeOn

看一下observeOn()源码:

代码语言:javascript复制

这里引出了新的操作符lift

代码语言:javascript复制

这里不再介绍了,详见:http://blog.csdn.net/jdsjlzx/article/details/51686152

在lift()中,有如下关键代码:

代码语言:javascript复制

OperatorObserveOn.call()核心代码:

代码语言:javascript复制

我们看到其返回了ObserveOnSubscriber< T>,注意:此时只调用了call()方法,但call()方法中并没有改变线程的操作,此时为subscribe()过程。

我们直奔重点,因为,我们了解到其改变的是onNext()过程,那么我们肯定要看一下ObserveOnSubscriber.onNext()找找在哪改变线程

代码语言:javascript复制

这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker

我们这里需要注意下:

在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。

onNext方法最后调用了schedule(),从方法名可以看到,其肯定是改变线程用的,并且该方法经过一番循环之后,调用了该类的call()方法。

代码语言:javascript复制

recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()。

scheduler中调用的call()方法

代码语言:javascript复制

call()中有localChild.onNext(localOn.getValue(v));调用。

在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child, 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。

总结起来的结论就是:

  1. observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
  2. observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber

observeOn改变的是onNext()调用

subcribeOn和observeOn 对比分析

代码语言:javascript复制

有如上逻辑,则我们对其运行进行分析。

首先,我们需要先明白其内部执行的逻辑。

在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。

其次,从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()实在onNext()中作用。

那么对于以上的逻辑,我们可以得出如下结论:

  • 操作1,2,3,4在io线程中,因为在如果没有observeOn()影响,他们的回调操作默认在订阅的线程中。而我们的订阅线程在subscribeOn(io)发生了改变。注意他们执行的先后顺序。
  • 操作5,6在main线程中运行。因为observeOn()改变了onNext().
  • 特别注意那一个逻辑没起到作用

再简单点总结就是

  1. subscribeOn的调用切换之前的线程。
  2. observeOn的调用切换之后的线程。
  3. observeOn之后,不可再调用subscribeOn 切换线程

复杂情况

我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOn和subscribeOn达到自由切换的目的呢?

组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?

因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。

@扔物线 大神给的总结:

  1. 下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件;
  2. 只有第一subscribeOn() 起作用(所以多个 subscribeOn() 无意义);
  3. 这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn();
  4. observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作;
  5. 不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程。

参考文章: https://segmentfault.com/a/1190000004856071 http://blog.csdn.net/lisdye2/article/details/51113837

这个(学习总结)博客花了3个小时多,看源码真的很头疼,希望以后能有所提高。

---我是分割线---

Tamic开发社区

非专业的移动社区

不只是干货,还有人生

长按二维码关注我们

0 人点赞