转载请以链接形式标明出处: 本文出自:103style的博客
过滤相关的操作符 以及 官方介绍
RxJava
之 过滤操作符 官方介绍 :Filtering Observables
debounce
distinct
distinctUntilChanged
elementAt
elementAtOrError
filter
first
firstElement
firstOrError
ignoreElement
ignoreElements
last
lastElement
lastOrError
ofType
sample
skip
skipLast
take
takeLast
throttleFirst
throttleLast
throttleLatest
throttleWithTimeout
timeout
debounce
丢弃超过debounce
设置的时间的事件
官方示例:
代码语言:javascript复制Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(1500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2000);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
代码语言:javascript复制onNext: A
onNext: D
onNext: E
onComplete
distinct
过滤相同的事件
官方示例:
代码语言:javascript复制Observable.just(2, 3, 4, 4, 2, 1)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
代码语言:javascript复制2
3
4
1
distinctUntilChanged
过滤连续的相同事件流
官方示例:
代码语言:javascript复制Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
.distinctUntilChanged()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
代码语言:javascript复制1
2
1
2
3
4
elementAt
获取事件流中从零开始的第指定下标的元素
官方示例:
代码语言:javascript复制Observable.range(0, 10)
.elementAt(5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
代码语言:javascript复制5
elementAtOrError
索引不存在则走onError
官方示例:
代码语言:javascript复制Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);
element.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onSuccess will not be printed!");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError: " throwable);
}
});
输出:
代码语言:javascript复制onError: java.util.NoSuchElementException
filter
自定义过滤规则
官方示例:
代码语言:javascript复制Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
代码语言:javascript复制2
4
6
first
获取事件流中第一个事件,返回值为 Single
官方示例:
代码语言:javascript复制Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");
firstOrDefault.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
输出:
代码语言:javascript复制A
firstElement
获取事件流中第一个事件,返回值为 Maybe
官方示例:
代码语言:javascript复制Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> firstOrDefault = source.firstElement();
firstOrDefault.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
输出:
代码语言:javascript复制A
firstOrError
输出第一个事件并捕获异常。
官方示例:
代码语言:javascript复制Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();
firstOrError.subscribe(
new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onSuccess will not be printed!");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError: " throwable);
}
});
输出:
代码语言:javascript复制onError: java.util.NoSuchElementException
ignoreElement
过滤一个事件
官方示例:
代码语言:javascript复制Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();
completable.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("Done!");
}
}).blockingAwait();
输出:
代码语言:javascript复制Done!
ignoreElements
过滤所有事件
官方示例:
代码语言:javascript复制Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();
completable.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("Done!");
}
}).blockingAwait();
输出:
代码语言:javascript复制Done!
last
获取事件流中最后一个事件,返回值为 Single
官方示例:
代码语言:javascript复制Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");
lastOrDefault.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
输出:
代码语言:javascript复制C
lastElement
获取事件流中最后一个事件, 返回值为 Maybe
官方示例:
代码语言:javascript复制Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> lastOrDefault = source.lastElement();
lastOrDefault.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
输出:
代码语言:javascript复制C
lastOrError
同 firstOrError
官方示例:
代码语言:javascript复制Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();
lastOrError.subscribe(
new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onSuccess will not be printed!");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError: " throwable);
}
});
输出:
代码语言:javascript复制onError: java.util.NoSuchElementException
ofType
根据类型过滤
官方示例:
代码语言:javascript复制Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);
integers.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
代码语言:javascript复制1
3
7
sample
仅在周期性时间间隔内发出最近发出的事件来过滤事件流中的事件。
官方示例:
代码语言:javascript复制Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.sample(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
代码语言:javascript复制// 700(500 200)
//1500(500 200 800)
//2100(500 200 800 600)
onNext: C
onNext: D
onComplete
skip
跳过事件流中开头的指定个数事件
官方示例:
代码语言:javascript复制Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skip(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
代码语言:javascript复制5
6
7
8
9
10
skipLast
跳过事件流中结尾的指定个数事件
官方示例:
代码语言:javascript复制Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skipLast(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
代码语言:javascript复制1
2
3
4
5
6
take
取事件流中开头的指定个数事件
官方示例:
代码语言:javascript复制Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.take(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
代码语言:javascript复制1
2
3
4
takeLast
取事件流中结尾的指定个数事件
官方示例:
代码语言:javascript复制Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.takeLast(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
代码语言:javascript复制7
8
9
10
throttleFirst
和 sample
相反 去指定连续时间内的第一个事件
官方示例:
代码语言:javascript复制Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.throttleFirst(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
代码语言:javascript复制onNext: A
onNext: D
onComplete
throttleLast
和 sample
一样 去指定连续时间内的最后一个事件
官方示例:
代码语言:javascript复制Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.throttleLast(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
代码语言:javascript复制onNext: C
onNext: D
onComplete
throttleLatest
发出事件流中的事件,然后在它们之间经过指定的超时时定期发出最新项目(如果有)。
官方示例:
代码语言:javascript复制Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.throttleLatest(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
代码语言:javascript复制onNext: A
onNext: C
onNext: D
onComplete
throttleWithTimeout
官方示例:
代码语言:javascript复制Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(1500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2000);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.throttleWithTimeout(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
代码语言:javascript复制onNext: A
onNext: D
onNext: E
onComplete
timeout
在超时时间内发出每一个事件,如果超过超时事件则报错
官方示例:
代码语言:javascript复制Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(800);
emitter.onNext("B");
Thread.sleep(400);
emitter.onNext("C");
Thread.sleep(1200);
emitter.onNext("D");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.timeout(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
代码语言:javascript复制onNext: A
onNext: B
onNext: C
java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.
以上