RxJS -- Subscription

2018-03-29 12:34:11 浏览数 (1)

Subscription是什么?

当subscribe一个observable的时候, 返回的就是一个subscription. 它是一个一次性对象(disposable), 它有一个非常重要的方法 ubsubscribe(), 它没有参数, 它会dispose掉subscription所持有的资源, 或者叫取消observable的执行.

第一个例子:

代码语言:javascript复制
import { Observable } from "rxjs/Observable";
import { Subscription } from "rxjs/Subscription";
import 'rxjs/add/observable/interval';

const observable = Observable.interval(1000);

const subscription = observable.subscribe(x => console.log(x));

console.log(subscription);

subscription.unsubscribe();

console.log(subscription);

运行结果是这样的:

代码语言:javascript复制
Subscriber {
  closed: false,
  _parent: null,
  _parents: null,
  _subscriptions: 
   [ AsyncAction {
       closed: false,
       _parent: [Circular],
       _parents: null,
       _subscriptions: null,
       scheduler: [AsyncScheduler],
       work: [Function],
       pending: true,
       state: [Object],
       delay: 1000,
       id: [Timeout] } ],
  syncErrorValue: null,
  syncErrorThrown: false,
  syncErrorThrowable: false,
  isStopped: false,
  destination: 
   SafeSubscriber {
     closed: false,
     _parent: null,
     _parents: null,
     _subscriptions: null,
     syncErrorValue: null,
     syncErrorThrown: false,
     syncErrorThrowable: false,
     isStopped: false,
     destination: 
      { closed: true,
        next: [Function: next],
        error: [Function: error],
        complete: [Function: complete] },
     _parentSubscriber: [Circular],
     _context: [Circular],
     _next: [Function],
     _error: undefined,
     _complete: undefined } }
Subscriber {
  closed: true,
  _parent: null,
  _parents: null,
  _subscriptions: null,
  syncErrorValue: null,
  syncErrorThrown: false,
  syncErrorThrowable: false,
  isStopped: true,
  destination: 
   SafeSubscriber {
     closed: false,
     _parent: null,
     _parents: null,
     _subscriptions: null,
     syncErrorValue: null,
     syncErrorThrown: false,
     syncErrorThrowable: false,
     isStopped: false,
     destination: 
      { closed: true,
        next: [Function: next],
        error: [Function: error],
        complete: [Function: complete] },
     _parentSubscriber: [Circular],
     _context: [Circular],
     _next: [Function],
     _error: undefined,
     _complete: undefined } }

注意两次控制台输出的closed属性的值是不同的, true表示已经unsubscribe()了.

在ubsubscribe之后, _subscriptions属性也变成空了, 之前它是一个数组, 说明subscription可以是多个subscriptions的组合.

毁灭函数

如果使用Observable.create方法的话, 它的参数函数可以返回一个function. 而subscription在unsubscribe这个observable的时候, 会调用这个参数函数返回的function, 看例子:

代码语言:javascript复制
import { Observable } from "rxjs/Observable";
import { Subscription } from "rxjs/Subscription";
import 'rxjs/add/observable/interval';

const observable = Observable.create(observer => {

    let index = 1;
    setInterval(() => {
        observer.next(index  );
    }, 200);

    return () => {
        // 在这可以做清理工作
        console.log('我在Observable.create返回的function里面...');
    };
});

const subscription = observable.subscribe(
    x => console.log(x),
    err => console.error(err),
    () => console.log(`complete..`)
);

setTimeout(() => {
    subscription.unsubscribe();
}, 1100);

运行结果:

这个例子很好的解释了我写的那一堆拗口的解释..

retry, retryWhen的原理

直接举例:

代码语言:javascript复制
import { Observable } from "rxjs/Observable";
import { Subscription } from "rxjs/Subscription";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/retry';

const observable = Observable.create(observer => {

    setInterval(() => {
        observer.next('doing...');
        observer.error('error!!!');
    }, 200);

    return () => {
        // 在这可以做清理工作
        console.log('我在Observable.create返回的function里面...');
    };
}).retry(4);

observable.subscribe(
    x => console.log(x),
    err => console.error(err),
    () => console.log(`complete..`)
);

可以看到, 每次执行next之后都会有错误, 重试4次.

运行结果:

可以看到, retry/retryWhen其实的原理即是先unsubscribe然后再重新subscribe而已, 所以每次retry都会运行我所称的毁灭函数.

操作多个Subscriptions

多个subscriptions可以一起操作, 一个subscription可以同时unsubscribe多个subscriptions, 使用add方法为subscription添加另一个subscription. 对应的还有一个remove方法.

直接举官网的例子:

代码语言:javascript复制
var observable1 = Observable.interval(400);
var observable2 = Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: '   x));
var childSubscription = observable2.subscribe(x => console.log('second: '   x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

运行结果:

0 人点赞