介绍RxJS前,先介绍Observable
可观察对象(Observable)
可观察对象支持在应用中的发布者和订阅者之间传递消息。
可观察对象可以发送多个任意类型的值 —— 字面量、消息、事件。
基本用法和词汇
作为发布者,你创建一个 Observable 的实例,其中定义了一个订阅者(subscriber)函数。 当有消费者调用 subscribe() 方法时,这个函数就会执行。 订阅者函数用于定义“如何获取或生成那些要发布的值或消息”。
要执行所创建的可观察对象,并开始从中接收通知,你就要调用它的 subscribe() 方法,并传入一个观察者(observer)。 这是一个 JavaScript 对象,它定义了你收到的这些消息的处理器(handler)。 subscribe() 调用会返回一个 Subscription 对象,该对象具有一个 unsubscribe() 方法。 当调用该方法时,你就会停止接收通知。
代码语言:javascript复制const locations = new Observable((observer) => {
// Get the next and error callbacks. These will be passed in when
// the consumer subscribes.
const {next, error} = observer;
let watchId;
// Simple geolocation API check provides values to publish
if ('geolocation' in navigator) {
watchId = navigator.geolocation.watchPosition(next, error);
} else {
error('Geolocation not available');
}
// When the consumer unsubscribes, clean up data ready for next subscription.
return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
});
// Call subscribe() to start listening for updates.
const locationsSubscription = locations.subscribe({
next(position) { console.log('Current Position: ', position); },
error(msg) { console.log('Error Getting Location: ', msg); }
});
// Stop listening for location after 10 seconds
setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);
定义观察者
用于接收可观察对象通知的处理器要实现 Observer 接口。这个对象定义了一些回调函数来处理可观察对象可能会发来的三种通知
通知类型 | 说明 |
---|---|
next | 必要。用来处理每个送达值。在开始执行后可能执行零次或多次。 |
error | 可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。 |
complete | 可选。用来处理执行完毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。 |
订阅
只有当有人订阅 Observable 的实例时,它才会开始发布值。
代码语言:javascript复制const myObservable = Observable.of(1, 2, 3);
// Create observer object
const myObserver = {
next: x => console.log('Observer got a next value: ' x),
error: err => console.error('Observer got an error: ' err),
complete: () => console.log('Observer got a complete notification'),
};
// Execute with the observer object
myObservable.subscribe(myObserver);
subscribe() 方法还可以接收定义在同一行中的回调函数,无论 next、error 还是 complete 处理器,下面的代码和刚才的等价:
代码语言:javascript复制myObservable.subscribe(
x => console.log('Observer got a next value: ' x),
err => console.error('Observer got an error: ' err),
() => console.log('Observer got a complete notification')
);
创建可观察对象
使用 Observable 构造函数可以创建任何类型的可观察流。
下面是一个例子:
代码语言:javascript复制function fromEvent(target, eventName) {
return new Observable((observer) => {
// 事件处理函数,每次执行eventName,观察者observer就next一条数据
const handler = (e) => observer.next(e);
// 添加事件绑定
target.addEventListener(eventName, handler);
return () => {
// 退订
target.removeEventListener(eventName, handler);
};
});
}
const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;
const subscription = fromEvent(nameInput, 'keydown')
.subscribe((e: KeyboardEvent) => {
if (e.keyCode === ESC_KEY) {
nameInput.value = '';
}
});
多播
多播用来让可观察对象在一次执行中同时广播给多个订阅者。借助支持多播的可观察对象,你不必注册多个监听器,而是复用第一个(next)监听器,并且把值发送给各个订阅者。
多播的核心是,将observers放到一个数组,然后遍历
代码语言:javascript复制function multicastSequenceSubscriber() {
const seq = [1, 2, 3];
// Keep track of each observer (one for every active subscription)
const observers = [];
// Still a single timeoutId because there will only ever be one
// set of values being generated, multicasted to each subscriber
let timeoutId;
// Return the subscriber function (runs when subscribe()
// function is invoked)
return (observer) => {
observers.push(observer);
// When this is the first subscription, start the sequence
if (observers.length === 1) {
timeoutId = doSequence({
next(val) {
// Iterate through observers and notify all subscriptions
observers.forEach(obs => obs.next(val));
},
complete() {
// Notify all complete callbacks
observers.forEach(obs => obs.complete());
}
}, seq, 0);
}
return {
unsubscribe() {
// Remove from the observers array so it's no longer notified
observers.splice(observers.indexOf(observer), 1);
// If there's no more listeners, do cleanup
if (observers.length === 0) {
clearTimeout(timeoutId);
}
}
};
};
}
// Run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(observer, arr, idx) {
return setTimeout(() => {
observer.next(arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(observer, arr, idx );
}
}, 1000);
}
// Create a new Observable that will deliver the above sequence
const multicastSequence = new Observable(multicastSequenceSubscriber);
// Subscribe starts the clock, and begins to emit after 1 second
multicastSequence.subscribe({
next(num) { console.log('1st subscribe: ' num); },
complete() { console.log('1st sequence finished.'); }
});
// After 1 1/2 seconds, subscribe again (should "miss" the first value).
setTimeout(() => {
multicastSequence.subscribe({
next(num) { console.log('2nd subscribe: ' num); },
complete() { console.log('2nd sequence finished.'); }
});
}, 1500);
// Logs:
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished
RxJS 库
RxJS(响应式扩展的 JavaScript 版)是一个使用可观察对象进行响应式编程的库,它让组合异步代码和基于回调的代码变得更简单,RxJS 提供了一种对 Observable 类型的实现.。
这些工具函数可用于:
- 把现有的异步代码转换成可观察对象
- 迭代流中的各个值
- 把这些值映射成其它类型
- 对流进行过滤
- 组合多个流
创建可观察对象的函数
RxJS 提供了一些用来创建可观察对象的函数。这些函数可以简化根据某些东西创建可观察对象的过程,比如事件、定时器、promises等等。比如:
从promise创建一个Observable:
代码语言:javascript复制 import { fromPromise } from 'rxjs';
// Create an Observable out of a promise
const data = fromPromise(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
next(response) { console.log(response); },
error(err) { console.error('Error: ' err); },
complete() { console.log('Completed'); }
});
从一个事件创建一个observable:
代码语言:javascript复制import { fromEvent } from 'rxjs';
const el = document.getElementById('my-element');
// Create an Observable that will publish mouse movements
const mouseMoves = fromEvent(el, 'mousemove');
// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
// Log coords of mouse movements
console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);
// When the mouse is over the upper-left of the screen,
// unsubscribe to stop listening for mouse movements
if (evt.clientX < 40 && evt.clientY < 40) {
subscription.unsubscribe();
}
});
从ajax创建一个observable:
代码语言:javascript复制import { ajax } from 'rxjs/ajax';
// Create an Observable that will create an AJAX request
const apiData = ajax('/api/data');
// Subscribe to create the request
apiData.subscribe(res => console.log(res.status, res.response));
操作符
操作符是基于可观察对象构建的一些对集合进行复杂操作的函数.,常见的有 map()、filter()、concat() 和 flatMap()
代码语言:javascript复制import { map } from 'rxjs/operators';
const nums = of(1, 2, 3);
const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);
squaredNums.subscribe(x => console.log(x));
// Logs
// 1
// 4
// 9
常用操作符
类别 | 操作 |
---|---|
创建 | from , fromPromise , fromEvent , of |
组合 | combineLatest , concat , merge , startWith , withLatestFrom , zip |
过滤 | debounceTime , distinctUntilChanged , filter , take , takeUntil |
转换 | bufferTime , concatMap , map , mergeMap , scan , switchMap |
工具 | tap |
多播 | share |
错误处理
除了可以在订阅时提供 error() 处理器外,RxJS 还提供了 catchError 操作符,它允许你在管道中处理已知错误。
代码语言:javascript复制import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('/api/data').pipe(
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});
重试失败的可观察对象
可以使用retry重试失败的操作
代码语言:javascript复制import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';
const apiData = ajax('/api/data').pipe(
retry(3), // Retry up to 3 times before failing
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});
可观察对象的命名约定
约定俗成的,可观察对象的名字以“$”符号结尾。 同样的,如果你希望用某个属性来存储来自可观察对象的最近一个值,它的命名惯例是与可观察对象同名,但不带“$”后缀。
代码语言:javascript复制import { Component } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-stopwatch',
templateUrl: './stopwatch.component.html'
})
export class StopwatchComponent {
// 最近一次值
stopwatchValue: number;
// 可观察对象
stopwatchValue$: Observable<number>;
start() {
this.stopwatchValue$.subscribe(num =>
this.stopwatchValue = num
);
}
}
Angular中的observables
Angular 中大量使用了可观察对象,作为处理各种常用异步操作的接口。 比如:
EventEmitter
类派生自Observable
。- HTTP 模块使用可观察对象来处理 AJAX 请求和响应
- 路由器和表单模块使用可观察对象来监听对用户输入事件的响应
事件发送器 EventEmitter
Angular 提供了一个 EventEmitter 类,它用来从组件的 @Output() 属性中发布一些值。EventEmitter 扩展了 Observable,并添加了一个 emit() 方法,这样它就可以发送任意值了。当你调用 emit() 时,就会把所发送的值传给订阅上来的观察者的 next() 方法
代码语言:javascript复制@Component({
selector: 'zippy',
template: `
<div class="zippy">
<div (click)="toggle()">Toggle</div>
<div [hidden]="!visible">
<ng-content></ng-content>
</div>
</div>`})
export class ZippyComponent {
visible = true;
@Output() open = new EventEmitter<any>();
@Output() close = new EventEmitter<any>();
toggle() {
this.visible = !this.visible;
if (this.visible) {
this.open.emit(null);
} else {
this.close.emit(null);
}
}
}
HTTP
Angular 的 HttpClient 从 HTTP 方法调用中返回了可观察对象。例如,http.get(‘/api’) 就会返回可观察对象。
为什么NG使用observable而不是Promise?
- 可观察对象不会修改服务器的响应(和在承诺上串联起来的
.then()
调用一样)。反之,你可以使用一系列操作符来按需转换这些值 - HTTP 请求是可以通过
unsubscribe()
方法来取消的 - 请求可以进行配置,以获取进度事件的变化
- 失败的请求很容易重试
Async 管道
AsyncPipe 会订阅一个可观察对象或承诺,并返回其发出的最后一个值。当发出新值时,该管道就会把这个组件标记为需要进行变更检查的(因此可能导致刷新界面)
代码语言:javascript复制@Component({
selector: 'async-observable-pipe',
template: `<div><code>observable|async</code>:
Time: {{ time | async }}</div>`
})
export class AsyncObservablePipeComponent {
time = new Observable(observer =>
setInterval(() => observer.next(new Date().toString()), 1000)
);
}
路由器 (router)
Router.events 以可观察对象的形式提供了其事件。 你可以使用 RxJS 中的 filter() 操作符来找到感兴趣的事件,并且订阅它们,以便根据浏览过程中产生的事件序列作出决定。
代码语言:javascript复制import { Router, NavigationStart } from '@angular/router';
import { filter } from 'rxjs/operators';
@Component({
selector: 'app-routable',
templateUrl: './routable.component.html',
styleUrls: ['./routable.component.css']
})
export class Routable1Component implements OnInit {
navStart: Observable<NavigationStart>;
constructor(private router: Router) {
// 通过filter过滤,只关注自己感兴趣的
this.navStart = router.events.pipe(
filter(evt => evt instanceof NavigationStart)
) as Observable<NavigationStart>;
}
ngOnInit() {
this.navStart.subscribe(evt => console.log('Navigation Started!'));
}
}
响应式表单 (reactive forms)
FormControl 的 valueChanges 属性和 statusChanges 属性包含了会发出变更事件的可观察对象
代码语言:javascript复制 import { FormGroup } from '@angular/forms';
@Component({
selector: 'my-component',
template: 'MyComponent Template'
})
export class MyComponent implements OnInit {
nameChangeLog: string[] = [];
heroForm: FormGroup;
ngOnInit() {
this.logNameChange();
}
logNameChange() {
const nameControl = this.heroForm.get('name');
nameControl.valueChanges.forEach(
(value: string) => this.nameChangeLog.push(value)
);
}
}
可观察对象In Action
搜索建议(suggestions)
可观察对象可以简化输入提示建议的实现方式。典型的输入提示要完成一系列独立的任务:
- 从输入中监听数据。
- 移除输入值前后的空白字符,并确认它达到了最小长度。
- 防抖(这样才能防止连续按键时每次按键都发起 API 请求,而应该等到按键出现停顿时才发起)
- 如果输入值没有变化,则不要发起请求(比如按某个字符,然后快速按退格)。
- 如果已发出的 AJAX 请求的结果会因为后续的修改而变得无效,那就取消它。
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, filter, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
const searchBox = document.getElementById('search-box');
const typeahead = fromEvent(searchBox, 'input').pipe(
map((e: KeyboardEvent) => e.target.value),
filter(text => text.length > 2), // 过滤
debounceTime(10),// 延时
distinctUntilChanged(),//发生变化后再执行
switchMap(() => ajax('/api/endpoint'))
);
typeahead.subscribe(data => {
// Handle the data from the API
});
指数化backoff
指数化退避是一种失败后重试 API 的技巧,它会在每次连续的失败之后让重试时间逐渐变长,超过最大重试次数之后就会彻底放弃。 如果使用承诺和其它跟踪 AJAX 调用的方法会非常复杂,而使用可观察对象,这非常简单:
代码语言:javascript复制import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
function backoff(maxTries, ms) {
return pipe(
retryWhen(attempts => range(1, maxTries)
.pipe(
zip(attempts, (i) => i),
map(i => i * i),
mergeMap(i => timer(i * ms))
)
)
);
}
ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(data => handleData(data));
function handleData(data) {
// ...
}
Observables VS. promises
可观察对象经常拿来和承诺进行对比。有一些关键的不同点:
- 可观察对象是声明式的,在被订阅之前,它不会开始执行,promise是在创建时就立即执行的
- 可观察对象能提供多个值,promise只提供一个,这让可观察对象可用于随着时间的推移获取多个值
- 可观察对象会区分串联处理和订阅语句,promise只有
.then()
语句 - 可观察对象的
subscribe()
会负责处理错误,promise会把错误推送给它的子promise
作者:Jadepeng 出处:jqpeng的技术记事本--http://www.cnblogs.com/xiaoqi 您的支持是对博主最大的鼓励,感谢您的认真阅读。 本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。