RxJava timer and interval operators: a source code walkthrough

2022-12-19 13:22:01

If you repost this article, please credit the source with a link. Originally from 103style's blog.

The timer operator

The timer operator actually returns an ObservableTimer object. The two-argument overload runs on Schedulers.computation() by default.

 public static Observable<Long> timer(long delay, TimeUnit unit) {
     return timer(delay, unit, Schedulers.computation());
 }
 public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
     return RxJavaPlugins.onAssembly(new ObservableTimer(Math.max(delay, 0L), unit, scheduler));
 }

ObservableTimer source. Its subscribeActual method does the following:

  • Builds a TimerObserver object.
  • Calls the observer's onSubscribe method.
  • Calls scheduler.scheduleDirect(ios, delay, unit), which returns a Disposable.
  • Passes the returned Disposable to the TimerObserver's setResource method.

public final class ObservableTimer extends Observable<Long> {
    final Scheduler scheduler;
    final long delay;
    final TimeUnit unit;
    public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
        this.delay = delay;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(Observer<? super Long> observer) {
        TimerObserver ios = new TimerObserver(observer);
        observer.onSubscribe(ios);
        Disposable d = scheduler.scheduleDirect(ios, delay, unit);
        ios.setResource(d);
    }
    ...
}

TimerObserver source:

static final class TimerObserver extends AtomicReference<Disposable>
implements Disposable, Runnable {

    final Observer<? super Long> downstream;

    TimerObserver(Observer<? super Long> downstream) {
        this.downstream = downstream;
    }
    ...
    @Override
    public void run() {
        if (!isDisposed()) {
            downstream.onNext(0L);
            lazySet(EmptyDisposable.INSTANCE);
            downstream.onComplete();
        }
    }

    public void setResource(Disposable d) {
        DisposableHelper.trySet(this, d);
    }
}

First, look at DisposableHelper.trySet(this, d), which is called from TimerObserver's setResource(Disposable d) method:

public static boolean trySet(AtomicReference<Disposable> field, Disposable d) {
    if (!field.compareAndSet(null, d)) {
        if (field.get() == DISPOSED) {
            d.dispose();
        }
        return false;
    }
    return true;
}
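
  • If the field is still null, compareAndSet stores d and the method returns true. Otherwise it checks whether the field is in the DISPOSED state; if so, it calls dispose on the Disposable that was passed in (the DisposeTask object built earlier by the Scheduler). In both of these other cases it returns false.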
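
To make the three outcomes of trySet concrete, here is a minimal sketch that uses only java.util.concurrent. The Handle interface, the DISPOSED sentinel and the class name TrySetSketch are hypothetical stand-ins for illustration, not RxJava types.

```java
import java.util.concurrent.atomic.AtomicReference;

final class TrySetSketch {
    /** Hypothetical minimal stand-in for RxJava's Disposable. */
    interface Handle {
        void dispose();
    }

    /** Sentinel that marks the field as already disposed (assumption: identity comparison, as in the snippet above). */
    static final Handle DISPOSED = new Handle() {
        @Override
        public void dispose() {
            // nothing to release
        }
    };

    /** Stores d only if the field is still empty; disposes d if the field was already disposed. */
    static boolean trySet(AtomicReference<Handle> field, Handle d) {
        if (!field.compareAndSet(null, d)) {
            if (field.get() == DISPOSED) {
                d.dispose();
            }
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicReference<Handle> empty = new AtomicReference<>();
        System.out.println("empty field accepted d: " + trySet(empty, () -> { }));

        AtomicReference<Handle> disposed = new AtomicReference<>(DISPOSED);
        boolean[] cancelled = {false};
        boolean stored = trySet(disposed, () -> cancelled[0] = true);
        System.out.println("disposed field accepted d: " + stored + ", d was disposed: " + cancelled[0]);
    }
}
```

This is why disposing the TimerObserver before scheduleDirect returns is still safe: the late Disposable is cancelled immediately instead of being stored.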

The scheduler.scheduleDirect(ios, delay, unit) method:

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

It first creates a Worker. Because the default scheduler is Schedulers.computation(), the source shows that this ends up in ComputationScheduler's createWorker method.

Schedulers:

...
static final class ComputationHolder {
    static final Scheduler DEFAULT = new ComputationScheduler();
}
...
static {
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    ...
}

static final class ComputationTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return ComputationHolder.DEFAULT;
    }
}

RxJavaPlugins

public static Scheduler initComputationScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
    ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
    Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitComputationHandler;
    if (f == null) {
        return callRequireNonNull(defaultScheduler);
    }
    return applyRequireNonNull(f, defaultScheduler); // JIT will skip this
}

static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
    try {
        return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}

f is null by default, so the result of callRequireNonNull(defaultScheduler) is returned, which in turn calls ComputationTask's call method. The value returned is the ComputationScheduler object.

ComputationScheduler's createWorker method:

public ComputationScheduler() {
    this(THREAD_FACTORY);
}

public ComputationScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
    start();
}

public Worker createWorker() {
    return new EventLoopWorker(pool.get().getEventLoop());
}

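The constructor first stores NONE = new FixedSchedulerPool(0, THREAD_FACTORY) in pool, but it then calls start() right away. In RxJava 2.x, start() replaces NONE with a FixedSchedulerPool that holds MAX_THREADS workers, so by the time createWorker runs, pool.get().getEventLoop() normally returns one of those PoolWorker objects in round-robin order. SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown")) is returned only when cores == 0, that is, while the pool is still NONE (for example after the scheduler has been shut down). Either way, a PoolWorker wraps a single-threaded executor created with Executors.newScheduledThreadPool(1, factory).
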
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
    this.cores = maxThreads;
    this.eventLoops = new PoolWorker[maxThreads];
    for (int i = 0; i < maxThreads; i++) {
        this.eventLoops[i] = new PoolWorker(threadFactory);
    }
}
public PoolWorker getEventLoop() {
    int c = cores;
    if (c == 0) {
        return SHUTDOWN_WORKER;
    }
    return eventLoops[(int)(n++ % c)];
}
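
The selection logic in getEventLoop() can be tried out in isolation. The sketch below is a simplified model under stated assumptions: workers are represented by plain strings, and the class name RoundRobinSketch and the "worker-N" / "shutdown-worker" labels are made up for illustration.

```java
final class RoundRobinSketch {
    private final String[] eventLoops;
    private long n; // running counter, playing the role of the n field used by getEventLoop()

    RoundRobinSketch(int cores) {
        eventLoops = new String[cores];
        for (int i = 0; i < cores; i++) {
            eventLoops[i] = "worker-" + i;
        }
    }

    /** Returns the next worker in round-robin order, or the shutdown placeholder for an empty pool. */
    String getEventLoop() {
        int c = eventLoops.length;
        if (c == 0) {
            return "shutdown-worker";
        }
        return eventLoops[(int) (n++ % c)];
    }

    public static void main(String[] args) {
        RoundRobinSketch pool = new RoundRobinSketch(3);
        for (int i = 0; i < 4; i++) {
            System.out.println(pool.getEventLoop()); // cycles through the three workers, then wraps around
        }
        System.out.println(new RoundRobinSketch(0).getEventLoop());
    }
}
```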

So createWorker returns an EventLoopWorker that wraps the PoolWorker obtained from getEventLoop(), and that PoolWorker is backed by a single-threaded scheduled executor.

static final class EventLoopWorker extends Scheduler.Worker {
    private final ListCompositeDisposable serial;
    private final CompositeDisposable timed;
    private final ListCompositeDisposable both;
    private final PoolWorker poolWorker;

    volatile boolean disposed;

    EventLoopWorker(PoolWorker poolWorker) {
        this.poolWorker = poolWorker;
        this.serial = new ListCompositeDisposable();
        this.timed = new CompositeDisposable();
        this.both = new ListCompositeDisposable();
        this.both.add(serial);
        this.both.add(timed);
    }
...

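decoratedRun is the TimerObserver object itself, because RxJavaPlugins.onSchedule returns its argument unchanged unless a schedule hook has been installed.
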
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

It then builds a DisposeTask object.

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
    final Runnable decoratedRun;
    final Worker w;
    Thread runner;

    DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }

    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            decoratedRun.run();
        } finally {
            dispose();
            runner = null;
        }
    }
    ...
}
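
The key point of DisposeTask.run() is the try/finally: the worker is released once the wrapped task has run, even if the task throws. A simplified sketch of that pattern follows. The releaseWorker callback, the idempotence guard in dispose() and the class name are assumptions for illustration; the real class disposes the Worker directly.

```java
final class DisposeTaskSketch implements Runnable {
    private final Runnable decoratedRun;
    private final Runnable releaseWorker; // hypothetical stand-in for disposing the Worker
    private boolean disposed;

    DisposeTaskSketch(Runnable decoratedRun, Runnable releaseWorker) {
        this.decoratedRun = decoratedRun;
        this.releaseWorker = releaseWorker;
    }

    @Override
    public void run() {
        try {
            decoratedRun.run();
        } finally {
            dispose(); // always release the worker, whether the task succeeded or threw
        }
    }

    void dispose() {
        if (!disposed) {
            disposed = true;
            releaseWorker.run();
        }
    }

    boolean isDisposed() {
        return disposed;
    }

    public static void main(String[] args) {
        DisposeTaskSketch task = new DisposeTaskSketch(
                () -> System.out.println("task ran"),
                () -> System.out.println("worker released"));
        task.run();
    }
}
```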

The worker returned by createWorker then runs its schedule method. This ends up calling schedule(task, delayTime, unit) on the PoolWorker's single-threaded executor, Executors.newScheduledThreadPool(1, factory) (or submit when the delay is not positive), and passes the returned Future to ScheduledRunnable's setFuture method.

public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (disposed) {
        return EmptyDisposable.INSTANCE;
    }
    return scheduleActual(action, delayTime, unit, null);
}

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    ...
    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        ...
        RxJavaPlugins.onError(ex);
    }
    return sr;
}

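The executor's schedule(task, delayTime, unit) method runs the task's run method after delayTime has elapsed. Here that means DisposeTask.run, which in turn calls the run method of the TimerObserver object created in subscribeActual.
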
public void subscribeActual(Observer<? super Long> observer) {
    TimerObserver ios = new TimerObserver(observer);
    observer.onSubscribe(ios);
    Disposable d = scheduler.scheduleDirect(ios, delay, unit);
    ios.setResource(d);
}

TimerObserver's run method calls the observer's onNext(0L) and then onComplete():

public void run() {
    if (!isDisposed()) {
        downstream.onNext(0L);
        lazySet(EmptyDisposable.INSTANCE);
        downstream.onComplete();
    }
}
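
The whole timer path boils down to "schedule one task on a single-threaded executor, emit 0 and complete unless disposed in the meantime". The sketch below models that with java.util.concurrent only. It is not RxJava code: the event strings, the disposeBeforeFiring flag and the class name TimerSketch are assumptions for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class TimerSketch {
    /** Schedules one emission of 0 followed by completion and returns the recorded events. */
    static List<String> runTimer(long delayMillis, boolean disposeBeforeFiring) throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        List<String> events = new CopyOnWriteArrayList<>();
        AtomicBoolean disposed = new AtomicBoolean();
        try {
            ScheduledFuture<?> future = executor.schedule(() -> {
                if (!disposed.get()) { // same kind of guard as in TimerObserver.run()
                    events.add("onNext(0)");
                    events.add("onComplete");
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
            if (disposeBeforeFiring) {
                disposed.set(true);
                future.cancel(false); // cancel the pending task, like disposing the returned Disposable
            } else {
                future.get(); // wait until the delayed task has finished
            }
        } finally {
            executor.shutdownNow();
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTimer(50, false));
        System.out.println(runTimer(200, true));
    }
}
```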

The interval family of operators
  • The interval family consists of two operators, interval and intervalRange, with the following 6 methods:
    • interval(long period, TimeUnit unit)
    • interval(long initialDelay, long period, TimeUnit unit)
    • interval(long period, TimeUnit unit, Scheduler scheduler)
    • interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
    • intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
    • intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

    They return ObservableInterval and ObservableIntervalRange objects respectively, and the default Scheduler is Schedulers.computation().

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
    return interval(initialDelay, period, unit, Schedulers.computation());
}
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) {
    return intervalRange(start, count, initialDelay, period, unit, Schedulers.computation());
}
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    // argument checks omitted here; end is the last value to emit
    long end = start + (count - 1);
    return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
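
Since intervalRange emits count values beginning at start, the last value is start + (count - 1). The small sketch below lists the values such a range covers. It ignores timing entirely, and the class and method names are hypothetical.

```java
import java.util.Arrays;

final class IntervalRangeSketch {
    /** Returns the values start, start + 1, ..., start + (count - 1); assumes a small, non-negative count. */
    static long[] values(long start, long count) {
        long end = start + (count - 1);
        long[] out = new long[(int) count];
        int i = 0;
        for (long v = start; v <= end; v++) {
            out[i++] = v;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(values(5, 3)));
    }
}
```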

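ObservableInterval source:

  • Builds an IntervalObserver object.
  • Because the default is Schedulers.computation(), sch instanceof TrampolineScheduler is false unless we explicitly pass Schedulers.trampoline() as the Scheduler.
  • As with ObservableTimer, the scheduled work ends up calling IntervalObserver's run method. The difference is that the Disposable returned is a PeriodicDirectTask object.
  • setResource works the same way as in ObservableTimer, so it is not repeated here.
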
public final class ObservableInterval extends Observable<Long> {
    final Scheduler scheduler;
    final long initialDelay;
    final long period;
    final TimeUnit unit;

    public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(Observer<? super Long> observer) {
        IntervalObserver is = new IntervalObserver(observer);
        observer.onSubscribe(is);

        Scheduler sch = scheduler;
        if (sch instanceof TrampolineScheduler) {
            Worker worker = sch.createWorker();
            is.setResource(worker);
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);
        }
    }
    ...
}

sch.schedulePeriodicallyDirect(is, initialDelay, period, unit) in turn calls the worker's schedulePeriodically method, which:

  • Converts the interval period to nanoseconds.
  • Sets the first start time to the current time plus the initial delay, in nanoseconds.
  • Wraps the PeriodicDirectTask object in a PeriodicTask object.

public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
    Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
    if (d == EmptyDisposable.INSTANCE) {
        return d;
    }
    return periodicTask;
}

public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
    final SequentialDisposable first = new SequentialDisposable();
    final SequentialDisposable sd = new SequentialDisposable(first);
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    final long periodInNanoseconds = unit.toNanos(period);
    final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
    final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

    Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
            periodInNanoseconds), initialDelay, unit);
    if (d == EmptyDisposable.INSTANCE) {
        return d;
    }
    first.replace(d);
    return sd;
}

The run method of PeriodicTask:

  • decoratedRun.run() calls the run method of the PeriodicDirectTask object.
  • At the end of run, sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS)) schedules the same task again. This repeats until the IntervalObserver is disposed, that is, until its isDisposed() returns true.

 final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
    final Runnable decoratedRun;
    final SequentialDisposable sd;
    final long periodInNanoseconds;
    long count;
    long lastNowNanoseconds;
    long startInNanoseconds;

    PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
            long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
        this.decoratedRun = decoratedRun;
        this.sd = sd;
        this.periodInNanoseconds = periodInNanoseconds;
        lastNowNanoseconds = firstNowNanoseconds;
        startInNanoseconds = firstStartInNanoseconds;
    }

    @Override
    public void run() {
        decoratedRun.run();
        if (!sd.isDisposed()) {
            long nextTick;
            long nowNanoseconds = now(TimeUnit.NANOSECONDS);
            // If the clock moved in a direction quite a bit, rebase the repetition period
            if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
                    || nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
                nextTick = nowNanoseconds + periodInNanoseconds;
                /*
                 * Shift the start point back by the drift as if the whole thing
                 * started count periods ago.
                 */
                startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
            } else {
                nextTick = startInNanoseconds + (++count * periodInNanoseconds);
            }
            }
            lastNowNanoseconds = nowNanoseconds;

            long delay = nextTick - nowNanoseconds;
            sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
        }
    }
    ...
}
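
The next-tick arithmetic is easier to follow with concrete numbers. The sketch below extracts it into a pure method that takes the current time as a parameter. The field names are shortened, the tolerance is a constructor argument, and the values used in main (period 100, tolerance 50) are made up for illustration. It does not schedule anything.

```java
final class PeriodicTickSketch {
    private final long period;
    private final long tolerance; // plays the role of CLOCK_DRIFT_TOLERANCE_NANOSECONDS
    private long count;
    private long lastNow;
    private long start;

    PeriodicTickSketch(long firstStart, long firstNow, long period, long tolerance) {
        this.start = firstStart;
        this.lastNow = firstNow;
        this.period = period;
        this.tolerance = tolerance;
    }

    /** Computes the absolute time of the next run, given the time at which the current run finished. */
    long nextTick(long now) {
        long next;
        if (now + tolerance < lastNow || now >= lastNow + period + tolerance) {
            // the clock jumped: rebase the schedule on the current time
            next = now + period;
            start = next - (period * (++count));
        } else {
            // normal case: stay on the original grid so small delays do not accumulate
            next = start + (++count * period);
        }
        lastNow = now;
        return next;
    }

    public static void main(String[] args) {
        // first run planned at t=100, created at t=0, period 100, tolerance 50
        PeriodicTickSketch task = new PeriodicTickSketch(100, 0, 100, 50);
        System.out.println(task.nextTick(100));  // on time: next tick stays on the grid
        System.out.println(task.nextTick(205));  // 5 units late: still on the grid, delay shrinks
        System.out.println(task.nextTick(1000)); // large jump: schedule is rebased
        System.out.println(task.nextTick(1100)); // continues on the rebased grid
    }
}
```

The delay passed to schedule is then nextTick - now, so a run that finishes 5 units late is followed by a wait of 95 rather than 100.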

The run method of PeriodicDirectTask:

  • It simply calls IntervalObserver's run().
static final class PeriodicDirectTask
implements Disposable, Runnable, SchedulerRunnableIntrospection {
    final Runnable run;
    final Worker worker;
    volatile boolean disposed;

    PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) {
        this.run = run;
        this.worker = worker;
    }

    @Override
    public void run() {
        if (!disposed) {
            try {
                run.run();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                worker.dispose();
                throw ExceptionHelper.wrapOrThrow(ex);
            }
        }
    }
    ...
}

IntervalObserver's run():

  • Calls the observer's onNext method.
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {
    final Observer<? super Long> downstream;
    long count;

    IntervalObserver(Observer<? super Long> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void run() {
        if (get() != DisposableHelper.DISPOSED) {
            downstream.onNext(count++);
        }
    }
}

This continues until we call dispose() ourselves to end the flow.
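
The effect of dispose() on the emitted sequence can be shown without any scheduler by calling run() by hand. In this simplified model a list stands in for the downstream observer and an AtomicBoolean stands in for the DISPOSED state. Both, and the class name, are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class IntervalCounterSketch implements Runnable {
    final List<Long> received = new ArrayList<>(); // stands in for downstream.onNext(...)
    private final AtomicBoolean disposed = new AtomicBoolean();
    private long count;

    @Override
    public void run() {
        if (!disposed.get()) {
            received.add(count++); // emit the current tick number, then increment it
        }
    }

    void dispose() {
        disposed.set(true);
    }

    public static void main(String[] args) {
        IntervalCounterSketch observer = new IntervalCounterSketch();
        observer.run();
        observer.run();
        observer.run();
        observer.dispose();
        observer.run(); // ignored: the observer has been disposed
        System.out.println(observer.received);
    }
}
```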

That's all.