前言
今天开始聊聊Rxjava,这个神奇又难用又牛逼的框架。
先说说Rxjava
两个关键词:
异步
。Rxjava可以通过链式调用随意切换线程,同时又能保证代码的简洁。观察者模式
。Rxjava的核心,说白了就是一个观察者模式,通过观察者订阅被观察者这一层订阅关系来完成后续事件的发送等工作。
然后开始提问题了,Rxjava
涉及的内容很多,我还是会以三个问题为单位,从易到难,一篇篇的说下去,今天的三问
是:
- RxJava的订阅关系
- Observer处理完onComplete后会还能onNext吗?
- RxJava中的操作符
RxJava的订阅关系
代码语言:javascript复制Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " integer);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
代码中主要有三个角色:
被订阅者Observable
,是整个事件的来源,可以发射数据给订阅者。订阅者Observer
,通过subscribe方法和被订阅者产生关系,也就是开始订阅,同时可以接受被订阅者发送的消息。发射器Subscriber/Emitter
,在Rxjava2之后,发射器改为了Emitter,他的作用主要是用来发射一系列事件的,比如next事件,complete事件等等。
有了这三个角色,一个完整的订阅关系也就生成了。
Observer处理完onComplete后会还能onNext吗?
要弄清楚这个问题,得去看看onComplete,onNext方法到底做了什么。
代码语言:javascript复制@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
源码还是比较清晰明了,无论是onComplete
还是onNext
,都会判断当前订阅是否被取消,也就是Disposable类型的变量的引用是否等于DISPOSED
,如果等于则代表该订阅已经被取消,起点和终点已经断开联系。而在onComplete方法的结尾调用了dispose
方法,将原子引用类中的 Disposable
对象设置为 DisposableHelper 内的 DISPOSED
枚举实例,即断开订阅关系,所以在这之后所有的onNext,onComplete,onError
方法中的isDisposed判断都不会通过,也就不会执行后续的数据发送等处理了。
RxJava中的操作符
- concatMap
- flatMap
这两个操作符的功能是一样的,都是将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。区别在于concatMap
是有序的,flatMap
是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。
举个例子,发送数字01234,通过操作符对他们进行 1处理,发送2的时候进行一个延时:
代码语言:javascript复制Observable.fromArray(1,2,3,4,5)
.flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {
int delay = 0;
if(integer == 2){
delay = 500;//延迟发射
}
return Observable.just(integer*10).delay(delay, TimeUnit.MILLISECONDS);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e("jimu","accept:" integer);
}
});
如上述操作,最终打印结果为:10,20,40,50,30。因为发送数字2的时候,进行了延时。
但是如果flatMap
操作符改成concatMap
,打印结果就是10,20,30,40,50了,这是因为concatMap是有序的,会按照原序列的顺序进行变换输出。
- merge、concat、zip,合并
这几个操作符是用作合并发射物的,可以将多个Obserable
和并成一个Obserable
:
Observable<Integer> odds=Observable.just(1,2,3,4);
Observable<Integer> events=Observable.just(5,6,7,8);
Observable.merge(odds,events).subscribe(i->Log.d("TAG","merge->" i));
区别在于concat
操作符是在合并后按顺序串行执行,merge
操作符是在合并后按时间线并行执行,如果出现某个数据进行延时发射,那么结果序列就会发生变化。
而zip
操作符的特点是合并之后并行执行,发射事件和最少的一个相同,什么意思呢?比如一个发送两个数据的Obserable
和一个发射4条数据的Obserable
进行zip合并,那么最终只会有两条数据被发射出来,看个例子:
Observable
.zip(Observable.just(1,2),Observable.just(3,4,5,6),new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer response, @NonNull Integer response2) throws Exception {
//将两个发射器的结果相加
return response response2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer s) throws Exception {
Log.e("lz","accept " s);
}
});
结果只会有两条数据:4,6。第二个发射器发射的后面两条数据会被抛弃。
- interval,周期执行
这个操作符主要用作定时周期任务,比如我需要每100ms发送一次数据:
代码语言:javascript复制Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
}
});
- timer,delay延迟发送数据
这两个操作符都是用作延时发送数据,不同在于timer
是创建型操作符,而delay
是辅助型操作符。意思就是timer
操作符是可以直接创建一个Observable
,然后在订阅之后延时发送数据项,看例子:
Observable
.timer(1000,TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.subscribe(disposableObserver);
而delay
是当原始的Observable
发送数据后,启动一个定时器,然后延时将这个数据发送,所以它相当于是处在上游与下游之间的一个辅助项,用作延时发送,它的作用对象必须是个创建好的Observable
:
Observable
.just(0L)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
}
}
.timer(1000,TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.subscribe(disposableObserver);
最后
最后为了帮助大家深刻理解Android相关知识点的原理以及面试相关知识,这里放上相关的我搜集整理的24套腾讯、字节跳动、阿里、百度2019-2020BAT 面试真题解析,我把大厂面试中常被问到的技术点整理成了视频和PDF(实际上比预期多花了不少精力),包知识脉络 诸多细节。
还有 高级架构技术进阶脑图 帮助大家学习提升进阶,也节省大家在网上搜索资料的时间来学习,也可以分享给身边好友一起学习。
以上内容均放在了开源项目:【github】 中已收录,里面包含不同方向的自学Android路线、面试题集合/面经、及系列技术文章等,资源持续更新中...
当程序员容易,当一个优秀的程序员是需要不断学习的,从初级程序员到高级程序员,从初级架构师到资深架构师,或者走向管理,从技术经理到技术总监,每个阶段都需要掌握不同的能力。早早确定自己的职业方向,才能在工作和能力提升中甩开同龄人。