RxJava之转换操作符源码介绍

2022-12-19 13:25:01 浏览数 (1)

转载请以链接形式标明出处: 本文出自:103style的博客

转换相关的操作符 以及 官方介绍

RxJava转换操作符 官方介绍 :Transforming Observables

  • buffer
  • cast
  • concatMap
  • concatMapXXX
  • flatMap
  • flatMapXXX
  • flattenAsFlowable
  • flattenAsObservable
  • groupBy
  • map
  • scan
  • switchMap
  • window

以下介绍我们就直接具体实现,中间流程请参考 RxJava之create操作符源码解析。

buffer

官方示例:

代码语言:javascript复制
Observable.range(0, 10)
    .buffer(4)
    .subscribe((List<Integer> buffer) -> System.out.println(buffer));

输出:

代码语言:javascript复制
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9]

返回对象的 ObservableBuffersubscribeActual 方法: 单参数bufferskipcount 是相等的。

代码语言:javascript复制
protected void subscribeActual(Observer<? super U> t) {
    if (skip == count) {
        BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
        if (bes.createBuffer()) {//1.0
            source.subscribe(bes);
        }
    } else {
        source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
    }
}
  • (1.0) createBuffer即新创建了一个ArrayList对象 buffer

onNext(T t)onComplete()方法:

代码语言:javascript复制
public void onNext(T t) {
    U b = buffer;
    if (b != null) {
        b.add(t);
        if (  size >= count) {//1.0
            downstream.onNext(b);
            size = 0;
            createBuffer();
        }
    }
}

public void onComplete() {
    U b = buffer;
    if (b != null) {//2.0
        buffer = null;
        if (!b.isEmpty()) {
            downstream.onNext(b);
        }
        downstream.onComplete();
    }
}
  • (1.0) 每次调用onNext 就检查缓存的事件数是否 不小于 buffer操作符设置的 值,成立则将缓存的 buffer 数组 传给观察者的 onNext
  • (2.0) onComplete 是检查缓存的事件数是否不为空,成立则将缓存的 buffer 数组 传给观察者的 onNext,再调用观察者的 onComplete

cast

官方示例:

代码语言:javascript复制
Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);
numbers.filter((Number x) -> x instanceof Integer)
        .cast(Integer.class)
        .subscribe((Integer x) -> System.out.println(x));

输出:

代码语言:javascript复制
1
7
12
5

cast 是通过map操作符来实现的,我们直接看map

代码语言:javascript复制
public final <U> Observable<U> cast(final Class<U> clazz) {
    ObjectHelper.requireNonNull(clazz, "clazz is null");
    return map(Functions.castFunction(clazz));
}

apply方法:

代码语言:javascript复制
public U apply(T t) throws Exception {
    return clazz.cast(t);
} 

map

官方示例:

代码语言:javascript复制
Observable.just(1, 2, 3)
        .map(x -> x * x)
        .subscribe(System.out::println);

输出:

代码语言:javascript复制
1
4
9

返回对象的 ObservableMapsubscribeActual 方法:

代码语言:javascript复制
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

继续看 MapObserveronNext(T t)

代码语言:javascript复制
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != NONE) {
        downstream.onNext(null);
        return;
    }
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}
  • 即将 map 操作符 传入Function对象的 返回值 传递给 链式调用上一步的返回对象的 onNext(T t)

concatMap

RxJava之concatMap系列转换操作符源码介绍


flatMap

RxJava之flatMap系列转换操作符源码介绍


flattenAsFlowable

官方示例:

代码语言:javascript复制
Single<Double> source = Single.just(2.0);
Flowable<Double> flowable = source.flattenAsFlowable(x -> {
    return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3));
});
flowable.subscribe(x -> System.out.println("onNext: "   x));

输出:

代码语言:javascript复制
onNext: 2.0
onNext: 4.0
onNext: 8.0

我们先看Single.just(2.0)

代码语言:javascript复制
public static <T> Single<T> just(final T item) {
    ObjectHelper.requireNonNull(item, "value is null");
    return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}

SingleJustsubscribeActual

代码语言:javascript复制
protected void subscribeActual(SingleObserver<? super T> observer) {
    observer.onSubscribe(Disposables.disposed());
    observer.onSuccess(value);
}

flattenAsFlowable返回对象的 SingleFlatMapIterableFlowablesubscribeActual 方法:

代码语言:javascript复制
protected void subscribeActual(Subscriber<? super R> s) {
    source.subscribe(new FlatMapIterableObserver<T, R>(s, mapper));
}

继续看 FlatMapIterableObserveronSubscribe(Disposable d)onSuccess(T value)

代码语言:javascript复制
public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        downstream.onSubscribe(this);//1.0
    }
}
public void onSuccess(T value) {
    Iterator<? extends R> iterator;
    boolean has;
    try {
        iterator = mapper.apply(value).iterator();//2.0 调用apply返回的Iterable对象的 iterator()方法。
        has = iterator.hasNext();//3.0
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        downstream.onError(ex);
        return;
    }
    if (!has) {
        downstream.onComplete();//3.1
        return;
    }
    this.it = iterator;//3.2
    drain();
}

(1.0) 通过Flowable subscribe流程介绍 我们知道downstream.onSubscribe(this)即调用 FlowableInternalHelper.RequestMax.INSTANCEaccept方法:

代码语言:javascript复制
public enum RequestMax implements Consumer<Subscription> {
    INSTANCE;
    @Override
    public void accept(Subscription t) throws Exception {
        t.request(Long.MAX_VALUE);
    }
}

即:FlatMapIterableObserver.request(Long.MAX_VALUE): 即为设置变量requestedvalueLong.MAX_VALUEdrain()因为it 变量还是null,所以没做什么操作。

代码语言:javascript复制
public void request(long n) {
    if (SubscriptionHelper.validate(n)) {
        BackpressureHelper.add(requested, n);
        drain();
    }
}

(2.0) 调用flattenAsFlowable传入的Functionapply返回的Iterable对象的 iterator()方法。

(3.0) 检查Iterable时候为空,(3.1) 为空直接onComplete()(3.2) 不为空则将 iterator()返回值赋值给当前的 it 变量,继续执行drain()

drain():

代码语言:javascript复制
void drain() {
    ...
    Subscriber<? super R> a = downstream;
    Iterator<? extends R> iterator = this.it;
    ...
    int missed = 1;
    for (; ; ) {
        if (iterator != null) {
            long r = requested.get();
            long e = 0L;
            if (r == Long.MAX_VALUE) {//1.0
                slowPath(a, iterator);
                return;
            }
            ...
        }
        ...
    }
}
  • 因为上一步downstream.onSubscribe(this)调用了request(Long.MAX_VALUE), 所以 (1.0) 这里条件成立,执行slowPath(downstream iterator)

slowPath(downstream iterator)

代码语言:javascript复制
void slowPath(Subscriber<? super R> a, Iterator<? extends R> iterator) {
    for (;;) {
        if (cancelled) {
            return;
        }
        R v;
        try {
            v = iterator.next();//1.0
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            a.onError(ex);
            return;
        }
        a.onNext(v);//1.1
        if (cancelled) {
            return;
        }
        boolean b;
        try {
            b = iterator.hasNext();//1.2
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            a.onError(ex);
            return;
        }
        if (!b) {
            a.onComplete();//1.3
            return;
        }
    }
}
  • (1.0) 获取到的元素,(1.1)传递给downstreamonNext(1.2)然后判断是否还有其他元素,如果有则循环继续,没有的话即调用 downstreamonComplete 结束。

flattenAsObservable

官方示例:

代码语言:javascript复制
Single<Double> source = Single.just(2.0);
Observable<Double> observable = source.flattenAsObservable(x -> {
    return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3));
});
observable.subscribe(x -> System.out.println("onNext: "   x));

输出:

代码语言:javascript复制
onNext: 2.0
onNext: 4.0
onNext: 8.0

subscribeActual实现的逻辑和 flattenAsFlowable 类似,只是返回的对象为 SingleFlatMapIterableObservable,就不再赘述了。


groupBy

官方示例:

代码语言:javascript复制
Observable<String> animals = Observable.just(
    "Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");
animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
    .concatMapSingle(Observable::toList)
    .subscribe(System.out::println);

输出:

代码语言:javascript复制
[TIGER, TURTLE]
[ELEPHANT]
[CAT, CHAMELEON]
[FROG, FISH, FLAMINGO]

我们来看看返回对象的ObservableGroupBy:

代码语言:javascript复制
public GroupByObserver(Observer<? super GroupedObservable<K, V>> actual, 
                       Function<? super T, ? extends K> keySelector, 
                       Function<? super T, ? extends V> valueSelector, 
                       int bufferSize, boolean delayError) {
    this.downstream = actual;
    this.keySelector = keySelector;
    this.valueSelector = valueSelector;
    this.bufferSize = bufferSize;
    this.delayError = delayError;
    this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
    this.lazySet(1);
}

public void subscribeActual(Observer<? super GroupedObservable<K, V>> t) {
    source.subscribe(new GroupByObserver<T, K, V>(t, keySelector, valueSelector, bufferSize, delayError));
} 

继续看 GroupByObserveronNext(T t)

代码语言:javascript复制
public void onNext(T t) {
    K key;
    try {
        key = keySelector.apply(t);//1.0
    } catch (Throwable e) {
        ...
        return;
    }
    Object mapKey = key != null ? key : NULL_KEY;
    GroupedUnicast<K, V> group = groups.get(mapKey);
    if (group == null) {
        if (cancelled.get()) {
            return;
        }
        group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
        groups.put(mapKey, group);//2.0
        getAndIncrement();
        downstream.onNext(group);//3.0
    }

    V v;
    try {
        v = ObjectHelper.requireNonNull(valueSelector.apply(t), "The value supplied is null");//4.0
    } catch (Throwable e) {
        ...
        return;
    }
    group.onNext(v);//4.1
}
  • (1.0) 通过keySelector.apply(t)即官方示例中的 animal.charAt(0)获取分组的 key
  • (2.0) 如果GroupedUnicast不存再这个key,则保存进去。
  • (3.0) 然后继续调用上一步操作符的 onNext方法,即官方示例中的just
  • (4.0) 通过valueSelector.apply(t)即官方示例中的 String::toUpperCase)获取值,(4.1)添加到ToListObservercollection中。

最后通过 onComplete()输出:

代码语言:javascript复制
public void onComplete() {
    List<ObservableGroupBy.GroupedUnicast<K, V>> list = new ArrayList<ObservableGroupBy.GroupedUnicast<K, V>>(groups.values());
    groups.clear();
    for (ObservableGroupBy.GroupedUnicast<K, V> e : list) {
        e.onComplete();
    }
    downstream.onComplete();
}

ToListObserveronComplete():

代码语言:javascript复制
public void onComplete() {
    U c = collection;
    collection = null;
    downstream.onNext(c);
    downstream.onComplete();
}

scan

官方示例:

代码语言:javascript复制
Observable.just(5, 3, 8, 1, 7)
        .scan(0, (partialSum, x) -> partialSum   x)
        .subscribe(System.out::println);

输出:

代码语言:javascript复制
0
5
8
16
17
24

我们来看看返回对象的ObservableScanSeed:

代码语言:javascript复制
public ObservableScanSeed(ObservableSource<T> source, Callable<R> seedSupplier, BiFunction<R, ? super T, R> accumulator) {
    super(source);
    this.accumulator = accumulator;
    this.seedSupplier = seedSupplier;
}

@Override
public void subscribeActual(Observer<? super R> t) {
    R r;
    try {
        r = ObjectHelper.requireNonNull(seedSupplier.call(), "The seed supplied is null");//1.0
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        EmptyDisposable.error(e, t);
        return;
    }
    source.subscribe(new ScanSeedObserver<T, R>(t, accumulator, r));
}
  • (1.0) seedSupplier.call()即官方示例中的 0,即设置 r 的值为0.

继续看ScanSeedObserveronNext(T t):

代码语言:javascript复制
public void onNext(T t) {
    if (done) {
        return;
    }
    R v = value;//1.0
    R u;
    try {
        u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value");//2.0
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        upstream.dispose();
        onError(e);
        return;
    }
    value = u;//3.0
    downstream.onNext(u);//4.0
}
  • (1.0) value 即为上一步设置的 r0.
  • (2.0) accumulator.apply(v, t) 即为官方示例中的 partialSum x
  • (3.0) 更新value 的值
  • (4.0)accumulator.apply(v, t)传递给观察者的onNext

switchMap

官方示例:

代码语言:javascript复制
Observable.interval(0, 1, TimeUnit.SECONDS)
        .switchMap(x -> {
            return Observable.interval(0, 750, TimeUnit.MILLISECONDS)
                    .map(y -> x);
        })
        .takeWhile(x -> x < 3)
        .blockingSubscribe(System.out::print);

输出:

代码语言:javascript复制
001122

我们来看看返回对象的ObservableSwitchMap:

代码语言:javascript复制
public ObservableSwitchMap(ObservableSource<T> source,
                           Function<? super T, ? extends ObservableSource<? extends R>> mapper, int bufferSize,
                                   boolean delayErrors) {
    super(source);
    this.mapper = mapper;
    this.bufferSize = bufferSize;
    this.delayErrors = delayErrors;
}

@Override
public void subscribeActual(Observer<? super R> t) {
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }
    source.subscribe(new SwitchMapObserver<T, R>(t, mapper, bufferSize, delayErrors));
}

继续看SwitchMapObserveronNext:

代码语言:javascript复制
public void onNext(T t) {
    long c = unique   1;
    unique = c;
    SwitchMapInnerObserver<T, R> inner = active.get();
    if (inner != null) {
        inner.cancel();
    }
    ObservableSource<? extends R> p;
    try {
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The ObservableSource returned is null");//1.0
    } catch (Throwable e) {
        ...
        return;
    }
    SwitchMapInnerObserver<T, R> nextInner = new SwitchMapInnerObserver<T, R>(this, c, bufferSize);//2.0
    for (;;) {
        inner = active.get();
        if (inner == CANCELLED) {
            break;
        }
        if (active.compareAndSet(inner, nextInner)) {
            p.subscribe(nextInner);//3.0
            break;
        }
    }
}
  • (1.0) 通过mapper.apply(t)即官方示例中的 Observable.interval(0, 750, TimeUnit.MILLISECONDS).map(y -> x)返回的ObservableMap对象。
  • (2.0) 构建SwitchMapInnerObserver对象
  • (3.0) 用返回的ObservableMap订阅SwitchMapInnerObserver对象

window

官方示例:

代码语言:javascript复制
Observable.range(1, 10)
        // Create windows containing at most 2 items, and skip 3 items before starting a new window.
        .window(2, 3)
        .flatMapSingle(window -> {
            return window.map(String::valueOf)
                    .reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
        })
        .subscribe(System.out::println);

输出:

代码语言:javascript复制
[1, 2]
[4, 5]
[7, 8]
[10]

我们来看看返回对象的ObservableWindow:

代码语言:javascript复制
public ObservableWindow(ObservableSource<T> source, long count, long skip, int capacityHint) {
    super(source);
    this.count = count;
    this.skip = skip;
    this.capacityHint = capacityHint;
}

@Override
public void subscribeActual(Observer<? super Observable<T>> t) {
    if (count == skip) {
        source.subscribe(new WindowExactObserver<T>(t, count, capacityHint));
    } else {
        source.subscribe(new WindowSkipObserver<T>(t, count, skip, capacityHint));
    }
}

继续看WindowSkipObserveronNext:

代码语言:javascript复制
public void onNext(T t) {
    final ArrayDeque<UnicastSubject<T>> ws = windows;
    long i = index;
    long s = skip;
    if (i % s == 0 && !cancelled) {//3.0
        wip.getAndIncrement();
        UnicastSubject<T> w = UnicastSubject.create(capacityHint, this);
        ws.offer(w);
        downstream.onNext(w);
    }
    long c = firstEmission   1;
    for (UnicastSubject<T> w : ws) {
        w.onNext(t);//1.0
    }
    if (c >= count) {
        ws.poll().onComplete();//2.0
        if (ws.isEmpty() && cancelled) {
            this.upstream.dispose();
            return;
        }
        firstEmission = c - s;
    } else {
        firstEmission = c;
    }
    index = i   1;
}
  • (1.0) 将元素存入 queue
  • (2.0) 当元素个数到达count时,就之前的元素全部输出
  • (3.0) 当元素个数到达skip时,就重新创建一个UnicastSubject来存储元素

以上

0 人点赞