RxJS 入门到搬砖 之 Observable 和 Observer

2023-05-17 20:34:05 浏览数 (2)

# Observable

Observable 是多个值的惰性 Push 集合。他填补了下表中的缺失点:

SINGLE

MULTIPLEXED

Pull

Function

Iterator

Push

Promise

Observable

如,下面是一个 Observable,它在订阅时立即(同步)推送值 123,并且从 subscribe 调用开始后过 1 s 再推送值 4,然后结束。

代码语言:javascript复制
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

要调用 Observable 并查看这些值,我们需要订阅它:

代码语言:javascript复制
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

console.log('Before subscribe');
observable.subscribe({
  next(x) { console.log('Next: '   x); },
  error(err) { console.error('Error: '   err); },
  complete() { console.log('Complete'); }
});
console.log('After subscribe');

// Before subscribe
// Next: 1
// Next: 2
// Next: 3
// After subscribe
// Next: 4
// Complete 

# Pull vs Push

PullPush 是两种不同的协议,描述了数据生产者和数据消费者如何进通信。

什么是 Pull? 在 Pull 系统中,消费者决定什么时候从数据生产者中接收数据。数据生产者自己对什么时候数据被传递到消费者没有感知。

每个 JavaScript 函数都是一个 Pull 系统。函数是数据的生产者,调用函数的代码通过从其调用中 pull 出单个返回值来使用它。

ES 2015 中介绍了生成器函数和迭代器 (opens new window)(function *),也属于 Pull 系统。调用 iterator.next() 的代码是消费者,从迭代器(生产者)中拉出多个值。

PRODUCER

CONSUMER

Pull

Passive:produces data when requested

Active:decides when data is requested

Push

Active:produces data at its own pace

Passive:reacts to received data

什么是 Push ? 在 Push 系统中,生产者决定什么时候推送数据给消费者。数据消费者自己对什么时候数据被接收到没有感知。

Promise 是目前 JavaScript 中最常见的 Push 系统类型。Promise (生产者)传递一个 resolved 的值给注册的回调(消费者),不过和函数不一样,Promise 自己负责精准确定该值何时 push 到回调。

RxJS 引入了 Observable,一个新的 JavaScript Push 系统。Observable 是一个多值生产者,推送数据给 Observer(消费者)。

  • 函数是一种惰性求值计算,在调用时同步返回单个的值。
  • 生成器是一种惰性求值计算,在迭代时同步返回 0 个或到可能无限多个值。
  • Promise是一种可能(或可能不会)最终返回单个值的计算。
  • Observable是一种惰性求值计算,从调用时起可以同步或异步地返回 0 个或到可能无限多个值。

# Observables as generalizations of functions

Observable 不像 EventEmitter 也不像 Promise 用于多个值。在一些情况下 Observable 会表现地像 EventEmitter,如当使用 RxJS 的 Subject 进行多播时,但通常它们的行为不像 EventEmitter

Observable 类似于零参数的函数,但将它们泛化为允许多个值。

代码语言:javascript复制
function foo () {
  console.log('Hello');
  return 42;
}

const x = foo.call(); // same as foo()
console.log(x);
// Hello
// 42

const y = foo.call(); // same as foo()
console.log(y); 
// Hello
// 42

使用 Observable 改写上面的代码:

代码语言:javascript复制
import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
});

foo.subscribe(x => {
  console.log(x);
});
// Hello
// 42

foo.subscribe(y => {
  console.log(y);
});
// Hello
// 42

因为 函数 和 Observable 都是惰性计算。如果你不调用函数,console.log('Hello') 就不会被执行。同样对于 Observable,如果你不“调用”它(使用 subscribe), console.log('Hello') 也不会被执行。另外,“调用”和“订阅”是一个孤立的操作:两个函数调用触发两个单独的副作用,两个 Observable 订阅触发两个单独的副作用。和 EventEmitter 共享副作用并且无论订阅者是否存在都立即触发相反,Observable 没有共享执行并且是惰性计算。

订阅一个 Observable 就是调用一个函数。

部分人觉得 Observable 是异步的,这并不是真的。

代码语言:javascript复制
console.log('before');
console.log(foo.call());
console.log('after');
// before
// Hello
// 42
// after

使用 Observable 会观察到和函数一样的输出:

代码语言:javascript复制
console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');
// before
// Hello
// 42
// after

这说明,对 foo 的订阅完全是同步的,就像一个函数一样。

Observable 既能同步也可以异步地传递值。

Observable 和函数之间的区别是什么?Observable 可以随着时间推移“返回”多个值,这是函数无法做到的。

代码语言:javascript复制
function foo () {
  console.log('Hello');
  return 42;
  return 100; // dead code, will never happen
}

函数只能返回一个值,而 Observable 可以返回多个值:

代码语言:javascript复制
import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
});

console.log('Before');
foo.subscribe(x => {
  console.log(x);
});
console.log('After');

// Before
// Hello
// 42
// 100
// 200
// After

也可以异步地返回值:

代码语言:javascript复制
import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
  setTimeout(() => {
    subscriber.next(300);
  }, 1000);
});

console.log('Before');
foo.subscribe(x => {
  console.log(x);
});
console.log('After');

// Before
// Hello
// 42
// 100
// 200
// After
// 300

结论:

  • func.call() 表示同步地返回一个值
  • observable.subscribe() 表示同步或异步地返回 0 或多个值

# Anatomy of an Observable

Observable 使用 new Observable 或一个创建操作符来 created,会被 Observer subscribedexecute 来向 Observer 传递 next / error / complete 通知,并且他们的执行可能会被 disposed。这四个方面都编码字在 Observable 实例中,当其中一些与其他类型相关,如 ObserverSubscription

Observable 核心关注点:

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observables
# Creating Observables

Observable 构造函数接受一个参数:subscribe 函数

代码语言:javascript复制
import {Observable} from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  const id = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
});

Observable 可以使用 new Observable 来创建。通常,Observable 使用创建函数如 offrominterval 等来创建。

# Subscribing to Observables
代码语言:javascript复制
observable.subscribe(x => {
  console.log(x);
});

这不是个巧合,observable.subscribenew Observable(function subscribe(subscriber) {})subscribe 有相同的名字。在库中,它们是不一样的,不过在实践中可以认为它们在概念上是一样的。

这表示订阅调用不会在同一个 Observable 的多个 Observer 之间共享。当使用 Observer 调用 observable.subscribe 时,new Observable(function subscribe(subscriber) {}) 中的 subscribe 函数为给定的 subscriber 运行。对 observable.subscribe 的每次调用都会为给定的 subscriber 触发其对应的设置。

对于 Observable 的订阅就像调用一个函数,提供了可以传递数据的回调。

这和 addEventListener / removeEventListener 等事件处理程序 API 完全不同。使用 observable.subscribe,给定的 Observer 不会在 Observable 中注册为监听器。Observable 甚至不维护一个 Observer 列表。

订阅调用只是一种启动 Observable 执行并将值或时间传递给该执行的 Observer 的方法。

# Executing Observables

new Observable(function subscribe(subscriber) {}) 里面的代码表示 Observable 的执行,只发生在每个订阅的 Observer 上的惰性计算。执行会随着时间的推移,同步或异步地产生多个值。

Observable 执行可以传递的值类型:

  • Next 通知:发送一个值,如 NumberStringObject
  • Error 通知:发送一个错误,如 Error
  • Complete 通知:不发送值

Next 通知时最重要也是最常见的类型:它表示发送给订阅者的实际数据。ErrorComplete 通知在 Observable 执行过程中只可能执行一次,并且只能有一个发生。

代码语言:javascript复制
import {Observable} from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observable 严格遵守协议,在 Complete 通知之后的 Next 通知将不会被发送:

代码语言:javascript复制
import {Observable} from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // Is not delivered to subscribers
});

可以在 subscribe 代码外包一层 try/catch 块,以捕获错误:

代码语言:javascript复制
import {Observable} from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err);
  }
});

# Disposing Observables Executions

因为 Observable 执行可能是无限的,但是对于 Observer 来说在有限时间内结束执行时常见的需求,因此需要有取消执行的 API。因为每次执行只针对一个 Observer,一旦 Observer 接收到数据,它需要有方法去停止执行,不然会造成计算资源和内存的浪费。

observable.subscribe 被调用时,Observer 被附加到新创建的 Observable 执行中,该调用还会返回 Subscription 对象。

代码语言:javascript复制
const subscription = observable.subscribe(x => console.log(x));

Subscription (opens new window) 表示正在进行的执行,它具有允许你取消该执行的最小 API。

代码语言:javascript复制
import { from } from 'rxjs';

const observable = from([1, 2, 3]);
const subscription = observable.subscribe(x => console.log(x));

// Later
subscription.unsubscribe();

当我们使用 create() 创建 Observable 时,每个 Observable 都必须定义如何处理该执行的资源,如可以在函数 subscribe() 中返回自定义取消订阅函数来实现。

代码语言:javascript复制
const observable = new Observable(function subscribe (subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next(Math.random());
  }, 1000);

  return function unsubscribe () {
    clearInterval(intervalId);
  };
});

就像 observable.subscribe 类似于 new Observable(function subscribe (subscriber) {}), 我们从 subscribe 返回的 unsubscribe 在概念上等同于 subscription.unsubscribe。如果移除围绕在这些概念周围的 ReactiveX 类型,留下的就是原生的 JavaScript。

代码语言:javascript复制
function subscribe (subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next(Math.random());
  }, 1000);

  return function unsubscribe () {
    clearInterval(intervalId);
  };
}

const unsubscribe = subscribe({
  next: x => console.log(x),
  error: err => console.error(err),
  complete: () => console.log('completed')
});

// Later
unsubscribe();

之所以使用像 ObservableObserverSubscription 的 Rx 类型,是为了安全考虑和 Operator 的可组合性。

# Observer

什么是 Observer Observer 作为消费者消费 Observable 派发的值。Observer 只是一组回调,用于 Observable 派发的每种类型的通知:next, errorcomplete

代码语言:javascript复制
const observer = {
  next: value => console.log(`Observer got a next value: ${value}`),
  error: error => console.error(`Observer got an error: ${error}`),
  complete: () => console.log('Observer got a complete notification')
};

// 通过将 observer 对象传递给 `subscribe`,来订阅 observable
observable.subscribe(observer);

Observer 只是有三个回调的对象,用于 Observable 可能派发每种类型的通知。

RxJS 中的 Observer 也可能是部分的。如果没有提供某种回调,Observable 也会正常执行,只不过一些类型的通知会被忽略,因为他们在 Observer 中找不到对应的回调。

代码语言:javascript复制
const observer = {
  next: value => console.log(`Observer got a next value: ${value}`),
  error: error => console.error(`Observer got an error: ${error}`)
};

在订阅 Observable 时,也可以不用将回调放在一个 Observer 对象中,只传一个 next 回调函数作为参数就可以。

代码语言:javascript复制
observable.subscribe(value => console.log(`Observer got a next value: ${value}`));

observable.subscribe 内部,将使用参数中的回调函数作为下一个处理程序创建一个 Observer 对象。

0 人点赞