RxJava2 Analysis

2019-12-26 14:45:57

Using RxJava2

private void analyzeRxJava(){
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("1");
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
        }
 
        @Override
        public void onNext(String value) {
            Log.d(TAG, "onNext() called with: value = [" + value + "]");
        }
 
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError() called with: e = [" + e + "]");
        }
 
        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete() called");
        }
    });
}

Source code analysis

Observable.create(): creating the observable

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
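    // Null-check the ObservableOnSubscribe that was passed in
    ObjectHelper.requireNonNull(source, "source is null");
    // Not null: wrap it in an ObservableCreate and return it through the assembly hook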
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
@NonNull
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
    try {
        // Pass the observable to the hook function, which may transform it
        return f.apply(t);
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}
// Function is a transformation object: it takes one input value and returns another value
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}
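
The hook step in onAssembly() can be pictured with a small plain-Java sketch. It does not use RxJava: the class AssemblyHookSketch, its onAssembly field, and the assemble() method are hypothetical names chosen for illustration, and a String stands in for the Observable. It only shows the "apply the hook if one is installed, otherwise return the source unchanged" pattern from the code above.

```java
import java.util.function.Function;

// Hypothetical stand-in for the plugin class: one optional global hook applied at assembly time.
final class AssemblyHookSketch {
    // null means "no hook installed", mirroring the null check in onAssembly().
    static volatile Function<String, String> onAssembly;

    static String assemble(String source) {
        Function<String, String> f = onAssembly;   // read the volatile field once
        if (f != null) {
            return f.apply(source);                // the hook may wrap or replace the source
        }
        return source;                             // no hook: return the source untouched
    }

    public static void main(String[] args) {
        System.out.println(assemble("ObservableCreate"));
        onAssembly = s -> "Traced(" + s + ")";
        System.out.println(assemble("ObservableCreate"));
        onAssembly = null;
    }
}
```

With no hook installed, the source passes through unchanged, which is the normal case for Observable.create().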

How Observable emits data

Data is sent through ObservableEmitter, the observable's emitter, which delivers values, errors, and completion. It is an interface that extends Emitter.

public interface ObservableEmitter<T> extends Emitter<T> {
 
    // Sets a Disposable on the emitter; once the emitter is disposed, the downstream receives no more events
    void setDisposable(@Nullable Disposable d);
 
    void setCancellable(@Nullable Cancellable c);
 
    // Returns true if the downstream has already disposed
    boolean isDisposed();
 
    @NonNull
    ObservableEmitter<T> serialize();
 
    @Experimental
    boolean tryOnError(@NonNull Throwable t);
}
public interface Emitter<T> {
 
    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);
 
    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);
 
    /**
     * Signal a completion.
     */
    void onComplete();
}

ObservableOnSubscribe source analysis

ObservableOnSubscribe is an interface. The instance passed to create() is stored in ObservableCreate through its constructor.

// In the Observable class
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
 
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // Bind to the observer: for Observable.create() this calls ObservableCreate.subscribeActual(), which connects the source to the end consumer
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
 
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
 
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        // The emitter sends an event
        e.onNext("1");
        // Signal completion; the observer receives nothing sent after onComplete
        e.onComplete();
    }
})
public interface ObservableOnSubscribe<T> {
 
    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

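ObservableCreate: source analysis of observable creation

ObservableCreate creates the observable, links it to the emitter and to the observer, and handles the events that pass between them.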

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
 
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        // Store the source
        this.source = source;
    }
 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
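        // Create the emitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // Pass the emitter to the observer as its Disposable
        observer.onSubscribe(parent);
 
        try {
            // Pass the same emitter to the source, which emits through it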
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // CreateEmitter: the emitter class
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
 
        private static final long serialVersionUID = -3434801548987643227L;
 
        final Observer<? super T> observer;
 
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
 
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            // Check whether the emitter has been disposed
            if (!isDisposed()) {
                // Not disposed: deliver the event; the value is what the observable emitted
                observer.onNext(t);
            }
        }
 
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
 
        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    // Deliver the error
                    observer.onError(t);
                } finally {
                    // Dispose; anything the observable emits afterwards is no longer delivered
                    dispose();
                }
                return true;
            }
            return false;
        }
 
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    // Deliver onComplete from the observable, then dispose in finally; anything emitted afterwards is no longer delivered
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
 
        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }
 
        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }
 
        @Override
        public ObservableEmitter<T> serialize() {
            // serialize() wraps this emitter in a SerializedEmitter
            return new SerializedEmitter<T>(this);
        }
 
        @Override
        public void dispose() {
            // Dispose this emitter's Disposable
            DisposableHelper.dispose(this);
        }
 
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
 
    /**
     * Serializes calls to onNext, onError and onComplete.
     *
     * @param <T> the value type
     */
    // A serialized emitter, backed internally by an SpscLinkedArrayQueue created with a capacity hint of 16
    static final class SerializedEmitter<T>
    extends AtomicInteger
    implements ObservableEmitter<T> {
 
        private static final long serialVersionUID = 4883307006032401862L;
 
        final ObservableEmitter<T> emitter;
 
        final AtomicThrowable error;
 
        final SpscLinkedArrayQueue<T> queue;
 
        volatile boolean done;
 
        SerializedEmitter(ObservableEmitter<T> emitter) {
            this.emitter = emitter;
            this.error = new AtomicThrowable();
            // A single-producer single-consumer queue
            this.queue = new SpscLinkedArrayQueue<T>(16);
        }
 
        @Override
        public void onNext(T t) {
            if (emitter.isDisposed() || done) {
                return;
            }
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (get() == 0 && compareAndSet(0, 1)) {
                emitter.onNext(t);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<T> q = queue;
                synchronized (q) {
                    q.offer(t);
                }
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }
 
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
 
        @Override
        public boolean tryOnError(Throwable t) {
            if (emitter.isDisposed() || done) {
                return false;
            }
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (error.addThrowable(t)) {
                done = true;
                drain();
                return true;
            }
            return false;
        }
 
        @Override
        public void onComplete() {
            if (emitter.isDisposed() || done) {
                return;
            }
            done = true;
            drain();
        }
 
        void drain() {
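            // AtomicInteger is a thread-safe integer with atomic operations such as increment and decrement:
            //   public final int get()                     returns the current value
            //   public final int getAndSet(int newValue)   returns the current value, then sets the new one
            //   public final int getAndIncrement()         returns the current value, then increments it
            //   public final int getAndDecrement()         returns the current value, then decrements it
            //   public final int getAndAdd(int delta)      returns the current value, then adds delta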
            if (getAndIncrement() == 0) {
                drainLoop();
            }
        }
 
        void drainLoop() {
            ObservableEmitter<T> e = emitter;
            SpscLinkedArrayQueue<T> q = queue;
            AtomicThrowable error = this.error;
            int missed = 1;
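            // Infinite loop. It returns when:
            // 1. the emitter has been disposed: the queue is cleared
            // 2. an error is present: the queue is cleared and the error is delivered
            // 3. the sequence is done and the queue is empty: onComplete is delivered
            // Otherwise it keeps emitting queued items; when the queue is empty it leaves the inner loop
            // and exits once no further calls were missed.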
            for (;;) {
 
                for (;;) {
                    if (e.isDisposed()) {
                        q.clear();
                        return;
                    }
 
                    if (error.get() != null) {
                        q.clear();
                        e.onError(error.terminate());
                        return;
                    }
 
                    boolean d = done;
                    // Poll the next item from the queue
                    T v = q.poll();
 
                    boolean empty = v == null;
 
                    if (d && empty) {
                        e.onComplete();
                        return;
                    }
 
                    if (empty) {
                        break;
                    }
 
                    e.onNext(v);
                }
 
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
 
        @Override
        public void setDisposable(Disposable s) {
            emitter.setDisposable(s);
        }
 
        @Override
        public void setCancellable(Cancellable c) {
            emitter.setCancellable(c);
        }
 
        @Override
        public boolean isDisposed() {
            return emitter.isDisposed();
        }
 
        @Override
        public ObservableEmitter<T> serialize() {
            return this;
        }
    }
 
}
// DisposableHelper.dispose(): swap in DISPOSED and dispose the previous Disposable, if any
public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
        current = field.getAndSet(d);
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}
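
The terminal-state behavior of CreateEmitter can be reproduced without RxJava in a minimal sketch. MiniEmitterSketch and its received list are hypothetical, and an AtomicBoolean stands in for the DISPOSED sentinel kept in the AtomicReference; the point is only that the first terminal event disposes the emitter and everything after it is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified emitter: an AtomicBoolean stands in for the DISPOSED sentinel.
final class MiniEmitterSketch {
    final List<String> received = new ArrayList<>();   // what the "observer" saw
    private final AtomicBoolean disposed = new AtomicBoolean();

    boolean isDisposed() {
        return disposed.get();
    }

    // Returns true only for the call that performs the transition, like DisposableHelper.dispose().
    boolean dispose() {
        return disposed.compareAndSet(false, true);
    }

    void onNext(String value) {
        if (!isDisposed()) {
            received.add("next:" + value);
        }
    }

    void onComplete() {
        if (!isDisposed()) {
            try {
                received.add("complete");
            } finally {
                dispose();   // terminal event: cut the link to the observer
            }
        }
    }

    // Returns false when the error could not be delivered because the emitter is already disposed.
    boolean tryOnError(Throwable t) {
        if (!isDisposed()) {
            try {
                received.add("error:" + t.getMessage());
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        MiniEmitterSketch e = new MiniEmitterSketch();
        e.onNext("1");
        e.onComplete();
        e.onNext("2");   // dropped: the emitter is already disposed
        boolean delivered = e.tryOnError(new RuntimeException("late"));
        System.out.println(e.received + " delivered=" + delivered);
    }
}
```

In the real CreateEmitter, a false return from tryOnError() sends the error to RxJavaPlugins.onError() instead of the observer, as the onError() method above shows.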

Thread scheduling

Schedulers.io
// In the Schedulers class
static final Scheduler IO;
static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
 
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    // Initialize the IO scheduler (background threads)
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
 
    TRAMPOLINE = TrampolineScheduler.instance();
 
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
 
@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}
// In the IoScheduler class: setting up the thread factory
public IoScheduler() {
    this(WORKER_THREAD_FACTORY);
}
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
 
// In the RxThreadFactory class: creating threads
public RxThreadFactory(String prefix, int priority) {
        this(prefix, priority, false);
    }
 
    public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
        this.prefix = prefix;
        this.priority = priority;
        this.nonBlocking = nonBlocking;
    }
 
    @Override
    public Thread newThread(Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
        String name = nameBuilder.toString();
        // Pick the thread type by the nonBlocking flag; the two-argument constructor above passes false
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        // Set the thread priority
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
static final class RxCustomThread extends Thread implements NonBlockingThread {
    RxCustomThread(Runnable run, String name) {
        super(run, name);
    }
}
/**
 * Marker interface to indicate blocking is not recommended while running
 * on a Scheduler with a thread type implementing it.
 */
public interface NonBlockingThread {
 
}
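
The thread-creation logic in newThread() can be tried on its own with a small factory built only on the JDK. NamedDaemonThreadFactory is a hypothetical class modeled on the code above, not the RxJava class itself; it keeps the same three steps: a counter-based name, a priority, and the daemon flag.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical factory modeled on the newThread() logic above; not the RxJava class itself.
final class NamedDaemonThreadFactory extends AtomicLong implements ThreadFactory {
    private static final long serialVersionUID = 1L;
    private final String prefix;
    private final int priority;

    NamedDaemonThreadFactory(String prefix, int priority) {
        this.prefix = prefix;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // The factory is its own counter, so each thread gets a unique "<prefix>-<n>" name.
        Thread t = new Thread(r, prefix + "-" + incrementAndGet());
        t.setPriority(priority);
        t.setDaemon(true);   // daemon threads do not keep the JVM alive
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory f = new NamedDaemonThreadFactory("DemoIo", Thread.NORM_PRIORITY);
        Thread t = f.newThread(() -> { });
        System.out.println(t.getName() + " daemon=" + t.isDaemon());
    }
}
```

Extending AtomicLong keeps the name counter thread-safe without a separate field, which matches the incrementAndGet() call in the original newThread().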

AndroidSchedulers.mainThread(): consuming events on the main thread

public final class AndroidSchedulers {
 
    private static final class MainHolder {
        // Create a Handler on the main Looper; the static holder class makes the HandlerScheduler a lazily created singleton
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }
 
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
 
    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
 
    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }
 
    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}
final class HandlerScheduler extends Scheduler {
    private final Handler handler;
 
    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }
 
    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");
 
        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }
 
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
 
    private static final class HandlerWorker extends Worker {
        private final Handler handler;
 
        private volatile boolean disposed;
 
        HandlerWorker(Handler handler) {
            this.handler = handler;
        }
 
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
 
            if (disposed) {
                return Disposables.disposed();
            }
 
            run = RxJavaPlugins.onSchedule(run);
 
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
 
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
 
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
 
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
 
            return scheduled;
        }
 
        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }
 
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
 
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;
 
        private volatile boolean disposed;
 
        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }
 
        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }
 
        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }
 
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}
The observeOn() method

observeOn() specifies the thread on which events are consumed, that is, the downstream thread.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // Wrap this Observable in an ObservableObserveOn and return it
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
// The ObservableObserveOn class created above
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // With a TrampolineScheduler no thread switch is needed: subscribe the observer to the upstream source directly
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // Otherwise create a Worker; for the main-thread scheduler this is a HandlerWorker
            Scheduler.Worker w = scheduler.createWorker();
            // Subscribe an ObserveOnObserver to the upstream source
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ObserveOnObserver: when the upstream pushes data down, its onXxx methods handle it
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
 
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;
        // Data pushed by the upstream is stored in this queue
        SimpleQueue<T> queue;
 
        Disposable s;
 
        Throwable error;
        volatile boolean done;
 
        volatile boolean cancelled;
        // Fusion mode of the source: synchronous or asynchronous
        int sourceMode;
 
        boolean outputFused;
 
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            // The default buffer size comes from Flowable.bufferSize(), which is 128 unless overridden:
            // BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
            this.bufferSize = bufferSize;
        }
 
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;
 
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    // If the upstream supports SYNC or ASYNC fusion, use its QueueDisposable as the queue; otherwise fall back to an SpscLinkedArrayQueue
                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        // Hand this Disposable to the downstream observer; it is used to cancel the subscription
                        actual.onSubscribe(this);
                        return;
                    }
                }
                // Single-producer single-consumer queue that holds the data pushed by upstream onNext calls
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
 
                actual.onSubscribe(this);
            }
        }
 
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
 
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // Start scheduling
            schedule();
        }
 
        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }
 
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
 
        @Override
        public void dispose() {
            if (!cancelled) {
                cancelled = true;
                s.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }
 
        @Override
        public boolean isDisposed() {
            return cancelled;
        }
 
        void schedule() {
            if (getAndIncrement() == 0) {
                // The worker schedules this Runnable; with the main-thread scheduler it posts a message, so the events are consumed on the main thread
                worker.schedule(this);
            }
        }
 
        void drainNormal() {
            int missed = 1;
 
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;
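            // Infinite loop; it ends on termination (cancel, error, or completion) or once the queue is drained and no signals were missed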
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
 
                for (;;) {
                    boolean d = done;
                    T v;
 
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
 
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
 
                    if (empty) {
                        break;
                    }
 
                    a.onNext(v);
                }
 
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
 
        void drainFused() {
            int missed = 1;
 
            for (;;) {
                if (cancelled) {
                    return;
                }
 
                boolean d = done;
                Throwable ex = error;
 
                if (!delayError && d && ex != null) {
                    actual.onError(error);
                    worker.dispose();
                    return;
                }
 
                actual.onNext(null);
 
                if (d) {
                    ex = error;
                    if (ex != null) {
                        actual.onError(ex);
                    } else {
                        actual.onComplete();
                    }
                    worker.dispose();
                    return;
                }
 
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
 
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
 
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
 
        @Override
        public int requestFusion(int mode) {
            if ((mode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }
 
        @Nullable
        @Override
        public T poll() throws Exception {
            return queue.poll();
        }
 
        @Override
        public void clear() {
            queue.clear();
        }
 
        @Override
        public boolean isEmpty() {
            return queue.isEmpty();
        }
    }
}
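observeOn summary
  1. ObserveOnObserver implements both the Observer and Runnable interfaces.
  2. onNext does not switch threads first: it adds the data to the queue and then schedules the worker. On the worker's thread, the data is taken from the queue and pushed downstream.
  3. observeOn therefore affects the thread the downstream runs on, and every call takes effect.
  4. On why every call takes effect: subscribeOn switches threads inside subscribeActual, at subscription time. It actively moves the upstream subscription, and with it the thread the upstream emits on, and all of this happens before any data is emitted. With several subscribeOn calls, only the one closest to the source decides the emitting thread. observeOn, by contrast, switches threads each time data arrives and delivers the data right after the switch, so each observeOn call takes effect.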
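
The queue-then-drain mechanism summarized above can be sketched in plain Java. This is a simplified model, not the RxJava implementation: ObserveOnSketch, its log list, and the thread name "consumer" are hypothetical, a ConcurrentLinkedQueue replaces SpscLinkedArrayQueue, and a single-thread ExecutorService stands in for the Worker. Errors, completion, and disposal are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of ObserveOnObserver: enqueue on the caller thread, drain on a worker thread.
final class ObserveOnSketch implements Runnable {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();   // "work in progress" counter
    private final ExecutorService worker;
    final List<String> log = new ArrayList<>();               // written only by the worker thread

    ObserveOnSketch(ExecutorService worker) {
        this.worker = worker;
    }

    void onNext(String value) {
        queue.offer(value);                 // no thread switch yet: just store the value
        if (wip.getAndIncrement() == 0) {   // only the 0 -> 1 transition schedules a drain
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            String v;
            while ((v = queue.poll()) != null) {
                log.add(v + "@" + Thread.currentThread().getName());
            }
            missed = wip.addAndGet(-missed);   // did more onNext calls arrive while draining?
            if (missed == 0) {
                break;
            }
        }
    }

    // Emits three values from the calling thread and returns what the worker thread delivered.
    static List<String> demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        ObserveOnSketch o = new ObserveOnSketch(worker);
        o.onNext("a");
        o.onNext("b");
        o.onNext("c");
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return o.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The counter plays the role of getAndIncrement() in schedule(): however many onNext calls race with a running drain, at most one drain task is active at a time, and the order of the queue is preserved.
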
The subscribeOn() method

subscribeOn() specifies the thread on which events are produced, that is, the upstream thread.

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    // The scheduler to switch to
    final Scheduler scheduler;
 
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        // super() simply stores the ObservableSource
        super(source);
        this.scheduler = scheduler;
    }
 
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
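        // Call the downstream (end) Observer's onSubscribe directly, so onSubscribe runs on the subscribing thread
        s.onSubscribe(parent);
        // scheduler.scheduleDirect() schedules the task with no delay;
        // the Disposable it returns is stored in parent so the scheduled task can be disposed with it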
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
 
        private static final long serialVersionUID = 8094547886072529208L;
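        // The actual downstream (end) observer
        final Observer<? super T> actual;
        // AtomicReference: an object reference whose reads and writes are atomic.
        // Holds the upstream Disposable so that it is disposed together with the downstream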
        final AtomicReference<Disposable> s;
 
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        // Called by the upstream with its Disposable, which is stored in this.s
        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }
        // Forward directly to the downstream observer
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
 
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }
 
        @Override
        public void onComplete() {
            actual.onComplete();
        }
 
        // On dispose, dispose the upstream Disposable as well
        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }
 
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        // Called directly from subscribeActual; stores the Disposable of the task scheduled on the scheduler
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
 
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
 
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
 
        @Override
        public void run() {
            // This runs on the Scheduler's thread
            source.subscribe(parent);
        }
    }
}
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
    ObjectHelper.requireNonNull(d, "d is null");
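    // CAS in Java concurrency usually means compare-and-swap (or compare-and-set).
    // It is an atomic operation: it compares the value at a memory location and writes the new value
    // only if the comparison succeeds, so the new value is always based on up-to-date information.
    // If another thread changed the value in the meantime, the CAS fails.
    // A CAS reports either success or the previous value, which tells the caller whether it succeeded.
    // Here: if the AtomicReference currently holds null, replace it with d, all in one atomic step.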
    if (!field.compareAndSet(null, d)) {
        d.dispose();
        // AtomicReference.get() returns the stored reference, typed by the generic parameter
        // (Object when used as a raw type)
        if (field.get() != DISPOSED) {
            reportDisposableSet();
        }
        return false;
    }
    return true;
}
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // Create a Worker
    final Worker w = createWorker();
 
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
 
    DisposeTask task = new DisposeTask(decoratedRun, w);
 
    w.schedule(task, delay, unit);
 
    return task;
}
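
The compareAndSet(null, d) idiom at the heart of setOnce() can be tried in isolation. The helper below is a hypothetical reduction that keeps only the CAS step and leaves out the dispose and error-reporting branches of the original.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper showing the compareAndSet(null, value) idiom used by setOnce().
final class SetOnceSketch {
    // Stores value only if the slot is still empty; returns whether this call won.
    static <T> boolean setOnce(AtomicReference<T> field, T value) {
        if (value == null) {
            throw new NullPointerException("value is null");
        }
        // Atomic: compare the current content with null and, only if equal, swap in value.
        return field.compareAndSet(null, value);
    }

    public static void main(String[] args) {
        AtomicReference<String> slot = new AtomicReference<>();
        System.out.println(setOnce(slot, "first"));    // this call wins
        System.out.println(setOnce(slot, "second"));   // this call loses, slot is taken
        System.out.println(slot.get());
    }
}
```
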
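subscribeOn summary
  1. subscribeOn returns an ObservableSubscribeOn wrapper.
  2. When that wrapper is subscribed to, subscribeActual is called and immediately schedules the work on the Schedulers.xx thread.
  3. On that thread, source.subscribe(parent) runs and subscribes to the upstream observable.
  4. The upstream starts emitting. Emitting is just a call to the downstream's onNext and related methods, so it also runs on that thread.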
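
The four steps above can be modeled in a few lines of plain Java. Everything here is a hypothetical simplification: SubscribeOnSketch, its Source interface, and the thread names "first" and "second" are made up for illustration, and single-thread executors stand in for Schedulers. The sketch chains two subscribeOn wrappers to show that the source emits on the scheduler closest to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of subscribeOn: the subscription itself runs on the scheduler thread.
final class SubscribeOnSketch {
    // A "source" is modeled as code that pushes values into a downstream consumer.
    interface Source {
        void subscribe(Consumer<String> downstream);
    }

    static Source subscribeOn(Source upstream, ExecutorService scheduler) {
        // Like SubscribeTask.run(): subscribing to the upstream happens inside the scheduled task.
        return downstream -> scheduler.execute(() -> upstream.subscribe(downstream));
    }

    private static void shutdownAndWait(ExecutorService e) {
        e.shutdown();
        try {
            e.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> demo() {
        ExecutorService first = Executors.newSingleThreadExecutor(r -> new Thread(r, "first"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
        List<String> log = new ArrayList<>();
        // The source records the name of the thread it emits on.
        Source source = d -> d.accept("1@" + Thread.currentThread().getName());
        // "first" is applied closest to the source, "second" further downstream.
        subscribeOn(subscribeOn(source, first), second).subscribe(log::add);
        shutdownAndWait(second);   // done once it has handed the subscription to "first"
        shutdownAndWait(first);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The outer wrapper only moves the act of subscribing to "second"; that subscription in turn schedules the real source on "first", which is why the emitted value carries the name of the scheduler nearest the source.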

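Thread scheduling summary

  1. subscribeOn specifies the thread on which subscribe() happens, that is, the thread where events are produced. It affects the code upstream of it, here the subscribe method passed to Observable.create.
  2. observeOn specifies the thread the subscriber runs on, that is, the thread where events are consumed. It affects the code downstream of it, here the callbacks of the Observer passed to subscribe(new Observer).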

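Source analysis summary

  1. The subscribeActual() method is where the source and the end consumer are connected.
  2. Only when source.subscribe(parent); runs does the ObservableOnSubscribe start sending data to the Observer through the ObservableEmitter. In other words, data is pushed from the source to the end consumer.
  3. In CreateEmitter, the Observer's onXXXX() methods are called back only while the link between the Observable and the Observer has not been disposed.
  4. The Observer's onComplete() and onError() are mutually exclusive and at most one of them runs, once, because CreateEmitter calls dispose() automatically after invoking either. This follows from the previous point.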
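  5. If error is sent first and then complete, the complete is not delivered. In the reverse order the app crashes, because the late error can no longer reach the Observer and is passed to RxJavaPlugins.onError() instead.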
  6. Note that onSubscribe() is called back on the thread that executes our subscribe() call and is not affected by thread scheduling.
