An Introduction to RxJava's Connectable Operators

2022-12-19 13:28:35

When reposting, please credit the source with a link. This article comes from 103style's blog.

Connection-related operators and their official descriptions

Official documentation for the RxJava connectable operators: Connectable Observable Operators

  • ConnectableObservable.connect( ): instructs a Connectable Observable to begin emitting items
  • Observable.publish( ): represents an Observable as a Connectable Observable
  • Observable.replay( ): ensures that all Subscribers see the same sequence of emitted items, even if they subscribe after the Observable begins emitting the items
  • ConnectableObservable.refCount( ): makes a Connectable Observable behave like an ordinary Observable
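
The examples in this article do not cover replay( ), so here is a rough sketch of the idea behind it. This is a toy model in plain Java, not RxJava code: the ToyReplay class and its emit method are hypothetical names invented for illustration, and the real operator also deals with threading, buffer limits, and termination. It only shows the core contract from the list above: items are buffered, so a subscriber that arrives late is first caught up on what it missed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only (hypothetical class, not RxJava code): buffers every emitted
// item so that a late subscriber can be caught up before receiving new items.
class ToyReplay<T> {
    private final List<T> buffer = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    // Replay the buffered history to the new subscriber, then register it.
    void subscribe(Consumer<T> onNext) {
        for (T past : buffer) {
            onNext.accept(past);
        }
        subscribers.add(onNext);
    }

    // Record the item and deliver it to everyone currently subscribed.
    void emit(T item) {
        buffer.add(item);
        for (Consumer<T> s : subscribers) {
            s.accept(item);
        }
    }
}

class ReplaySketch {
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyReplay<Integer> replayed = new ToyReplay<>();
        replayed.subscribe(it -> log.add("early:" + it));
        replayed.emit(1);
        replayed.emit(2);
        // This subscriber arrives after 1 and 2 were emitted.
        replayed.subscribe(it -> log.add("late:" + it));
        replayed.emit(3);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the late subscriber ends up with 1, 2, 3, the same sequence as the early one, even though it subscribed after the first two items had been emitted.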

Examples:

Non-connected operation
// Assumes RxJava 2.x (io.reactivex packages)
import io.reactivex.Observable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import java.util.concurrent.TimeUnit;

// A plain Observable with no publish(): each subscriber triggers its own run
Observable<Integer> firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS);

firstMillion.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

Output:

Subscriber #1:391999
Sequence #1 complete
Subscriber #2:556663
Sequence #2 complete

publish and connect

Official example:

// Assumes RxJava 2.x (io.reactivex packages)
import io.reactivex.Observable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.observables.ConnectableObservable;
import java.util.concurrent.TimeUnit;

ConnectableObservable<Integer> firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

firstMillion.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

// Nothing is emitted until connect() is called
firstMillion.connect();

Output:

Subscriber #1:984513
Subscriber #2:984513
Sequence #1 complete
Sequence #2 complete
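
In the output above both subscribers print the same value, because emission only starts at connect( ) and every item is delivered to all subscribers registered at that point. The contract can be sketched with a toy model in plain Java. This is not how RxJava implements publish( ); ToyConnectable is a hypothetical class made up for illustration, and it ignores threading, errors, and completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only (hypothetical class, not RxJava code): shows the
// subscribe-then-connect contract of a connectable source.
class ToyConnectable<T> {
    private final List<T> source; // items the source would emit
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ToyConnectable(List<T> source) {
        this.source = source;
    }

    // Registering a subscriber does not start emission.
    void subscribe(Consumer<T> onNext) {
        subscribers.add(onNext);
    }

    // Emission starts here; every registered subscriber sees each item.
    void connect() {
        for (T item : source) {
            for (Consumer<T> s : subscribers) {
                s.accept(item);
            }
        }
    }
}

class ConnectableSketch {
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyConnectable<Integer> numbers =
                new ToyConnectable<>(Arrays.asList(1, 2, 3));
        numbers.subscribe(it -> log.add("#1:" + it));
        numbers.subscribe(it -> log.add("#2:" + it));
        log.add("before connect"); // nothing has been delivered yet
        numbers.connect();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch "before connect" is logged first, and after connect( ) each item reaches subscriber #1 and then subscriber #2 before the next item is emitted.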

publish and refCount

Official example:

// Assumes RxJava 2.x (io.reactivex packages)
import io.reactivex.Observable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.observables.ConnectableObservable;
import java.util.concurrent.TimeUnit;

ConnectableObservable<Integer> firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

// refCount() connects automatically when its first subscriber arrives
firstMillion.refCount().subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.refCount().subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

Output:

Subscriber #1:438899
Sequence #1 complete
Subscriber #2:684698
Sequence #2 complete

That's all.
