RxJava2.0 操作符之 -- 变换操作符

本贴最后更新于 2836 天前,其中的信息可能已经物是人非

buffer

定期收集 Observable 的数据放进一个数据包裹(不懂为什么翻译为包裹,可能是谷歌翻译吧,我认为此处应该为集合),然后发射这些数据包裹(集合),而不是一次发射一个值。

List list=new ArrayList(); Observable.range(1,5).buffer(2).subscribe(RxUtils.getObserver(list));

运行结果

onSubscribe:[] Thread:Thread[main,5,main] onNext:[1, 2] Thread:Thread[main,5,main] onNext:[3, 4] Thread:Thread[main,5,main] onNext:[5] Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

flatMap

FlatMap 将一个发射数据的 Observable 变换为多个 Observables,然后将它们发射的数据合并后放进一个单独的 Observable

Observable.just("大","小").flatMap(new Function, ObservableSource>() { public ObservableSource apply(@NonNull String s) throws Exception { return Observable.just(s+"sb"); } }).subscribe(RxUtils.getObserver("0"));
onSubscribe:0 Thread:Thread[main,5,main] onNext:大sb Thread:Thread[main,5,main] onNext:小sb Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

map (注意区分 flatmap 和 map 的区别)

对 Observable 发射的每一项数据应用一个函数,执行变换操作

Observable.just("大","小").map(new Function, String>() { public String apply(@NonNull String s) throws Exception { return s+"sb"; } }).subscribe(RxUtils.getObserver("0"));
onSubscribe:0 Thread:Thread[main,5,main] onNext:大sb Thread:Thread[main,5,main] onNext:小sb Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

groupBy

将一个 Observable 分拆为一些 Observables 集合,它们中的每一个发射原始 Observable 的一个子序列

Observable,Long>> observable= Observable.interval(1, TimeUnit.SECONDS).take(10).groupBy(new Function, Long>() { public Long apply(@NonNull Long aLong) throws Exception { return aLong%3; } }); observable.subscribe(new Observer, Long>>() { public void onSubscribe(@NonNull Disposable d) { System.out.println("onSubscribe:"); } public void onNext(@NonNull final GroupedObservable, Long> longLongGroupedObservable) { longLongGroupedObservable.subscribe(new Consumer() { public void accept(@NonNull Long aLong) throws Exception { System.out.println("key:" + longLongGroupedObservable.getKey() +", value:" + aLong); } }); } public void onError(@NonNull Throwable e) { System.out.println("onNext:"+e); } public void onComplete() { } }); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e1) { e1.printStackTrace(); }

运行结果

onSubscribe: key:0, value:0 key:1, value:1 key:2, value:2 key:0, value:3 key:1, value:4 key:2, value:5 key:0, value:6 key:1, value:7 key:2, value:8 key:0, value:9

可能代码看的有点晕,看下官方给的图就会豁然开朗

不明白?这里走 再看看 官网的文档

scan

连续地对数据序列的每一项应用一个函数,然后连续发射结果

Observable.just(1,2,3,4,5).scan(new BiFunction, Integer, Integer>() { public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception { return integer+integer2; } }).subscribe(RxUtils.getObserver());

以上代码实现了从 1 到 5 的累加

onSubscribe Thread:Thread[main,5,main] onNext:1 Thread:Thread[main,5,main] onNext:3 Thread:Thread[main,5,main] onNext:6 Thread:Thread[main,5,main] onNext:10 Thread:Thread[main,5,main] onNext:15 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

window

定期将来自原始 Observable 的数据分解为一个 Observable 窗口,发射这些窗口,而不是每次发射一项数据

Observable.interval(1, TimeUnit.SECONDS).take(12) .window(3, TimeUnit.SECONDS) .subscribe(new Observer>() { public void onSubscribe(@NonNull Disposable d) { } public void onNext(@NonNull Observable longObservable) { longObservable.subscribe(RxUtils.getObserver()); } public void onError(@NonNull Throwable e) { } public void onComplete() { } }); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e1) { e1.printStackTrace(); }

运行结果

onSubscribe Thread:Thread[main,5,main] onNext:0 Thread:Thread[RxComputationThreadPool-2,5,main] onNext:1 Thread:Thread[RxComputationThreadPool-2,5,main] onNext:2 Thread:Thread[RxComputationThreadPool-2,5,main] onComplete Thread:Thread[RxComputationThreadPool-2,5,main] onSubscribe Thread:Thread[RxComputationThreadPool-2,5,main] onNext:3 Thread:Thread[RxComputationThreadPool-2,5,main] onNext:4 Thread:Thread[RxComputationThreadPool-2,5,main] onNext:5 Thread:Thread[RxComputationThreadPool-2,5,main] onComplete Thread:Thread[RxComputationThreadPool-2,5,main] onSubscribe Thread:Thread[RxComputationThreadPool-2,5,main] onNext:6 Thread:Thread[RxComputationThreadPool-2,5,main] onNext:7 Thread:Thread[RxComputationThreadPool-2,5,main] onNext:8 Thread:Thread[RxComputationThreadPool-2,5,main] onComplete Thread:Thread[RxComputationThreadPool-2,5,main] onSubscribe Thread:Thread[RxComputationThreadPool-2,5,main] onNext:9 Thread:Thread[RxComputationThreadPool-2,5,main] onNext:10 Thread:Thread[RxComputationThreadPool-2,5,main] onComplete Thread:Thread[RxComputationThreadPool-1,5,main] onSubscribe Thread:Thread[RxComputationThreadPool-1,5,main] onNext:11 Thread:Thread[RxComputationThreadPool-2,5,main] onComplete Thread:Thread[RxComputationThreadPool-2,5,main] `` 注意可能你的结果可能和我的可能不同,由于两个事件源不是不一个线程,不同时间运行会有些时间差 总结:以上是RxJava的变换操作符做常用的莫过于map flatmap这两个,一定要理解两者的区别
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3197 引用 • 8215 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...