RxJS 入门到搬砖 之 Subscription 和 Subject

2023-05-17 20:03:12 浏览数 (4)

# Subscription

什么是 Subscription? Subscription 是一个表示一次性资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法 unsubscribe,不接受任何参数,只是释放 Subcription 持有的资源。在之前的 RxJS 中,Subscription 被称为 Disposable

代码语言:javascript复制
import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));

subscription.unsubscribe();

Subscription 本质上只有一个 unsubscribe() 函数来释放资源或取消 Observable 执行。

Subscription 也可以放在一起,这样调用一个 Subscriptionunsubscribe() 能取消多个 Subscription

代码语言:javascript复制
import { interval } from 'rxjs';

const observable1 = interval(400);
const observable2 = interval(300);

const subscription = observable1.subscribe(x => console.log('first: '   x));
const childSubsciption = observable2.subscribe(x => console.log('second: '   x));

subscription.add(childSubsciption);

setTimeout(() => {
  subscription.unsubscribe();
}, 1000);

// second: 0
// first: 0
// second: 1
// first: 1
// second: 2

Subscription 还有个 remove(otherSubscription) 方法,用于撤销添加到 Subscription 的子 Subscription

# Subjects

什么是 Subject ? RxJS 中的 Subject 是一种特殊类型的 Observable,它允许将值多播到多个 Observer。虽然普通的 Observable 是单播的(每个订阅的 Observer 都拥有 Observable 的独立执行),但 Subject 可以多播。

Subject 类似 Observable,但是它可以多播给多个 ObserverSubject 有点像 EventEmitter:他们都维护多个监听这的注册。

每个 Subject 都是一个 Observable 给定一个 Subject,可以订阅它,使用 Observer 开始正常接收值。从 Observer 角度来看,它无法判断 Observable 的执行时来自普通的单播 Observable 还是 Subject

Subject 内部,订阅不会调用传递至的新执行。它只是在一个 Observer 列表中注册给定的 Observer,类似于其他库或语言中 addListener 的工作方式。

每个 Subject 都是一个 Observer 它是一个对象,有 next(v)error(e)complete() 方法。要为 Subject 提供一个新值,只需调用 next(v),它将被多播到注册监听 SubjectObserver

代码语言:javascript复制
import { Subject } from 'rxjs';

const subject = new Subject();

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});

subject.subscribe({
  next: (v) => console.log('observerB: '   v)
});

subject.next(1);
subject.next(2);

// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

因为 Subject 是一个 Observer ,也就是说可以使用 Subject 作为参数来订阅任何 Observable

代码语言:javascript复制
import { Subject, from } from 'rxjs';

const subject = new Subject();

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});
subject.subscribe({
  next: (v) => console.log('observerB: '   v)
});

const observable = from([1, 2, 3]);

observable.subscribe(subject);

// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

通过上面的方法,我们基本上只是通过 Subject 将单播 Observable 执行转换为多播。这是 Subject 如何使任何 Observable 执行共享给多个 Observer 的唯一方法。

也有一些特殊的 SubjectBehaviorSubjectReplaySubjectAsyncSubject

# Multicasted Observables

“多播 Observable” 通过可能有许多订阅者的 Subject 传递通知,而普通的 “单播 Observable” 仅向单个 Observer 发送通知。

多播的 Observable 在底层使用 Subject 来让多个 Observer 看到相同的 Observable 执行。

多播操作符底层工作原理:Observer 订阅底层 SubjectSubject 订阅源 Observable

代码语言:javascript复制
import { from, Subject, multicast } from 'rxjs';

const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));

multicasted.subscribe({
  next: (v) => console.log('observerA: '   v)
});

multicasted.subscribe({
  next: (v) => console.log('observerB: '   v)
});

multicasted.connect();

multicast 返回一个看起来像正常 ObservableObservable,但是它在订阅时像 Subject 一样。multicast 返回一个 ConnectableObservable,它是个有 connect() 方法的 Observable

connect() 方法决定共享的 Observable 具体什么时候开始执行。connect() 本质上是执行 source.subscribe(subject)coonect() 返回一个 Subscription,它可以用来取消订阅。

# BehaviorSubject

BehaviorSubjectSubject 的变体之一,具有“当前值”的概念。它存储发送给其消费者最新的值,并且每当有新的 Observer 订阅时,它将立即接收来自 BehaviorSubject 的 “当前值”。

BehaviorSubject 对于表示 “随时间变化的值” 很有用。如,生日的事件流是一个 Subject,但一个人的年龄是 BehaviorSubject

代码语言:javascript复制
import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject(0); // 初始值为 0

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: '   v)
});

subject.next(3);

// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

# ReplaySubject

ReplaySubjectBehaviorSubject 类似,但它可以给新的订阅者发送旧的值,可以记录 Observable 执行。

ReplaySubject 记录 Observable 执行的一些值,并对新的订阅者进行重放。

代码语言:javascript复制
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject(3); // 缓存 3 个值

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: '   v)
});

subject.next(5);

// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

除了缓冲个数外,还可以定义毫秒级的窗口时间,来决定缓存记录可以保留多久。

代码语言:javascript复制
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject(100, 500); // 缓存 100 个值,每 500 毫秒清除一次

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});

let i = 1;
const interval = setInterval(() => subject.next(i  ), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: '   v)
  });
}, 1000);

// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

# AsyncSubject

AsyncSubject 也是一种变体,它只将 Observable 执行的最后一个值发送给它的观察者,并且仅在执行完成时发送。

代码语言:javascript复制
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: '   v)
});

subject.next(5);
subject.complete();

// observerA: 5
// observerB: 5

# Void subject

代码语言:javascript复制
import { Subject } from 'rxjs';

const subject = new Subject();

subject.subscribe({
  next: () => console.log('One second has passed')
});

setTimeout(() => {
  subject.next();
}, 1000);

// One second has passed

0 人点赞