Please credit the source with a link when reposting. This article is from 103style's blog.
Connectable operators and their official descriptions
Official RxJava documentation for these operators: Connectable Observable Operators
ConnectableObservable.connect( )
instructs a Connectable Observable to begin emitting items
Observable.publish( )
represents an Observable as a Connectable Observable
Observable.replay( )
ensures that all Subscribers see the same sequence of emitted items, even if they subscribe after the Observable begins emitting the items
ConnectableObservable.refCount( )
makes a Connectable Observable behave like an ordinary Observable
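To make the descriptions above more concrete, here is a minimal plain-Java sketch of the idea behind connect, publish and replay. It is only a model written for this article, not RxJava's implementation: the class `ConnectableSketch`, its `replay` flag and the `demo` helper are hypothetical names, and the source is a fixed list emitted synchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of a connectable source; NOT RxJava's implementation.
class ConnectableSketch {
    private final List<Integer> items;                                   // what the source will emit
    private final List<Consumer<Integer>> subscribers = new ArrayList<>();
    private final List<Integer> emittedSoFar = new ArrayList<>();        // buffer used by the replay-like mode
    private final boolean replay;                                        // true models replay(), false models publish()

    ConnectableSketch(List<Integer> items, boolean replay) {
        this.items = items;
        this.replay = replay;
    }

    // Subscribing only registers the consumer; nothing new is emitted until connect().
    void subscribe(Consumer<Integer> subscriber) {
        if (replay) {
            // In replay-like mode a late subscriber first receives everything emitted so far.
            for (Integer item : emittedSoFar) {
                subscriber.accept(item);
            }
        }
        subscribers.add(subscriber);
    }

    // connect() starts one shared emission that every registered subscriber sees.
    void connect() {
        for (Integer item : items) {
            emittedSoFar.add(item);
            for (Consumer<Integer> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    // Scenario: two subscribers before connect(), one subscriber after it.
    static List<List<Integer>> demo(boolean replay) {
        ConnectableSketch source = new ConnectableSketch(Arrays.asList(1, 2, 3), replay);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        List<Integer> late = new ArrayList<>();
        source.subscribe(first::add);
        source.subscribe(second::add);
        source.connect();
        source.subscribe(late::add);
        return Arrays.asList(first, second, late);
    }

    public static void main(String[] args) {
        System.out.println("publish-like: " + demo(false)); // prints publish-like: [[1, 2, 3], [1, 2, 3], []]
        System.out.println("replay-like:  " + demo(true));  // prints replay-like:  [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
    }
}
```

In this model the two early subscribers always share one emission, while the late subscriber receives the earlier items only in the replay-like mode.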
Examples:
Non-connected operation
// Imports assume RxJava 2.x (io.reactivex), which matches the Consumer/Action signatures used here.
import io.reactivex.Observable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import java.util.concurrent.TimeUnit;

// An ordinary (non-connectable) Observable: there is no publish() here,
// so every subscribe() call runs the whole sequence again on its own.
Observable<Integer> firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS);
firstMillion.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});
firstMillion.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});
Output (the sampled values vary between runs):
Subscriber #1:391999
Sequence #1 complete
Subscriber #2:556663
Sequence #2 complete
publish and connect
Official example:
// publish() turns the Observable into a ConnectableObservable.
ConnectableObservable<Integer> firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();
firstMillion.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});
firstMillion.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});
// Nothing is emitted until connect() is called; both subscribers then share one sequence.
firstMillion.connect();
Output (the sampled values vary between runs):
Subscriber #1:984513
Subscriber #2:984513
Sequence #1 complete
Sequence #2 complete
publish and refCount
Official example:
ConnectableObservable<Integer> firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();
// refCount() connects automatically when the first subscriber arrives.
firstMillion.refCount().subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});
firstMillion.refCount().subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});
Output (the sampled values vary between runs):
Subscriber #1:438899
Sequence #1 complete
Subscriber #2:684698
Sequence #2 complete
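The output above shows sequence #1 completing before subscriber #2 receives anything, so the two subscribers did not share one emission. A likely explanation is that the first subscription triggers the connection and the range source runs to completion before the second subscription is made. The following plain-Java sketch models that behavior under this assumption. It is not RxJava's implementation, and `RefCountSketch`, `connections` and `demo` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of refCount-style auto-connect; NOT RxJava's implementation.
class RefCountSketch {
    private final List<Consumer<Integer>> subscribers = new ArrayList<>();
    private int connections = 0; // how many times the source has been started

    // The first subscriber triggers a connection automatically. The modeled source
    // is synchronous, so it has already finished when subscribe() returns.
    void subscribe(Consumer<Integer> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            connect();
        }
    }

    private void connect() {
        connections++;
        for (int item = 1; item <= 3; item++) {
            for (Consumer<Integer> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
        // Completion ends this connection, so the next subscriber starts a new one.
        subscribers.clear();
    }

    int connections() {
        return connections;
    }

    // Two subscribers arrive one after the other; returns how often the source ran.
    static int demo() {
        RefCountSketch shared = new RefCountSketch();
        shared.subscribe(item -> System.out.println("Subscriber #1:" + item));
        shared.subscribe(item -> System.out.println("Subscriber #2:" + item));
        return shared.connections();
    }

    public static void main(String[] args) {
        System.out.println("connections: " + demo()); // prints connections: 2 after the subscriber lines
    }
}
```

In this model the source is started twice, once per subscriber, which mirrors the two separate runs seen in the output above.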
That's all.