RxJava2 使用
代码语言:javascript复制private void analyzeRxJava(){
Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" d "]");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() called with: value = [" value "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" e "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});
}
源码解析
Observable.create()被观察者创建
代码语言:javascript复制@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//被观察者的订阅进行非空判断
ObjectHelper.requireNonNull(source, "source is null");
//不为空返回
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
/**
 * Assembly hook: returns the source unchanged when no onObservableAssembly function
 * is installed, otherwise returns whatever that function produces for the source.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    // Read the hook once so the null check and the call see the same function.
    Function<? super Observable, ? extends Observable> hook = onObservableAssembly;
    if (hook == null) {
        return source;
    }
    return apply(hook, source);
}
/**
 * Runs the function on the given value and returns its result; anything the function
 * throws is passed to ExceptionHelper.wrapOrThrow and rethrown.
 */
@NonNull
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
    R transformed;
    try {
        // Hand the observable to the function so it can transform it.
        transformed = f.apply(t);
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
    return transformed;
}
// Function: a transformation that takes one input value and turns it into another value as output.
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
Observable 如何将数据发送出去
通过 ObservableEmitter:它是被观察者的发射器,负责发送数据、错误、完成等事件;它是一个接口,继承自 Emitter。
/**
* Emitter handed to ObservableOnSubscribe.subscribe; adds disposal and serialization
* support on top of the plain Emitter signals.
*/
public interface ObservableEmitter<T> extends Emitter<T> {
// Attaches a Disposable to this emitter. Once the emitter is disposed, CreateEmitter stops
// forwarding events, so the downstream no longer receives them.
void setDisposable(@Nullable Disposable d);
// Same purpose for a Cancellable (CreateEmitter wraps it in a CancellableDisposable).
void setCancellable(@Nullable Cancellable c);
// Returns true once the emitter has been disposed, i.e. the downstream has released it.
boolean isDisposed();
// Returns an emitter whose onNext/onError/onComplete calls are serialized.
@NonNull
ObservableEmitter<T> serialize();
// Like onError, but tells the caller through the boolean result whether the error was delivered.
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
/**
* Base interface for pushing signals: values, a Throwable error, or completion.
*/
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
ObservableOnSubscribe源码分析
ObservableOnSubscribe 是一个接口,它的实例由使用者传给 Observable.create(),并保存在 ObservableCreate 的 source 字段中。
Observable 类中的 subscribe 方法:
/**
* Subscribes the given observer: validates it, lets RxJavaPlugins.onSubscribe decorate it,
* then delegates to subscribeActual.
*
* @param observer the observer to subscribe, must not be null
* @throws NullPointerException if the observer is null, if the plugin hook returns null, or as a
*         wrapper around any other Throwable that gets past Exceptions.throwIfFatal
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// Bind to the observer: calls the concrete subscribeActual (ObservableCreate's in this
// walkthrough), which connects the source with the final consumer.
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
// Usage excerpt: the subscribe callback is where the emitter is used to send events.
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// the emitter sends an event
e.onNext("1");
// signal completion; the receiver gets nothing that is emitted after onComplete
e.onComplete();
}
})
/**
* Callback with a single subscribe method that receives the ObservableEmitter used to push events.
*/
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
ObservableCreate的创建被观察者源码分析
ObservableCreate用来创建被观察者,并和发射器建立关联,与observer建立关联,处理消息
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
/**
* Stores the subscribe callback that subscribeActual later invokes.
*/
public ObservableCreate(ObservableOnSubscribe<T> source) {
// keep the subscribe callback
this.source = source;
}
/**
* Connects the observer with the source: wraps the observer in a CreateEmitter, passes that
* emitter to the observer as its Disposable, then runs the subscribe callback with it.
*/
@Override
protected void subscribeActual(Observer<? super T> observer) {
// create the emitter that wraps the observer
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// hand the emitter to the observer through onSubscribe
observer.onSubscribe(parent);
try {
// run the subscribe callback with the same emitter
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// anything else thrown by the callback is reported through the emitter's onError
parent.onError(ex);
}
}
/**
* The emitter passed to the subscribe callback. It forwards events to the observer for as long
* as it has not been disposed, and it is also the Disposable given to the observer.
*/
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
// a null value is turned into an error instead of being forwarded
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// check whether the emitter has been disposed
if (!isDisposed()) {
// not disposed: deliver the value emitted by the source to the observer
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
// an error that could not be delivered goes to the global RxJavaPlugins.onError hook
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
// deliver the error to the observer
observer.onError(t);
} finally {
// dispose afterwards, so anything the source emits later is dropped
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
// deliver the source's onComplete to the observer
observer.onComplete();
} finally {
// dispose afterwards, so anything the source emits later is dropped
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
// wrap this emitter in a SerializedEmitter
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
// dispose the Disposable held by this AtomicReference
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
/**
* Serializes calls to onNext, onError and onComplete.
*
* @param <T> the value type
*/
//一种序列化的发射器,内部通过最大缓存为16的队列实现
static final class SerializedEmitter<T>
extends AtomicInteger
implements ObservableEmitter<T> {
private static final long serialVersionUID = 4883307006032401862L;
final ObservableEmitter<T> emitter;
final AtomicThrowable error;
final SpscLinkedArrayQueue<T> queue;
volatile boolean done;
SerializedEmitter(ObservableEmitter<T> emitter) {
this.emitter = emitter;
this.error = new AtomicThrowable();
//一种简单的生产者与消费者队列
this.queue = new SpscLinkedArrayQueue<T>(16);
}
@Override
public void onNext(T t) {
if (emitter.isDisposed() || done) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() == 0 && compareAndSet(0, 1)) {
emitter.onNext(t);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<T> q = queue;
synchronized (q) {
q.offer(t);
}
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (emitter.isDisposed() || done) {
return false;
}
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (error.addThrowable(t)) {
done = true;
drain();
return true;
}
return false;
}
@Override
public void onComplete() {
if (emitter.isDisposed() || done) {
return;
}
done = true;
drain();
}
void drain() {
//AutomicInteger 提供原子操作的线程安全的自增自减的Integer类
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
if (getAndIncrement() == 0) {
drainLoop();
}
}
void drainLoop() {
ObservableEmitter<T> e = emitter;
SpscLinkedArrayQueue<T> q = queue;
AtomicThrowable error = this.error;
int missed = 1;
死循环操作,退出标志为
1. 发射器释放了,就清空队列
2. 出现错误一样操作,然后退出
3. 操作已经完成,或者队列为空就返回退出
否则一直循环发射消息
for (;;) {
for (;;) {
if (e.isDisposed()) {
q.clear();
return;
}
if (error.get() != null) {
q.clear();
e.onError(error.terminate());
return;
}
boolean d = done;
//轮询取出队列中的数据
T v = q.poll();
boolean empty = v == null;
if (d && empty) {
e.onComplete();
return;
}
if (empty) {
break;
}
e.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void setDisposable(Disposable s) {
emitter.setDisposable(s);
}
@Override
public void setCancellable(Cancellable c) {
emitter.setCancellable(c);
}
@Override
public boolean isDisposed() {
return emitter.isDisposed();
}
@Override
public ObservableEmitter<T> serialize() {
return this;
}
}
}
/**
 * Swaps the DISPOSED marker into the field and disposes the Disposable that was stored there.
 *
 * @param field the reference holding the current Disposable
 * @return true if this call performed the swap, false if the field already held DISPOSED
 */
public static boolean dispose(AtomicReference<Disposable> field) {
    final Disposable marker = DISPOSED;
    // Cheap read first: nothing to do when the field is already disposed.
    if (field.get() == marker) {
        return false;
    }
    Disposable previous = field.getAndSet(marker);
    // Another caller may have won the race between the read and the swap.
    if (previous == marker) {
        return false;
    }
    if (previous != null) {
        previous.dispose();
    }
    return true;
}
线程调度处理
Schedulers.io
Schedulers类中
// Excerpt from Schedulers: the shared IO scheduler field and the static initializer that
// assigns the standard schedulers.
static final Scheduler IO;
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
// initialize the IO scheduler; IOTask supplies the instance
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
// Callable that supplies the default IO scheduler held by IoHolder.
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
// Holder class whose static field keeps the single default IoScheduler instance.
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
/**
* Returns the IO scheduler after passing it through the RxJavaPlugins.onIoScheduler hook.
*/
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
IoScheduler类中初始化线程
// Default constructor: delegates to the constructor that takes a thread factory, using the shared worker factory.
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
// The worker factory is an RxThreadFactory built from the worker name prefix and a priority.
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
RxThreadFactory类中创建线程
// Convenience constructor: same as the three-argument one with nonBlocking set to false.
public RxThreadFactory(String prefix, int priority) {
this(prefix, priority, false);
}
/**
* @param prefix name prefix of the created threads
* @param priority priority applied to each created thread
* @param nonBlocking whether newThread creates RxCustomThread instances instead of plain Threads
*/
public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
}
/**
 * Creates a daemon thread named "prefix-N", where N comes from incrementAndGet(), with
 * the configured priority.
 */
@Override
public Thread newThread(Runnable r) {
    String threadName = new StringBuilder(prefix).append('-').append(incrementAndGet()).toString();
    // nonBlocking picks the RxCustomThread marker type; the two-argument
    // constructor passes false, which yields a plain Thread.
    Thread thread;
    if (nonBlocking) {
        thread = new RxCustomThread(r, threadName);
    } else {
        thread = new Thread(r, threadName);
    }
    // Apply the configured priority.
    thread.setPriority(priority);
    thread.setDaemon(true);
    return thread;
}
// Thread subclass tagged with the NonBlockingThread marker interface; adds no behaviour of its own.
static final class RxCustomThread extends Thread implements NonBlockingThread {
RxCustomThread(Runnable run, String name) {
super(run, name);
}
}
/**
* Marker interface to indicate blocking is not recommended while running
* on a Scheduler with a thread type implementing it.
*/
public interface NonBlockingThread {
}
AndroidSchedulers.mainThread()事件消费线程在主线程
代码语言:javascript复制public final class AndroidSchedulers {
private static final class MainHolder {
//在主线程创建handler,并使用了静态内部类单例模式创建HandlerScheduler
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
/**
* Scheduler that runs tasks by posting them to an Android Handler.
*/
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
/**
* Posts the task to the handler after the given delay, without creating a worker.
*
* @return the ScheduledRunnable, which is also the Disposable that removes the task again
*/
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// a negative delay is clamped to zero
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
/**
* Worker that posts its tasks as messages on the handler, tagged with the worker itself so
* that dispose() can remove all of them at once.
*/
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
// a disposed worker accepts no further tasks
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
/**
* Wraps the task handed to the handler; it is both the Runnable that gets posted and the
* Disposable returned to the caller.
*/
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
// wrap whatever the task threw, report it to RxJavaPlugins.onError and then to the
// current thread's uncaught-exception handler
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override
public void dispose() {
disposed = true;
// remove this runnable from the handler
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
observeOn()方法
observeOn() 方法指定事件消费的线程,即下游线程。
/**
* Convenience overload: observeOn with delayError set to false and the default bufferSize().
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
/**
* Validates the arguments and wraps this Observable in an ObservableObserveOn.
*
* @param scheduler the scheduler whose worker delivers the events; must not be null
* @param delayError forwarded to ObservableObserveOn
* @param bufferSize forwarded to ObservableObserveOn; must be positive
*/
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// wrap this Observable in an ObservableObserveOn and pass it through the assembly hook
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
/**
* The Observable created by observeOn: it subscribes an ObserveOnObserver to the upstream, and
* that observer queues incoming values and re-emits them from a worker of the given scheduler.
*/
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// With a TrampolineScheduler the observer is subscribed to the upstream source directly,
// without the ObserveOnObserver wrapper.
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// Otherwise create a worker from the scheduler (a HandlerWorker when the scheduler is
// the HandlerScheduler used for the main thread).
Scheduler.Worker w = scheduler.createWorker();
// Subscribe an ObserveOnObserver to the upstream source.
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ObserveOnObserver: when the upstream pushes data down, the matching onXxx method here handles it.
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
// holds the values pushed by the upstream until they are drained
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
// fusion mode negotiated in onSubscribe (QueueDisposable.SYNC or ASYNC)
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
// The single-argument observeOn passes bufferSize(), which comes from Flowable (128 by default):
//BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
// NOTE(review): it is used as the constructor argument of SpscLinkedArrayQueue below; presumably
// a capacity hint rather than a hard limit on queued items - confirm.
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
// SYNC or ASYNC fusion reuses the upstream QueueDisposable as the queue; otherwise
// execution falls through to a fresh SpscLinkedArrayQueue.
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
// hand this object to the downstream observer as its Disposable, which it can use to unsubscribe
actual.onSubscribe(this);
return;
}
}
// single-producer single-consumer queue that stores the values pushed by the upstream onNext
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
// in ASYNC mode the value is not offered to the queue here
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// start scheduling
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
// clear the queue here only when no drain is currently in progress
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return cancelled;
}
void schedule() {
if (getAndIncrement() == 0) {
// Hand this Runnable to the worker; with a HandlerWorker that posts a message to the
// handler, so the events are consumed on the handler's thread.
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
// loop until checkTerminated reports termination or no further work was missed
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void drainFused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}
// a null onNext is sent downstream here (no value is polled in this method)
actual.onNext(null);
if (d) {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
/**
* Emits the terminal event and disposes the worker when the stream is finished.
*
* @param d the done flag as read by the caller
* @param empty whether the queue was empty when the caller looked
* @param a the downstream observer
* @return true if the caller must stop draining
*/
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
// delayError: terminate only after the queue has been emptied
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
// no delayError: an error cuts ahead of the values still queued
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
}
observeOn总结
ObserveOnObserver 实现了 Observer 与 Runnable 接口。
- 在 onNext 中先不切换线程,而是先把数据加入队列,然后再切换线程;在另一个线程中从 queue 中取出消息,再 push 给下游。
- 所以 observeOn 影响的是下游线程的执行,多次调用依然生效。
- 关于多次生效,对比 subscribeOn:subscribeOn 的线程切换是在 subscribeActual 中触发的,它切换的是订阅上游(也就是上游发射数据)所在的线程;这些切换都发生在真正发射数据之前,多次调用时只有最靠近数据源的那一次最终决定发射线程,所以 subscribeOn 只有一次生效。而 observeOn 是在每次收到数据时主动切换线程、再向下游发送数据,因此每次调用都会生效。
subscribeOn()方法
subscribeOn() 指定事件产生的线程,即上游线程。
/**
* Wraps this Observable in an ObservableSubscribeOn, which subscribes to it from a task
* scheduled on the given scheduler.
*
* @param scheduler the scheduler to subscribe on; must not be null
*/
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
/**
* The Observable created by subscribeOn: it subscribes to the upstream source from a task
* scheduled on the given scheduler.
*/
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
// the scheduler used to run the subscription
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// super just stores the ObservableSource
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// Call the downstream (final) observer's onSubscribe by hand, so onSubscribe runs on the subscribing thread.
s.onSubscribe(parent);
// scheduler.scheduleDirect starts a task with no delay;
// the Disposable it returns is stored in parent so that it is managed together with it.
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
// the real downstream observer
final Observer<? super T> actual;
// an object reference whose reads and writes are atomic;
// it holds the upstream Disposable so that it can be disposed together with the downstream
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
// Called by the upstream with its Disposable, which is stored in this.s.
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
// The signal methods forward straight to the downstream observer.
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
// Disposing also disposes the upstream Disposable.
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
// Called by hand from subscribeActual to keep track of the Disposable returned by the scheduler.
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
// The task handed to the scheduler; it performs the actual subscription to the upstream.
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// at this point the code runs on the thread of the chosen Scheduler
source.subscribe(parent);
}
}
}
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
//CAS,在Java并发应用中通常指CompareAndSwap或CompareAndSet,即比较并交换。
CAS是一个原子操作,它比较一个内存位置的值并且只有相等时修改这个内存位置的值为新的值,
保证了新的值总是基于最新的信息计算的,如果有其他线程在这期间修改了这个值则CAS失败。
CAS返回是否成功或者内存位置原来的值用于判断是否CAS成功。
这里比较当前AutomicReference中存储的值,如果为null就用d值替换空,整个操作原子性
if (!field.compareAndSet(null, d)) {
d.dispose();
//通过AtomicReference.get方法获取存储在AtomicReference中的引用,如果非范型就返回Object,否则返回指定类型
//引用
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
/**
* Schedules the task with a delay of zero by delegating to the three-argument overload.
*/
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
/**
* Schedules the task on a newly created worker after the given delay.
*
* @return the DisposeTask wrapping the decorated task and the worker
*/
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// create a worker from this scheduler
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
subscribeOn总结
- 返回一个 ObservableSubscribeOn 包装对象。
- 上一步返回的对象被订阅后,调用 subscribeActual 方法,立刻将线程切换到 Schedulers.xx 对应的线程中。
- 在切换后的线程中,执行 source.subscribe(parent) 方法,对上游 observable 进行订阅。
- 上游开始发送数据;上游发送数据仅仅是调用下游的 onNext 等方法,因此这些方法也在切换后的线程中执行。
线程调度总结
- subscribeOn 指定 subscribe() 所发生的线程,即事件产生的线程,影响它前面(上游)代码执行所在的线程,这里是 Observable.create 中 subscribe 方法所在的线程。
- observeOn 指定 subscriber(观察者)回调所发生的线程,即事件消费的线程,影响它后面(下游)代码执行所在的线程,这里是 subscribe(new Observer) 中各回调方法所在的线程。
源码分析总结
- 在 subscribeActual() 方法中,源头和终点关联起来。
- source.subscribe(parent); 这句代码执行时,才开始从 ObservableOnSubscribe 中利用 ObservableEmitter 发送数据给 Observer,即数据是从源头 push 给终点的。
- CreateEmitter 中,只有 Observable 和 Observer 的关系没有被 dispose,才会回调 Observer 的 onXXXX() 方法。
- Observer 的 onComplete() 和 onError() 互斥,只能执行其中一个且只执行一次,因为 CreateEmitter 在回调它们两者中的任意一个后,都会自动 dispose()。根据上一点,可以验证此结论。
- 先 error 后 complete,complete 不显示;反之会 crash(先 complete 后 error 时,tryOnError 返回 false,异常被交给 RxJavaPlugins.onError 处理,默认会导致崩溃)。
- 要注意的是 onSubscribe() 是在我们执行 subscribe() 这句代码的那个线程回调的,并不受线程调度影响。