RxJS 入门到搬砖 之 Scheduler

2023-05-17 20:02:44 浏览数 (3)

# Scheduler

什么是 Scheduler ? scheduler 控制 subscription 什么时候开始和通知什么时候派发。

  • scheduler 是一个数据结构,知道如何根据优先级或其他标准对任务进行存储和排序;
  • scheduler 是一个执行上下文,表示任务在何时何地执行(如立即执行、或在另一个回调机制中,如 setTimeoutprecess.nextTick 或动画帧 );
  • scheduler 有一个时钟,通过 schedulernow() 方法提供了“时间”的概念,在特定调度程序上调度的任务将仅遵守该时钟指示的时间;

Scheduler 支持开发者定义 Observable 将在什么执行上下文中向其 Observer 传递通知。

代码语言:javascript复制
import { Observable, observeOn, asyncScheduler } from 'rxjs';

const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
}).pipe(observeOn(asyncScheduler));

console.log('just before subscribe');

observable.subscribe(
  (x) => console.log('got value '   x),
  (err) => console.error('something wrong occurred: '   err),
  () => console.log('done')
);

console.log('just after subscribe');

// just before subscribe
// just after subscribe
// got value 1
// got value 2
// got value 3
// done

observeOn(asyncScheduler) 在新的 Observable 和最终的 Observer 之间引入了一个代理 Observer

代码语言:javascript复制
import { Observable, observeOn, asyncScheduler } from 'rxjs';

const observable = new Observable((proxyObserver) => {
  proxyObserver.next(1);
  proxyObserver.next(2);
  proxyObserver.next(3);
  proxyObserver.complete();
}).pipe(
  observeOn(asyncScheduler)
);

const finalObserver = {
  next: (x) => console.log('got value '   x),
  error: (err) => console.error('something wrong occurred: '   err),
  complete: () => console.log('done')
};

console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

// just after subscription
// just before subscribe
// got value 1
// got value 2
// got value 3
// done

proxyObserver 是在 observeOn(asyncScheduler) 中创建的,它的 next(val) 函数可以理解成:

代码语言:javascript复制
const proxyObserver = {
  next (val) {
    asyncScheduler.subscribe(
      (x) => finalObserver.next(x),
      0, // delay
      val // value
    );
  }
};

async Scheduler 使用 setTimeoutsetInterval 运行,及时给定的延迟为 0。在 JavaScript 中,setTimeout(fn, 0) 在下次事件循环迭代中最早运行函数 fn

Schedulerschedule() 方法接受一个延迟参数,它指的是相对于 Scheduler 自己的内部时钟的时间量。Scheduler 的时钟不需要与实际的时间有关,就像延迟操作的时间不是在实际时间上运行的,而是在 Scheduler 的时钟上运行的。这在测试中特别有用,其中可以使用虚拟时间 Scheduler 来伪造现实时间,而实际上是同步执行计划任务。

# Scheduler Types

async Scheduler 是 RxJS 内置的 scheduler 之一。其他一些 scheduler 都可以通过使用 Scheduler 对象的静态属性来创建。

SCHEDULER

PURPOSE

null

不传入任何 scheduler 时,通知以同步和递归方式传递。这用于恒定时间操作或尾递归操作

queueScheduler

在当前事件框架中的队列上调度,用于迭代操作

asapScheduler

在微任务队列进行调度,就是 Promise 使用的队列。通常是当前工作结束后,下个工作开始前。用于异步转换

asyncScheduler

使用 setInterval 完成调度,用于基于时间的操作

animationFrameScheduler

调度将在下一次浏览器内容重绘之前发生的任务。可用于创建流畅的浏览器动画

# Using Schedulers

你可能已经在 RxJS 代码中使用了调度器,而没有明确说明要使用的调度器的类型。这是因为所有处理并发的 Observable 操作符都有可选的调度器。如果你没有提供调度器,RxJS 会根据最小并发的原则选择一个默认的调度器。也就是说会选择引入满足 operator 需求的最少并发的调度器。

如,对于返回有限或少量信息 observableoperator , RxJS 不使用 Scheduler,即 nullundefined。对于返回可能大量或无限数量的消息的 operator ,RxJS 会使用 queueScheduler。对于使用计时器的 operator , RxJS 会使用 asyncScheduler

因为 RxJS 使用最小并发量的 scheduler,所以如果出于性能目的引入并发,可以选择一个不同的 scheduler。要指定特定的 scheduler,可以使用采用 scheduleroperator 方法。如,from([10, 20, 30], asyncScheduler)

静态创建操作符通常以 Scheduler 作为参数。 如,from(array, scheduler) 允许你指定在传递从数组转换的每个通知时要使用的调度程序。它通常是操作符的最后一个参数,下面静态创建操作符接受 Scheduler 的参数:

  • bindCallback
  • bindNodeCallback
  • conbineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

使用 subscribeOn 来调度 subscribe() 调用将来在什么上下文中发生。 默认,对 Observablesubscribe() 调用将同步并立即发生。不过,可以使用实例运算符 subscribeOn(scheduler) 延迟或安排在给定 Scheduler 上发生的实际订阅,其中 scheduler 是你提供的参数。

使用 observeOn 来处理在什么上下文中发送通知。 就像上面例子中的,实例操作符 observeOn(scheduler) 在源 Observable 和目标 Observer 之间引入了一个中介 Observer,其中中介使用给定的 scheduler 调度对目标 Observer 的调用。

实例操作符可以将 Scheduler 作为参数。

时间相关的操作符,如 bufferTimedebounceTimedelayauditTimesampleTimethrottleTimetimeIntervaltimeouttimeoutWithwindowTime 都会接受一个 Scheduler 作为最后一个参数,否则默认在 asyncScheduler 上运行。

其他将 Scheduler 作为参数的实例操作符:cachecombineLatestconcatexpandmergepublishReplaystartWith

0 人点赞