1. Removing duplicates (distinct)
Flowable.just(1, 1, 1, 2, 2, 3, 4, 5)
        .distinct()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // Logs 1, 2, 3, 4, 5: each value is emitted only once
                Log.e(TAG, "distinct : " + integer + "\n");
            }
        });
2. Apply a function to one value at a time (reduce). Here the values are added pairwise.
Flowable.just(1, 2, 3, 4)
        .reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                // integer is the running result, integer2 is the next value
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // Logs 10 (1 + 2 + 3 + 4)
                Log.e(TAG, "accept: reduce : " + integer + "\n");
            }
        });
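To make the folding order explicit, here is a minimal plain-Java sketch of what reduce does with the values above. It does not use RxJava: `ReduceSketch` and its `reduce` helper are hypothetical names made up for illustration, and `Optional` stands in for the `Maybe` that RxJava 2's `Flowable.reduce(BiFunction)` returns (empty when the source emits nothing).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical illustration class; not part of RxJava.
final class ReduceSketch {

    // Folds the values left to right, e.g. ((1 + 2) + 3) + 4.
    // Returns an empty Optional when there is nothing to fold.
    static <T> Optional<T> reduce(List<T> values, BinaryOperator<T> accumulator) {
        if (values.isEmpty()) {
            return Optional.empty();
        }
        T result = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            result = accumulator.apply(result, values.get(i));
        }
        return Optional.of(result);
    }

    public static void main(String[] args) {
        Optional<Integer> sum = reduce(Arrays.asList(1, 2, 3, 4), (a, b) -> a + b);
        System.out.println("reduce : " + sum.get()); // reduce : 10
    }
}
```

Only the final accumulated value is delivered, which is why the subscriber above receives a single item.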
3. Skip the first count items, then start receiving (skip)
Flowable.just(1, 2, 3, 4, 5)
        .skip(2)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // Logs 3, 4, 5: the first two items are dropped
                Log.e(TAG, "skip : " + integer + "\n");
            }
        });
4. Receive at most count items (take)
Flowable.fromArray(1, 2, 3, 4, 5)
        .take(3)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // Logs 1, 2, 3: the stream ends after three items
                Log.e(TAG, "accept: take : " + integer + "\n");
            }
        });
5. Pairing items, limited by the source with fewer items (zip)
Flowable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
    @Override
    public String apply(String s, Integer integer) throws Exception {
        // Combines the n-th string with the n-th integer
        return s + integer;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // Receives A1, B2, C3; integers 4 and 5 have no partner
        Log.e(TAG, "zip : accept : " + s + "\n");
    }
});

// Emits three strings: A, B, C
private Flowable<String> getStringObservable() {
    return Flowable.create(new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> e) throws Exception {
            e.onNext("A");
            Log.e(TAG, "String emit : A \n");
            e.onNext("B");
            Log.e(TAG, "String emit : B \n");
            e.onNext("C");
            Log.e(TAG, "String emit : C \n");
        }
    }, BackpressureStrategy.BUFFER);
}

// Emits five integers: 1 to 5
private Flowable<Integer> getIntegerObservable() {
    return Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            Log.e(TAG, "Integer emit : 1 \n");
            e.onNext(2);
            Log.e(TAG, "Integer emit : 2 \n");
            e.onNext(3);
            Log.e(TAG, "Integer emit : 3 \n");
            e.onNext(4);
            Log.e(TAG, "Integer emit : 4 \n");
            e.onNext(5);
            Log.e(TAG, "Integer emit : 5 \n");
        }
    }, BackpressureStrategy.BUFFER);
}
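The "limited by the shorter source" rule can be sketched in plain Java. This is only a model of the pairing logic using lists, not RxJava itself; `ZipSketch` and its `zip` helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration class; not part of RxJava.
final class ZipSketch {

    // Pairs the i-th element of each list and stops at the shorter length.
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> zipper) {
        int pairs = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(zipper.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> strings = Arrays.asList("A", "B", "C");
        List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(zip(strings, integers, (s, i) -> s + i)); // [A1, B2, C3]
    }
}
```

The unmatched values 4 and 5 are never combined, which mirrors what the subscriber in the zip example receives.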
6. merge combines multiple Observables into one.
Unlike concat, it does not wait for emitter A to finish sending all of its events before emitter B starts sending.
Flowable.merge(Flowable.just(1, 2), Flowable.just(3, 4, 5))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("accept", "merge :" + integer + "\n");
            }
        });
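The difference from concat only becomes visible when the sources emit over time; with synchronous sources such as the `Flowable.just` calls above, the merged output typically still arrives in source order. The following plain-Java sketch is a simplified timing model, not RxJava code: `MergeConcatSketch`, `Timed`, and the millisecond timestamps are all hypothetical and chosen only to illustrate the ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical timing model; not part of RxJava.
final class MergeConcatSketch {

    // A value plus the time (ms after its source is subscribed) at which it is emitted.
    static final class Timed {
        final long atMillis;
        final int value;

        Timed(long atMillis, int value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    // concat: B is only subscribed once A has completed, so all of A comes first.
    static List<Integer> concat(List<Timed> a, List<Timed> b) {
        List<Integer> out = new ArrayList<>();
        for (Timed t : a) out.add(t.value);
        for (Timed t : b) out.add(t.value);
        return out;
    }

    // merge: both sources are subscribed up front, so values arrive in time order.
    static List<Integer> merge(List<Timed> a, List<Timed> b) {
        List<Timed> all = new ArrayList<>(a);
        all.addAll(b);
        all.sort(Comparator.comparingLong((Timed t) -> t.atMillis)); // stable sort
        List<Integer> out = new ArrayList<>();
        for (Timed t : all) out.add(t.value);
        return out;
    }

    public static void main(String[] args) {
        List<Timed> a = Arrays.asList(new Timed(0, 1), new Timed(200, 2));
        List<Timed> b = Arrays.asList(new Timed(100, 3), new Timed(300, 4));
        System.out.println("merge  : " + merge(a, b));  // merge  : [1, 3, 2, 4]
        System.out.println("concat : " + concat(a, b)); // concat : [1, 2, 3, 4]
    }
}
```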
7. One-to-one (map)
Flowable.just(250)
        // The first type parameter is the type of the received value;
        // the second is the type emitted after the conversion
        .map(new Function<Integer, String>() {
            @Override
            public String apply(Integer s) throws Exception {
                return "You are " + s;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("consumer", s);
            }
        });
8. One-to-many (flatMap)
ArrayList<String[]> list = new ArrayList<>();
String[] words1 = {"Hello,", "I am", "China!"};
String[] words2 = {"Hello,", "I am", "Beijing!"};
list.add(words1);
list.add(words2);
Flowable.fromIterable(list)
        .flatMap(new Function<String[], Publisher<String>>() {
            @Override
            public Publisher<String> apply(String[] strings) throws Exception {
                // Joins the three words into one string, so each array yields a single item.
                // Use Flowable.fromArray(strings) instead to emit each word separately.
                return Flowable.fromArray(strings[0] + strings[1] + strings[2]);
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("consumer", s);
            }
        });
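To show the one-to-many expansion itself, here is a small plain-Java sketch that uses `java.util.stream.Stream.flatMap` as an analogue. It is not RxJava: `FlatMapSketch` and `flattenWords` are hypothetical names, and unlike `Stream.flatMap`, RxJava's flatMap does not guarantee ordering when the inner sources are asynchronous.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of RxJava.
final class FlatMapSketch {

    // Expands every array into its individual words and flattens them into one list.
    static List<String> flattenWords(List<String[]> groups) {
        return groups.stream()
                .flatMap(words -> Arrays.stream(words))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> list = new ArrayList<>();
        list.add(new String[] {"Hello,", "I am", "China!"});
        list.add(new String[] {"Hello,", "I am", "Beijing!"});
        // Two input arrays become six output items.
        System.out.println(flattenWords(list));
        // [Hello,, I am, China!, Hello,, I am, Beijing!]
    }
}
```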
9. Construction (creating a Subscriber and a Flowable by hand)
// Create the subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
        // This step is required. Initialization is usually done here;
        // calling request() signals that initialization has finished.
        // Calling request() triggers onNext(). Without it the stream stalls
        // and onNext is never called.
        Log.e("onSubscribe", "onSubscribe");
        Log.e("onSubscribe", Thread.currentThread().getName());
        // Uncomment the next line to start receiving items:
        //s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String value) {
        Log.e("onNext", value);
        Log.e("onNext", Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable t) {
        Log.e("onError", t.getMessage());
    }

    @Override
    public void onComplete() {
        // For Reactive-Streams compatibility, onCompleted was renamed to onComplete
        Log.e("onComplete", "onComplete");
        Log.e("onComplete", Thread.currentThread().getName());
    }
};

Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {
        Log.e("subscribe", Thread.currentThread().getName());
        e.onNext("Hello,I am China!");
        //e.onError(new Throwable("An error occurred"));
        // Call either onError or onComplete, not both
        e.onComplete();
    }
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);
10. The amb operator emits all items from only the Observable that is first to emit an item or notification
ArrayList<Flowable<String>> list = new ArrayList<>();
list.add(Flowable.just("FIRST").delay(2, TimeUnit.SECONDS));
list.add(Flowable.just("SECOND"));
Flowable.amb(list)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                showMsg(s); // SECOND
            }
        });
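The selection rule can be modelled in plain Java as "pick the source whose first signal arrives earliest, then ignore the rest". This is a simplified sketch, not RxJava: `AmbSketch`, `Source`, and the delay field are hypothetical, and breaking ties in list order is an assumption of this model rather than a documented guarantee.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical timing model; not part of RxJava.
final class AmbSketch {

    // A source described by the delay before its first signal and the items it emits.
    static final class Source {
        final long firstSignalDelayMillis;
        final List<String> items;

        Source(long firstSignalDelayMillis, List<String> items) {
            this.firstSignalDelayMillis = firstSignalDelayMillis;
            this.items = items;
        }
    }

    // Returns all items of the source that signals first; the other sources are dropped.
    static List<String> amb(List<Source> sources) {
        Source winner = null;
        for (Source candidate : sources) {
            if (winner == null || candidate.firstSignalDelayMillis < winner.firstSignalDelayMillis) {
                winner = candidate;
            }
        }
        return winner == null ? Collections.<String>emptyList() : winner.items;
    }

    public static void main(String[] args) {
        Source first = new Source(2000, Arrays.asList("FIRST")); // delayed by 2 seconds
        Source second = new Source(0, Arrays.asList("SECOND"));  // signals immediately
        System.out.println(amb(Arrays.asList(first, second)));   // [SECOND]
    }
}
```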
11. Using multiple EditText fields to decide whether to enable a button (combineLatest)
Note: this only works with 1.0.
// 1.0
Observable<CharSequence> ObservableEmail = RxTextView.textChanges(mEmailView);
Observable<CharSequence> ObservablePassword = RxTextView.textChanges(mPasswordView);
ArrayList<Observable<CharSequence>> date = new ArrayList<>();
date.add(ObservableEmail);
date.add(ObservablePassword);
// The first type parameter of Function must be Object[]
Observable.combineLatest(date, new Function<Object[], Boolean>() {
    @Override
    public Boolean apply(Object[] str) {
        // str[0] is the latest email text, str[1] the latest password text
        return isEmailValid(str[0].toString()) && isPasswordValid(str[1].toString());
    }
}).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        if (aBoolean) {
            showMsg("success");
        } else {
            showMsg("fail");
        }
    }
});
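The behaviour of combineLatest in this form-validation case can be modelled without RxJava or Android: keep the latest value from each field and re-evaluate whenever either changes, once both have produced a value. `CombineLatestSketch` is a hypothetical class, and the two validation rules below are placeholder assumptions, since the original `isEmailValid` and `isPasswordValid` are not shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of combineLatest for two text fields; not part of RxJava.
final class CombineLatestSketch {
    private String latestEmail;     // null until the email field has emitted
    private String latestPassword;  // null until the password field has emitted
    private final List<Boolean> emitted = new ArrayList<>();

    void onEmail(String email) {
        latestEmail = email;
        emitIfReady();
    }

    void onPassword(String password) {
        latestPassword = password;
        emitIfReady();
    }

    // Emits only once every source has produced at least one value.
    private void emitIfReady() {
        if (latestEmail != null && latestPassword != null) {
            emitted.add(isEmailValid(latestEmail) && isPasswordValid(latestPassword));
        }
    }

    List<Boolean> emitted() {
        return emitted;
    }

    // Placeholder rules (assumptions for this sketch only).
    static boolean isEmailValid(String email) {
        return email.contains("@");
    }

    static boolean isPasswordValid(String password) {
        return password.length() > 4;
    }

    public static void main(String[] args) {
        CombineLatestSketch form = new CombineLatestSketch();
        form.onEmail("");          // nothing emitted yet: no password value
        form.onPassword("");       // emits false
        form.onEmail("a@b.com");   // emits false: password still too short
        form.onPassword("secret"); // emits true
        System.out.println(form.emitted()); // [false, false, true]
    }
}
```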
12. concat
Flowable.concat(Flowable.just("a"), Flowable.just("b"), Flowable.just("c")).subscribe(
        new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // Logs a, b, c in order: each source finishes before the next starts
                Log.e("TAG", s);
            }
        });