# Observable
Observable
是多个值的惰性 Push 集合。他填补了下表中的缺失点:
SINGLE | MULTIPLEXED | |
---|---|---|
Pull | Function | Iterator |
Push | Promise | Observable |
如,下面是一个 Observable
,它在订阅时立即(同步)推送值 1
、2
、3
,并且从 subscribe
调用开始后过 1 s 再推送值 4
,然后结束。
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
要调用 Observable
并查看这些值,我们需要订阅它:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
console.log('Before subscribe');
observable.subscribe({
next(x) { console.log('Next: ' x); },
error(err) { console.error('Error: ' err); },
complete() { console.log('Complete'); }
});
console.log('After subscribe');
// Before subscribe
// Next: 1
// Next: 2
// Next: 3
// After subscribe
// Next: 4
// Complete
# Pull vs Push
Pull
和 Push
是两种不同的协议,描述了数据生产者和数据消费者如何进通信。
什么是 Pull? 在 Pull 系统中,消费者决定什么时候从数据生产者中接收数据。数据生产者自己对什么时候数据被传递到消费者没有感知。
每个 JavaScript 函数都是一个 Pull 系统。函数是数据的生产者,调用函数的代码通过从其调用中 pull
出单个返回值来使用它。
ES 2015 中介绍了生成器函数和迭代器 (opens new window)(function *
),也属于 Pull 系统。调用 iterator.next()
的代码是消费者,从迭代器(生产者)中拉出多个值。
PRODUCER | CONSUMER | |
---|---|---|
Pull | Passive:produces data when requested | Active:decides when data is requested |
Push | Active:produces data at its own pace | Passive:reacts to received data |
什么是 Push ? 在 Push 系统中,生产者决定什么时候推送数据给消费者。数据消费者自己对什么时候数据被接收到没有感知。
Promise
是目前 JavaScript 中最常见的 Push 系统类型。Promise
(生产者)传递一个 resolved 的值给注册的回调(消费者),不过和函数不一样,Promise 自己负责精准确定该值何时 push 到回调。
RxJS 引入了 Observable
,一个新的 JavaScript Push 系统。Observable
是一个多值生产者,推送数据给 Observer
(消费者)。
- 函数是一种惰性求值计算,在调用时同步返回单个的值。
- 生成器是一种惰性求值计算,在迭代时同步返回 0 个或到可能无限多个值。
- Promise是一种可能(或可能不会)最终返回单个值的计算。
- Observable是一种惰性求值计算,从调用时起可以同步或异步地返回 0 个或到可能无限多个值。
# Observables as generalizations of functions
Observable
不像 EventEmitter
也不像 Promise
用于多个值。在一些情况下 Observable
会表现地像 EventEmitter
,如当使用 RxJS 的 Subject
进行多播时,但通常它们的行为不像 EventEmitter
。
代码语言:javascript复制Observable 类似于零参数的函数,但将它们泛化为允许多个值。
function foo () {
console.log('Hello');
return 42;
}
const x = foo.call(); // same as foo()
console.log(x);
// Hello
// 42
const y = foo.call(); // same as foo()
console.log(y);
// Hello
// 42
使用 Observable
改写上面的代码:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
});
foo.subscribe(x => {
console.log(x);
});
// Hello
// 42
foo.subscribe(y => {
console.log(y);
});
// Hello
// 42
因为 函数 和 Observable
都是惰性计算。如果你不调用函数,console.log('Hello')
就不会被执行。同样对于 Observable
,如果你不“调用”它(使用 subscribe
), console.log('Hello')
也不会被执行。另外,“调用”和“订阅”是一个孤立的操作:两个函数调用触发两个单独的副作用,两个 Observable
订阅触发两个单独的副作用。和 EventEmitter
共享副作用并且无论订阅者是否存在都立即触发相反,Observable
没有共享执行并且是惰性计算。
订阅一个 Observable 就是调用一个函数。
部分人觉得 Observable
是异步的,这并不是真的。
console.log('before');
console.log(foo.call());
console.log('after');
// before
// Hello
// 42
// after
使用 Observable
会观察到和函数一样的输出:
console.log('before');
foo.subscribe(x => {
console.log(x);
});
console.log('after');
// before
// Hello
// 42
// after
这说明,对 foo
的订阅完全是同步的,就像一个函数一样。
Observable
既能同步也可以异步地传递值。
那 Observable
和函数之间的区别是什么?Observable
可以随着时间推移“返回”多个值,这是函数无法做到的。
function foo () {
console.log('Hello');
return 42;
return 100; // dead code, will never happen
}
函数只能返回一个值,而 Observable
可以返回多个值:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
});
console.log('Before');
foo.subscribe(x => {
console.log(x);
});
console.log('After');
// Before
// Hello
// 42
// 100
// 200
// After
也可以异步地返回值:
代码语言:javascript复制import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
setTimeout(() => {
subscriber.next(300);
}, 1000);
});
console.log('Before');
foo.subscribe(x => {
console.log(x);
});
console.log('After');
// Before
// Hello
// 42
// 100
// 200
// After
// 300
结论:
func.call()
表示同步地返回一个值observable.subscribe()
表示同步或异步地返回 0 或多个值
# Anatomy of an Observable
Observable
使用 new Observable
或一个创建操作符来 created,会被 Observer
subscribed,execute 来向 Observer
传递 next
/ error
/ complete
通知,并且他们的执行可能会被 disposed。这四个方面都编码字在 Observable
实例中,当其中一些与其他类型相关,如 Observer
和 Subscription
。
Observable
核心关注点:
- Creating Observables
- Subscribing to Observables
- Executing the Observable
- Disposing Observables
# Creating Observables
Observable
构造函数接受一个参数:subscribe
函数
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next('hi');
}, 1000);
});
Observable 可以使用
new Observable
来创建。通常,Observable
使用创建函数如of
、from
、interval
等来创建。
# Subscribing to Observables
代码语言:javascript复制observable.subscribe(x => {
console.log(x);
});
这不是个巧合,observable.subscribe
和 new Observable(function subscribe(subscriber) {})
的 subscribe
有相同的名字。在库中,它们是不一样的,不过在实践中可以认为它们在概念上是一样的。
这表示订阅调用不会在同一个 Observable
的多个 Observer
之间共享。当使用 Observer
调用 observable.subscribe
时,new Observable(function subscribe(subscriber) {})
中的 subscribe
函数为给定的 subscriber
运行。对 observable.subscribe
的每次调用都会为给定的 subscriber
触发其对应的设置。
对于
Observable
的订阅就像调用一个函数,提供了可以传递数据的回调。
这和 addEventListener
/ removeEventListener
等事件处理程序 API 完全不同。使用 observable.subscribe
,给定的 Observer
不会在 Observable
中注册为监听器。Observable
甚至不维护一个 Observer
列表。
订阅调用只是一种启动 Observable
执行并将值或时间传递给该执行的 Observer
的方法。
# Executing Observables
new Observable(function subscribe(subscriber) {})
里面的代码表示 Observable
的执行,只发生在每个订阅的 Observer
上的惰性计算。执行会随着时间的推移,同步或异步地产生多个值。
Observable
执行可以传递的值类型:
Next
通知:发送一个值,如Number
、String
、Object
等Error
通知:发送一个错误,如Error
Complete
通知:不发送值
Next
通知时最重要也是最常见的类型:它表示发送给订阅者的实际数据。Error
和 Complete
通知在 Observable
执行过程中只可能执行一次,并且只能有一个发生。
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
Observable
严格遵守协议,在 Complete
通知之后的 Next
通知将不会被发送:
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // Is not delivered to subscribers
});
可以在 subscribe
代码外包一层 try/catch
块,以捕获错误:
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
} catch (err) {
subscriber.error(err);
}
});
# Disposing Observables Executions
因为 Observable
执行可能是无限的,但是对于 Observer
来说在有限时间内结束执行时常见的需求,因此需要有取消执行的 API。因为每次执行只针对一个 Observer
,一旦 Observer
接收到数据,它需要有方法去停止执行,不然会造成计算资源和内存的浪费。
当 observable.subscribe
被调用时,Observer
被附加到新创建的 Observable
执行中,该调用还会返回 Subscription
对象。
const subscription = observable.subscribe(x => console.log(x));
Subscription
(opens new window) 表示正在进行的执行,它具有允许你取消该执行的最小 API。
import { from } from 'rxjs';
const observable = from([1, 2, 3]);
const subscription = observable.subscribe(x => console.log(x));
// Later
subscription.unsubscribe();
当我们使用 create()
创建 Observable
时,每个 Observable
都必须定义如何处理该执行的资源,如可以在函数 subscribe()
中返回自定义取消订阅函数来实现。
const observable = new Observable(function subscribe (subscriber) {
const intervalId = setInterval(() => {
subscriber.next(Math.random());
}, 1000);
return function unsubscribe () {
clearInterval(intervalId);
};
});
就像 observable.subscribe
类似于 new Observable(function subscribe (subscriber) {})
, 我们从 subscribe
返回的 unsubscribe
在概念上等同于 subscription.unsubscribe
。如果移除围绕在这些概念周围的 ReactiveX 类型,留下的就是原生的 JavaScript。
function subscribe (subscriber) {
const intervalId = setInterval(() => {
subscriber.next(Math.random());
}, 1000);
return function unsubscribe () {
clearInterval(intervalId);
};
}
const unsubscribe = subscribe({
next: x => console.log(x),
error: err => console.error(err),
complete: () => console.log('completed')
});
// Later
unsubscribe();
之所以使用像 Observable
、Observer
和 Subscription
的 Rx 类型,是为了安全考虑和 Operator
的可组合性。
# Observer
什么是 Observer
? Observer
作为消费者消费 Observable
派发的值。Observer
只是一组回调,用于 Observable
派发的每种类型的通知:next
, error
和 complete
。
const observer = {
next: value => console.log(`Observer got a next value: ${value}`),
error: error => console.error(`Observer got an error: ${error}`),
complete: () => console.log('Observer got a complete notification')
};
// 通过将 observer 对象传递给 `subscribe`,来订阅 observable
observable.subscribe(observer);
Observer
只是有三个回调的对象,用于Observable
可能派发每种类型的通知。
RxJS 中的 Observer
也可能是部分的。如果没有提供某种回调,Observable
也会正常执行,只不过一些类型的通知会被忽略,因为他们在 Observer
中找不到对应的回调。
const observer = {
next: value => console.log(`Observer got a next value: ${value}`),
error: error => console.error(`Observer got an error: ${error}`)
};
在订阅 Observable
时,也可以不用将回调放在一个 Observer
对象中,只传一个 next
回调函数作为参数就可以。
observable.subscribe(value => console.log(`Observer got a next value: ${value}`));
在 observable.subscribe
内部,将使用参数中的回调函数作为下一个处理程序创建一个 Observer
对象。