RxJava2.0 操作符之 -- 连接操作符

本贴最后更新于 2948 天前,其中的信息可能已经沧海桑田

Connect

让一个可连接的 Observable 开始发射数据给订阅者

publish

将普通的 Observable 转换为可连接的 Observable

Observable observable=Observable.just(1,2,3); ConnectableObservable connectableObservable = observable.publish(); connectableObservable.subscribe(RxUtils.getObserver()); connectableObservable.connect(); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e1) { e1.printStackTrace(); }
onSubscribe Thread:Thread[main,5,main] onNext:1 Thread:Thread[main,5,main] onNext:2 Thread:Thread[main,5,main] onNext:3 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

refCount

让一个可连接的 Observable 行为像普通的 Observable

Observable observable=Observable.just(1,2,3); ConnectableObservable connectableObservable = observable.publish(); Observable observable1= connectableObservable.refCount(); observable1.subscribe(RxUtils.getObserver()); // connectableObservable.connect(); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e1) { e1.printStackTrace(); }
onSubscribe Thread:Thread[main,5,main] onNext:1 Thread:Thread[main,5,main] onNext:2 Thread:Thread[main,5,main] onNext:3 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

Replay

保证所有的观察者收到相同的数据序列,即使它们在 Observable 开始发射数据之后才订阅

2.x 代码中没有达到预期,1.x却有效 不知道为什么,又在2.x成功的大神,给说下,谢谢

Observable observable=Observable.interval(1, TimeUnit.SECONDS).take(5); ConnectableObservable connectableObservable = observable.publish(); connectableObservable .replay(4) //缓存4个数据 .publish(); connectableObservable.connect(); connectableObservable .delaySubscription(4,TimeUnit.SECONDS) .subscribe(RxUtils .getObserver()); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e1) { e1.printStackTrace(); }
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3203 引用 • 8217 回帖 • 1 关注
  • 线程
    123 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...