RxJava源码浅析(二): 深入操作符解析

2022-03-21 10:46:27 浏览数 (1)

上篇文章我们通过源码了解了RxJava基本流程,RxJava源码浅析(一): 基础流程 这里我们研究下操作符的源码是怎么实现的。有了上篇文章的基础,这里讲起来会轻松很多。操作符很多,我们随机挑了几个操作符来看看。

还是基于上篇文章来看

代码语言:javascript复制
//上游-被观察者
Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
        Log.e("observable","Thread ID:" Thread.currentThread().getId());
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
});
//下游-观察者
Observer myobserver=new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.e("onSubscribe","Thread ID:" Thread.currentThread().getId());
    }
    @Override
    public void onNext(Integer integer) {
        Log.e("observer","Thread ID:" Thread.currentThread().getId() "  " "value:" integer);
    }
    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
    }
};

一、map源码解析

代码语言:javascript复制
//关联上游和下游
myobservable.map(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer integer) throws Throwable {
        return integer 10;
    }
}).subscribe(myobserver);

用map操作符,我们可以得知,onNext返回的数字都是加10。我们来看下map源码吧。

代码语言:javascript复制
public final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}

看到onAssembly方法,根据上篇文章我们知道,他是直接返回new ObservableMap<>(this, mapper),那我们直接扒开ObservableMap看看。

代码语言:javascript复制
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
           ..................................
            try {
                v = Objects.requireNonNull(mapper.apply(t), "returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
         .....................................
    }

ObservableMap也是Observable具体现实。这里面创建了MapObserver,包装了myobserver。

有了上一篇文章基础,我们知道基础流程里面是Observable的具体实现通过subscribe方法,把数据传给Observer的具体实现。里面两个主体是一对Observable和Observer。

现在map操作符,我们可以知道。里面创建了一对ObservableMap和MapObserver。

我们知道.subscribe(myobserver)会走到ObservableMap中的subscribeActual方法,这里的source就是myobservable,那source.subscribe(new MapObserver<T,U>(t,function))就会走到Observable中的subscribe方法,最终调用ObservableOnSubscribe.subscribe方法。

这里ObservableEmitter上篇文章讲过了,是Observer的一个包装,此时的Observer是谁?对是MapObserver,emitter.onNext()会调用ObservableMap的onNext()。看下ObservableMap的onNext()源码关键片段

代码语言:javascript复制
try {
    v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
    fail(ex);
    return;
}
downstream.onNext(v);

其中mapper.apply(t),他的具体实现是

代码语言:javascript复制
@Override
public Object apply(Integer integer) throws Throwable {
    return integer 10;
}

这里的downstream,我们看下他的源头,他的具体实现就是myobserver。那大概的意思就是onNext数据会经过new Function加工,加工后再交给myobserver的onNext。

总结:大概流程就是这样。和基础流程差不多,只是把myobserver包装成MapObserver,MapObserver会对数据进行处理以下再传给myobserver。

二、flatMap源码解析

代码语言:javascript复制
myobservable.flatMap(new Function<Integer, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(Integer integer) throws Throwable {
        return Observable.just(1,2,4);
    }
}).subscribe(myobserver);

这个稍微有点复杂了,flatMap是返回一个ObservableFlatMap,我们不怕,继续看看他们内部。

代码语言:javascript复制
public final <@NonNull R> Observable<R> flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}

    public final <@NonNull R> Observable<R> flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        Objects.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        if (this instanceof ScalarSupplier) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarSupplier<T>)this).get();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }

看完这个我们应该知道,flatMap直接返回ObservableFlatMap,有了上一篇文章,我们也清楚流程了,直接看subscribeActual

代码语言:javascript复制
@Override
public void subscribeActual(Observer<? super U> t) {
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }
    source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

有了经验,看到这里source.subscribe,直接看MergeObserver onNext方法。

代码语言:javascript复制
public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        upstream.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) {
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip  ;
        }
    }

    subscribeInner(p);
}

这里面核心代码p = Objects.requireNonNull(mapper.apply(t)创建了一个新ObservableSource。上级每次onNext都要生成对应的一个ObservableSource。

mapper.apply(t)返回的就是新生成的Observable

继续看下subscribeInner

代码语言:javascript复制
void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Supplier) {
            if (tryEmitScalar(((Supplier<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                boolean empty = false;
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        empty = true;
                    }
                }
                if (empty) {
                    drain();
                    break;
                }
            } else {
                break;
            }
        } else {
            InnerObserver<T, U> inner = new InnerObserver<>(this, uniqueId  );
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}

正常情况,ObservableSource不是Supplier类型,我们当前ObservableSource是Observable,我们直接走else 。

p.subscribe(inner);直接走InnerObserver的onNext方法。

代码语言:javascript复制
public void onNext(U t) {
    if (fusionMode == QueueDisposable.NONE) {
        parent.tryEmit(t, this);
    } else {
        parent.drain();
    }
}

fusionMode默认是QueueDisposable.NONE,那我们继续看parent.tryEmit(t,this);

代码语言:javascript复制
void tryEmit(U value, InnerObserver<T, U> inner) {
    if (get() == 0 && compareAndSet(0, 1)) {
        downstream.onNext(value);
        if (decrementAndGet() == 0) {
            return;
        }
    } else {
        SimpleQueue<U> q = inner.queue;
        if (q == null) {
            q = new SpscLinkedArrayQueue<>(bufferSize);
            inner.queue = q;
        }
        q.offer(value);
        if (getAndIncrement() != 0) {
            return;
        }
    }
    drainLoop();
}

MergeObserver 继承了 AtomicInteger,所以这里的tryEmit方法就利用了 AtomicInteger 的同步机制,所以同时只会有一个 value 被 actual Observer 发射。由于 AtomicInteger CAS锁只能保证操作的原子性,并不保证锁的获取顺序,是抢占式的,所以最终数据的发射顺序并不是固定的(同一个Observable发出的数据是有序的)

如果没有获取到锁,就会将要发射的数据放入 队列中,drainLoop 方法会循环去获取队列中的 数据,然后发射。

代码语言:javascript复制
void drainLoop() {
    final Observer<? super U> child = this.downstream;
    int missed = 1;
    for (;;) {
        if (checkTerminate()) {
            return;
        }
        int innerCompleted = 0;
        SimplePlainQueue<U> svq = queue;

        if (svq != null) {
            for (;;) {
                if (checkTerminate()) {
                    return;
                }

                U o = svq.poll();

                if (o == null) {
                    break;
                }

                child.onNext(o);
                innerCompleted  ;
            }

drainLoop就是不断遍历SimplePlainQueue队列,(这个SimplePlainQueue就是存放的next数据),不断child.onNext(o);

总结下: faltMap返回一个新的被观察者ObservableB【ObservableFlatMap】,重写ObservableB的subscribeActual方法,在原始的观察者ObserverA【myobserver】对其进行订阅时,新建一个观察者ObserverB对原始的ObservableA进行订阅。新的观察者ObserverB【MergeObserver持有原始的ObserverA和faltMap接收的匿名对象实例function。当ObserverB监听到原始的被观察者ObservableA的事件时,ObserverB调用function的apply方法获得新新的被观察者ObservableC【Observable.just(1,2,4)】,再创建一个新的观察者ObserverC【InnerObserver】对ObservableC进行订阅,ObserverC持有原始的观察者ObserverA,在ObserverC观察到被观察者ObservableC的时间时,调用原始的观察者ObserverA的方法。

0 人点赞