这次来讲讲Rxjava中的Flowable 先看一段代码
代码语言:javascript复制 Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
while (true){
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
System.out.println(integer);
}
});
被观察者是事件的生产者,观察者是事件的消费者。上述例子中可以看出生产者无限生成事件,而消费者每2秒才能消费一个事件,这会造成事件无限堆积,最后造成OOM。 因此问题来了,要怎么处理这些慢慢堆积起来的消息呢? Flowable就是由此产生,专门用来处理这类问题。
代码语言:javascript复制Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
看一下具体实现
代码语言:javascript复制 public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
返回一个FlowableCreate对象。之后会调用subscribeActual
代码语言:javascript复制 public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);//**********************打印emit 1,emit 2,emit 3
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
再看一下ErrorAsyncEmitter
代码语言:javascript复制 static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
......
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) {
actual.onNext(t);//////************************
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
}
abstract static class BaseEmitter<T> extends AtomicLong implements FlowableEmitter<T>, Subscription {
@Override
public void onComplete() {
if (isCancelled()) {
return;
}
try {
actual.onComplete();
} finally {
serial.dispose();
}
}
@Override
public void onError(Throwable e) {
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (isCancelled()) {
RxJavaPlugins.onError(e);
return;
}
try {
actual.onError(e);
} finally {
serial.dispose();
}
}
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);//*****************cas,原子操作,设置消费能力为n。如果超过,就报MissingBackpressureException
onRequested();
}
}
void onRequested() {
// default is no-op
}
}
当然,除了BackpressureStrategy除了ERROR以外还有别的,你可以一一去看,在此我不展示了