Connect
让一个可连接的 Observable 开始发射数据给订阅者
publish
将普通的 Observable 转换为可连接的 Observable
Observable observable=Observable.just(1,2,3);
ConnectableObservable connectableObservable = observable.publish();
connectableObservable.subscribe(RxUtils.getObserver());
connectableObservable.connect();
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
onSubscribe
Thread:Thread[main,5,main]
onNext:1
Thread:Thread[main,5,main]
onNext:2
Thread:Thread[main,5,main]
onNext:3
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
refCount
让一个可连接的 Observable 行为像普通的 Observable
Observable observable=Observable.just(1,2,3);
ConnectableObservable connectableObservable = observable.publish();
Observable observable1= connectableObservable.refCount();
observable1.subscribe(RxUtils.getObserver());
// connectableObservable.connect();
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
onSubscribe
Thread:Thread[main,5,main]
onNext:1
Thread:Thread[main,5,main]
onNext:2
Thread:Thread[main,5,main]
onNext:3
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
Replay
保证所有的观察者收到相同的数据序列,即使它们在 Observable 开始发射数据之后才订阅
2.x 代码中没有达到预期,1.x却有效 不知道为什么,又在2.x成功的大神,给说下,谢谢
Observable observable=Observable.interval(1, TimeUnit.SECONDS).take(5);
ConnectableObservable connectableObservable = observable.publish();
connectableObservable
.replay(4) //缓存4个数据
.publish();
connectableObservable.connect();
connectableObservable
.delaySubscription(4,TimeUnit.SECONDS)
.subscribe(RxUtils
.getObserver());
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于