Redux-observable
是一个基于rxjs的Redux中间件,允许开发者使用异步操作。它是redux-thunk
和redux-saga
的替代品。
本文介绍了RxJS的基础知识,如何上手 redux-observable
,以及一些实际的用例。但在此之前,我们需要理解观察者(Observer
)模式。
Observer 观察者模式
在观察者模式中,一个名为“可观察对象(Observable
)”或“Subject
”的对象维护着一个名为“观察者(Observers
)”的订阅者集合。当Subjects
的状态发生变化时,它会通知所有的观察者。
在JavaScript中,最简单的例子是事件发射器(event emitters
)和事件处理程序(event handlers
)。
当您执行.addeventlistener
时,你正在将一个观察者推入subject
的观察者集合中。无论何时事件发生,subject
都会通知所有观察者。
RxJS
根据官方网站,RxJS
是ReactiveX
的JavaScript
实现,ReactiveX
是一个库,通过使用可观察序列来编写异步和基于事件的程序。
简单来说,RxJS
是观察者模式的一个实现。它还扩展了Observer
模式,提供了允许我们以声明方式组合observable
和Subjects
的操作符。
观察者(Observers
)、可观察对象(Observables
)、操作符(Operators
)和Subjects
是RxJS
的构建块。现在让我们更详细地看看每一个。
Observers
观察者(Observers
)是可以订阅observable
和Subjects
的对象。订阅之后,他们可以收到三种类型的通知: next、error和complete。
interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
当Observable
推送next
、error
和complete
通知时,观察者的.next
、.error
和.complete
方法就会被调用。
Observables
可观察对象是可以在一段时间内发出数据的对象。它可以用“大理石图”来表示。
其中,水平线表示时间,圆形节点表示Observable
发出的数据,垂直线表示Observable
已经成功完成。
Observables
对象可能会遇到错误。X(叉)
表示由Observable
发出的错误。
“completed
”和“error
”状态是最终状态。这意味着,observable
在成功完成或遇到错误后不能发出任何数据。
创建一个 Observable
可观察对象(Observables
)是通过新的Observable
构造函数创建的,该构造函数只有一个参数——订阅函数。可观察对象Observables
也可以使用一些操作符来创建,但我们稍后会在讨论操作符的时候讨论这个。
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
// Subscribe function
});
订阅一个 Observable
可观察对象(Observables
)可以通过其.subscribe
方法和传递一个Observer
来订阅。
observable.subscribe({
next: (x) => console.log(x),
error: (x) => console.log(x),
complete: () => console.log('completed');
});
执行 Observable
当Observable
被订阅时,我们传递给新Observable
构造函数的subscribe
函数就会被执行。
订阅函数接受一个参数—Subscriber
。Subscriber
的结构类似于观察者,它有相同的3个方法:.next、.error和.complete
。
observable
可以使用.next
方法将数据推送到Observer
。如果Observable
成功完成了,它可以使用.complete
方法通知观察者。如果Observable
遇到了错误,它可以使用.error
方法将错误推送给观察者。
// Create an Observable
const observable = new Observable(subscriber => {
subscriber.next('first data');
subscriber.next('second data');
setTimeout(() => {
subscriber.next('after 1 second - last data');
subscriber.complete();
subscriber.next('data after completion'); // <-- ignored
}, 1000);
subscriber.next('third data');
});
// Subscribe to the Observable
observable.subscribe({
next: (x) => console.log(x),
error: (x) => console.log(x),
complete: () => console.log('completed')
});
// Outputs:
//
// first data
// second data
// third data
// after 1 second - last data
// completed
Observables 是单播的
可观察对象Observables
是单播的,这意味着可观察对象最多可以有一个订阅方。当一个观察者订阅了一个可观察对象,它会得到一个有自己执行路径的可观察对象的副本,使可观察对象成为单播的。
这就像在看YouTube视频。所有的观众观看相同的视频内容,但他们可以观看视频的不同部分。
例如:让我们创建一个在10秒内发出1到10的Observable
。然后,立即订阅一次Observable
, 5秒后再订阅一次。
// Create an Observable that emits data every second for 10 seconds
const observable = new Observable(subscriber => {
let count = 1;
const interval = setInterval(() => {
subscriber.next(count );
if (count > 10) {
clearInterval(interval);
}
}, 1000);
});
// Subscribe to the Observable
observable.subscribe({
next: value => {
console.log(`Observer 1: ${value}`);
}
});
// After 5 seconds subscribe again
setTimeout(() => {
observable.subscribe({
next: value => {
console.log(`Observer 2: ${value}`);
}
});
}, 5000);
/* Output
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 1
Observer 1: 6
Observer 2: 2
Observer 1: 7
Observer 2: 3
Observer 1: 8
Observer 2: 4
Observer 1: 9
Observer 2: 5
Observer 1: 10
Observer 2: 6
Observer 2: 7
Observer 2: 8
Observer 2: 9
Observer 2: 10
*/
在输出中,可以注意到第二个Observer
从1开始打印,尽管它在5秒后订阅了。这是因为第二个观察者收到了一个可观察对象的副本,它的订阅函数被再次调用了。这说明了可观察对象的单播行为。
Subjects
Subject
是可观察对象的一种特殊类型。
创建一个 Subject
使用new Subject()
构造函数创建Subject
:
import { Subject } from 'rxjs';
// Create a subject
const subject = new Subject();
订阅一个 Subject
订阅Subject
类似于订阅Observable
:你使用.subscribe
方法并传递一个Observer
:
subject.subscribe({
next: (x) => console.log(x),
error: (x) => console.log(x),
complete: () => console.log("done")
});
执行一个 Subject
与observable
不同的是,Subject
调用自己的.next、.error和.complete
方法来将数据推送给观察者。
// Push data to all observers
subject.next('first data');
// Push error to all observers
subject.error('oops something went wrong');
// Complete
subject.complete('done');
Subjects 是多播的
Subjects
是多播的:多个观察者共享相同的Subject
及其执行路径。这意味着所有通知都会广播给所有观察者。这就像看现场直播节目
。所有观众都在同一时间观看相同内容的同一片段。
示例:让我们创建一个Subject
,在10秒内触发1到10。然后,立即订阅一次Observable
, 5秒后再订阅一次。
// Create a subject
const subject = new Subject();
let count = 1;
const interval = setInterval(() => {
subscriber.next(count );
if (count > 10) {
clearInterval(interval);
}
}, 1000);
// Subscribe to the subjects
subject.subscribe(data => {
console.log(`Observer 1: ${data}`);
});
// After 5 seconds subscribe again
setTimeout(() => {
subject.subscribe(data => {
console.log(`Observer 2: ${data}`);
});
}, 5000);
/* OUTPUT
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 5
Observer 1: 6
Observer 2: 6
Observer 1: 7
Observer 2: 7
Observer 1: 8
Observer 2: 8
Observer 1: 9
Observer 2: 9
Observer 1: 10
Observer 2: 10
*/
在输出中,可以注意到第二个Observer
从5
开始打印,而不是从1
开始。这是因为第二个观察者共享同一个Subject
。由于Subject
在5秒后订阅,所以它已经完成了1
到4
的发送。这说明了Subject
的多播行为。
Subjects 是 Observable 也是 Observer
Subjects
有.next、.error和.complete
方法。这意味着他们遵循观察者的结构。因此,一个Subject
也可以被用作一个观察者,并传递给observable
或其他Subject
的.subscribe
函数。
例如:让我们创建一个可观察对象Observable
和一个Subject
。然后使用Subject
作为观察者订阅Observable
。最后,订阅Subject
。Observable
发出的所有值都将被推送到Subject
,而Subject
将把接收到的值广播给所有的observer
。
// Create an Observable that emits data every second
const observable = new Observable(subscriber => {
let count = 1;
const interval = setInterval(() => {
subscriber.next(count );
if (count > 5) {
clearInterval(interval);
}
}, 1000);
});
// Create a subject
const subject = new Subject();
// Use the Subject as Observer and subscribe to the Observable
observable.subscribe(subject);
// Subscribe to the subject
subject.subscribe({
next: value => console.log(value)
});
/* Output
1
2
3
4
5
*/
操作符
操作符使RxJS
变得有用。操作符是返回一个新的可观察对象的纯函数。可分为两大类:
- 创建操作符
- Pipeable操作符
创建操作符
创建操作符是可以创建一个新的Observable
的函数。
例如:我们可以创建一个Observable
,它使用from
操作符来触发数组中的每个元素。
const observable = from([2, 30, 5, 22, 60, 1]);
observable.subscribe({
next: (value) => console.log("Received", value),
error: (err) => console.log(err),
complete: () => console.log("done")
});
/* OUTPUTS
Received 2
Received 30
Received 5
Received 22
Received 60
Received 1
done
*/
使用大理石图的Observable
也是一样的。
Pipeable 操作符
可管道操作符(pipe-able operator
)是将Observable
作为输入,并返回一个行为经过修改的新的Observable
函数。
例子:让我们以from
操作符创建的Observable
为例。现在使用这个Observable
,我们可以创建一个新的Observable
,使用filter
操作符只发出大于10
的数字。
const greaterThanTen = observable.pipe(filter(x => x > 10));
greaterThanTen.subscribe(console.log, console.log, () => console.log("completed"));
// OUTPUT
// 11
// 12
// 13
// 14
// 15
同样的情况也可以用大理石图表示。
还有很多更有用的操作符。你可以在RxJS
官方文档中看到完整的操作符列表和示例。
了解所有常用的操作符是至关重要的。下面是我经常使用的一些操作符:
代码语言:javascript复制1. mergeMap
2. switchMap
3. exhaustMap
4. map
5. catchError
6. startWith
7. delay
8. debounce
9. throttle
10.interval
11.from
12.of
Redux Observables
根据官方网站,Redux Observables
是 Redux
基于rxjs
的中间件。它能组合和取消异步操作,以创建副作用和更多功能。
在Redux
中,无论何时dispatch
一个action
,它都会运行所有的reducer
函数,并返回一个新的状态state
。
Redux-observable获取所有这些已dispatch的action和新state,并从中创建两个可观察对象- actions可观察对象action和states可观察对象state。
Actions可观察对象action将发出所有使用store.dispatch()分派的actions。可观察状态state将触发根reducer返回的所有新状态对象。
Epics
还有很多更有用的操作符。你可以在RxJS
官方文档中看到完整的操作符列表和示例。
了解所有常用的操作符是至关重要的。下面是我经常使用的一些操作符:
代码语言:javascript复制1. mergeMap
2. switchMap
3. exhaustMap
4. map
5. catchError
6. startWith
7. delay
8. debounce
9. throttle
10.interval
11.from
12.of
Redux Observables
根据官方网站,Redux Observables
是 Redux
基于rxjs
的中间件。它能组合和取消异步操作,以创建副作用和更多功能。
在Redux
中,无论何时dispatch
一个action
,它都会运行所有的reducer
函数,并返回一个新的状态state
。
Redux-observable获取所有这些已dispatch的action和新state,并从中创建两个可观察对象- actions可观察对象action和states可观察对象state。
Actions可观察对象action将发出所有使用store.dispatch()分派的actions。可观察状态state将触发根reducer返回的所有新状态对象。
Epics
根据官方网站,Epics
是一个接受actions
流并返回actions
流的函数。actions
进,actions
出。
epic
是可以用来订阅action
和状态观察对象的函数。一旦订阅,epic
将接收action
流和状态流作为输入,并且必须返回action
流作为输出。
const someEpic = (action$, state$) => {
return action$.pipe( // subscribe to actions observable
map(action => { // Receive every action, Actions In
return someOtherAction(); // return an action, Actions Out
})
)
}
重要的是要明白在Epic
中接收到的所有action
都已经通过reducers
完成了。
在Epic
内部,我们可以使用任何RxJS
的可观察模式,这就是为什么redux-observable
很有用。
例如:我们可以使用.filter
操作符创建一个新的中间可观察对象。类似地,我们可以创建任意数量的中间可观察对象,但最终可观察对象的最终输出必须是一个action
,否则redux-observable
将引发异常。
const sampleEpic = (action$, state$) => {
return action$.pipe(
filter(action => action.payload.age >= 18), // can create intermediate observables and streams
map(value => above18(value)) // where above18 is an action creator
);
}
使用store.dispatch()
立即分派由Epics
发出的每个action
。
即可上手
首先,让我们安装依赖项:
代码语言:javascript复制npm install --save rxjs redux-observable
创建一个名为epics
的单独文件夹来保存所有的epics
。在epics
文件夹中创建一个新的文件index.js
,并使用combineEpics
函数合并所有的epics
来创建根epic
。然后导出根epic
。
import { combineEpics } from 'redux-observable';
import { epic1 } from './epic1';
import { epic2 } from './epic2';
const epic1 = (action$, state$) => {
...
}
const epic2 = (action$, state$) => {
...
}
export default combineEpics(epic1, epic2);
使用createEpicMiddleware
函数创建一个eoic
中间件,并将其传递给createStore Redux
函数。
import { createEpicMiddleware } from 'redux-observable';
import { createStore, applyMiddleware } from 'redux';
import rootEpic from './rootEpics';
const epicMiddleware = createEpicMiddlware();
const store = createStore(
rootReducer,
applyMiddleware(epicMiddlware)
);
最后,将epic
根目录传递给epic
中间件的.run
方法:
epicMiddleware.run(rootEpic);
练习
RxJS
有一个很大的学习曲线,并且Redux-observable
的设置使已经很痛苦的Redux
设置过程更加糟糕。所有这些都让Redux
看起来有点过头了。但是这里有一些实际的用例可以改变您的想法。
在本节中,我将比较redux-observable
和redux-thunk
,以展示redux-observable
如何在复杂的用例中发挥作用。但我不讨厌redux- tank
,我喜欢它,我每天都在使用它!
练习1:调用API
用例:调用API
来获取文章的注释。当API
调用正在进行时显示加载器,并处理API
错误。
redux-thunk
的实现是这样的:
function getComments(postId){
return (dispatch) => {
dispatch(getCommentsInProgress());
axios.get(`/v1/api/posts/${postId}/comments`).then(response => {
dispatch(getCommentsSuccess(response.data.comments));
}).catch(() => {
dispatch(getCommentsFailed());
});
}
}
这是绝对正确的。但是action creator
是臃肿的。
我们可以写一个Epic
来实现相同的redux-observable
:
const getCommentsEpic = (action$, state$) => action$.pipe(
ofType('GET_COMMENTS'),
mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`)
.pipe(
map(response => getCommentsSuccess(response.data.comments)),
catchError(() => getCommentsFailed()),
startWith(getCommentsInProgress())
)
);
它可以让我们有一个像这样干净简单的action creator
:
function getComments(postId) {
return {
type: 'GET_COMMENTS',
payload: {
postId
}
}
}
练习2:请求防抖
用例:每当文本字段的值发生变化时,通过调用API
为文本字段提供自动补全。API
调用应该在用户停止输入1
秒后进行。
redux-thunk
的实现是这样的:
let timeout;
function valueChanged(value) {
return dispatch => {
dispatch(loadSuggestionsInProgress());
dispatch({
type: 'VALUE_CHANGED',
payload: {
value
}
});
// If changed again within 1 second, cancel the timeout
timeout && clearTimeout(timeout);
// Make API Call after 1 second
timeout = setTimeout(() => {
axios.get(`/suggestions?q=${value}`)
.then(response =>
dispatch(loadSuggestionsSuccess(response.data.suggestions)))
.catch(() => dispatch(loadSuggestionsFailed()))
}, 1000, value);
}
}
它需要一个全局变量timeout
。当我们开始使用全局变量时,我们的action creator
就不再是纯函数了。对使用全局变量的action creator
进行单元测试也变得很困难。
我们可以使用.debounce
操作符在redux-observable
中实现同样的功能:
const loadSuggestionsEpic = (action$, state$) => action$.pipe(
ofType('VALUE_CHANGED'),
debounce(1000),
mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe(
map(response => loadSuggestionsSuccess(response.data.suggestions)),
catchError(() => loadSuggestionsFailed())
)),
startWith(loadSuggestionsInProgress())
);
现在,我们的action creator
可以被清理,更重要的是,它们可以再次成为纯函数。
function valueChanged(value) {
return {
type: 'VALUE_CHANGED',
payload: {
value
}
}
}
练习3:请求撤销
用例:继续前面的用例,假设用户在1
秒钟内没有输入任何东西,并且我们进行了第一次API
调用来获取建议。
假设API
本身平均需要2-3
秒才能返回结果。现在,如果用户在第一个API
调用进行时输入了一些东西,1
秒后,我们将创建第二个API
。我们可以同时有两个API
调用,它可以创建一个竞争条件。
为了避免这种情况,我们需要在进行第二个API
调用之前取消第一个API
调用。
redux-thunk
的实现是这样的:
let timeout;
var cancelToken = axios.cancelToken;
let apiCall;
function valueChanged(value) {
return dispatch => {
dispatch(loadSuggestionsInProgress());
dispatch({
type: 'VALUE_CHANGED',
payload: {
value
}
});
// If changed again within 1 second, cancel the timeout
timeout && clearTimeout(timeout);
// Make API Call after 1 second
timeout = setTimeout(() => {
// Cancel the existing API
apiCall && apiCall.cancel('Operation cancelled');
// Generate a new token
apiCall = cancelToken.source();
axios.get(`/suggestions?q=${value}`, {
cancelToken: apiCall.token
})
.then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions)))
.catch(() => dispatch(loadSuggestionsFailed()))
}, 1000, value);
}
}
现在,它需要另一个全局变量来存储Axios
的取消令牌。更多的全局变量=更多的非纯函数!
要使用redux-observable
实现相同的功能,我们只需要将.mergemap
替换为.switchmap
:
const loadSuggestionsEpic = (action$, state$) => action$.pipe(
ofType('VALUE_CHANGED'),
throttle(1000),
switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe(
map(response => loadSuggestionsSuccess(response.data.suggestions)),
catchError(() => loadSuggestionsFailed())
)),
startWith(loadSuggestionsInProgress())
);
因为它不需要对我们的action creator
进行任何更改,所以它们可以继续是纯函数。
类似地,在许多用例中,redux-observable
确实很出色!例如,查询API
,管理WebSocket
连接,等等。
总结
如果你正在开发一个包含如此复杂的用例的Redux
应用程序,强烈推荐使用Redux-observables
。毕竟,使用它的好处直接与应用程序的复杂性成正比,这从上面提到的实际用例中是显而易见的。
我坚信使用正确的库集将帮助我们开发更干净和可维护的应用程序,并且从长远来看,使用它们的好处将超过缺点。