5 分钟温故知新 RxJS 【转换操作符】

2022-09-19 10:47:38 浏览数 (2)

一起养成写作习惯!这是我参与「掘金日新计划 · 4 月更文挑战」的第2天,点击查看活动详情。


RxJS 转换操作符,继续冲冲冲!熟悉的温故知新,不熟悉的混个脸熟先~

buffer

buffer 顾名思义就是“缓存”,可以在某些条件下进行值的收集,然后再在某些条件下,将收集的值发出。除了 buffer 同类的还有:

  • bufferCount:收集发出的值,直到收集完提供的数量的值才将其作为数组发出。
  • bufferTime:收集发出的值,直到经过了提供的时间才将其作为数组发出。
  • bufferToggle:开启开关以捕获源 observable 所发出的值,关闭开关以将缓冲的值作为数组发出。
  • bufferWhen:收集值,直到关闭选择器发出值才发出缓冲的值

使用方法大同小异,简单理解为:车站安检,人很多的时候,就有专人在那设卡,控制流量,当设卡的人觉得在某个条件下可以了,就放卡,这里的条件可以是:数量、时间、自定义开启、其它条件值;

e.g.

代码语言:javascript复制
// 创建每1秒发出值的 observable
const myInterval = interval(1000);
// 创建页面点击事件的 observable
const bufferBy = fromEvent(document, 'click');

/*
  收集由 myInterval 发出的所有值,直到我们点击页面。此时 bufferBy 会发出值以完成缓存。
  将自上次缓冲以来收集的所有值传递给数组。
*/
const myBufferedInterval = myInterval.pipe(buffer(bufferBy));
// 打印值到控制台

// 例如 输出: [1,2,3] ... [4,5,6,7,8]
const subscribe = myBufferedInterval.subscribe(val =>
  console.log(' Buffered Values:', val)
);

concatMap

concatMap 可以将值进行映射,还有一个与之相似的是 mergeMap,类比来说:一个是 reduce promise,一个是 PromiseAll;

代码语言:javascript复制
// concatMap
// 发出 'Hello' 和 'Goodbye'
const source = of('Hello', 'Goodbye');

// 使用 promise 的示例
const examplePromise = val => new Promise(resolve => resolve(`${val} World!`));

// 将 source 的值映射成内部 observable,当一个完成发出结果后再继续下一个
const example = source.pipe(concatMap(val => examplePromise(val)));

// 输出: 'Example w/ Promise: 'Hello World', Example w/ Promise: 'Goodbye World'
const subscribe = example.subscribe(val =>
  console.log('Example w/ Promise:', val)
);


// mergeMap
// 发出 'Hello'
const source = of('Hello');

// mergeMap 还会发出 promise 的结果
const myPromise = val =>
  new Promise(resolve => resolve(`${val} World From Promise!`));
  
// 映射成 promise 并发出结果
const example = source.pipe(mergeMap(val => myPromise(val)));

// 输出: 'Hello World From Promise'
const subscribe = example.subscribe(val => console.log(val));

map

map 最关键了,它能对源 observable 的每个值应用投射函数。

代码语言:javascript复制
// 发出 (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
// 每个数字加10
const example = source.pipe(map(val => val   10));
// 输出: 11,12,13,14,15
const subscribe = example.subscribe(val => console.log(val));

reduce

常见的还有 reduce,它能将源 observalbe 的值归并为单个值,当源 observable 完成时将这个值发出。

代码语言:javascript复制
const source = of(1, 2, 3, 4);
const example = source.pipe(reduce((acc, val) => acc   val));

// 输出: Sum: 10'
const subscribe = example.subscribe(val => console.log('Sum:', val));

window

还有:window 操作符,是时间窗口值的 observable;

代码语言:javascript复制
// RxJS v6 
import { timer, interval } from 'rxjs';
import { window, scan, mergeAll } from 'rxjs/operators';

// 立即发出值,然后每秒发出值
const source = timer(0, 1000);
const example = source.pipe(window(interval(3000)));
const count = example.pipe(scan((acc, curr) => acc   1, 0));
/*
  "Window 1:"
  0
  1
  2
  "Window 2:"
  3
  4
  5
  ...
*/
const subscribe = count.subscribe(val => console.log(`Window ${val}:`));
const subscribeTwo = example
  .pipe(mergeAll())
  .subscribe(val => console.log(val));

其实除了:window,还有其衍生的 windowCount、windowTime、windowToggle、windowWhen。与 buffer 的衍生也很像。


OK,以上便是本篇分享,往期关于 RxJS 的内容:

  • 3 分钟温故知新 RxJS 创建实例操作符
  • 你就是函数响应式编程(FRP)啊?!【附 RxJS 实战】
  • 为什么说:被观察者是 push 数据,迭代者是 pull 数据?
  • 探秘 RxJS Observable 为什么要长成这个样子?!
  • Js 异步处理演进,Callback=>Promise=>Observer
  • 继续解惑,异步处理 —— RxJS Observable

我是掘金安东尼,输出暴露输入,技术洞见生活,再会~

0 人点赞