RxJava2.0 操作符之 -- 变换操作符

本贴最后更新于 2684 天前,其中的信息可能已经物是人非

buffer

定期收集 Observable 的数据放进一个数据包裹(不懂为什么翻译为包裹,可能是谷歌翻译吧,我认为此处应该为集合),然后发射这些数据包裹(集合),而不是一次发射一个值。

  List list=new ArrayList();
  Observable.range(1,5).buffer(2).subscribe(RxUtils.getObserver(list));

运行结果

  onSubscribe:[]
  Thread:Thread[main,5,main]
  onNext:[1, 2]
  Thread:Thread[main,5,main]
  onNext:[3, 4]
  Thread:Thread[main,5,main]
  onNext:[5]
  Thread:Thread[main,5,main]
  onComplete
  Thread:Thread[main,5,main]

flatMap

FlatMap 将一个发射数据的 Observable 变换为多个 Observables,然后将它们发射的数据合并后放进一个单独的 Observable

	Observable.just("大","小").flatMap(new Function, ObservableSource>() {
		  public ObservableSource apply(@NonNull String s) throws Exception {
			  return Observable.just(s+"sb");
	  }
	  }).subscribe(RxUtils.getObserver("0"));
	onSubscribe:0
	Thread:Thread[main,5,main]
	onNext:大sb
	Thread:Thread[main,5,main]
	onNext:小sb
	Thread:Thread[main,5,main]
	onComplete
	Thread:Thread[main,5,main]

map (注意区分 flatmap 和 map 的区别)

对 Observable 发射的每一项数据应用一个函数,执行变换操作

   Observable.just("大","小").map(new Function, String>() {
	  public String apply(@NonNull String s) throws Exception {
		  return s+"sb";
	}
  }).subscribe(RxUtils.getObserver("0"));
   onSubscribe:0
  Thread:Thread[main,5,main]
  onNext:大sb
  Thread:Thread[main,5,main]
  onNext:小sb
  Thread:Thread[main,5,main]
  onComplete
  Thread:Thread[main,5,main] 

groupBy

将一个 Observable 分拆为一些 Observables 集合,它们中的每一个发射原始 Observable 的一个子序列

Observable,Long>> observable= Observable.interval(1, TimeUnit.SECONDS).take(10).groupBy(new Function, Long>() {
     public Long apply(@NonNull Long aLong) throws Exception {

         return aLong%3;
  }
 });
  observable.subscribe(new Observer, Long>>() {
     public void onSubscribe(@NonNull Disposable d) {
         System.out.println("onSubscribe:");
  }

     public void onNext(@NonNull final GroupedObservable, Long> longLongGroupedObservable) {
         longLongGroupedObservable.subscribe(new Consumer() {
             public void accept(@NonNull Long aLong) throws Exception {
                 System.out.println("key:" + longLongGroupedObservable.getKey() +", value:" + aLong);
  }
         });
  }

     public void onError(@NonNull Throwable e) {
         System.out.println("onNext:"+e);
  }

     public void onComplete() {

     }
 });
 try {
     Thread.sleep(Integer.MAX_VALUE);
  } catch (InterruptedException e1) {
     e1.printStackTrace();
  }  

运行结果

onSubscribe:
key:0, value:0
key:1, value:1
key:2, value:2
key:0, value:3
key:1, value:4
key:2, value:5
key:0, value:6
key:1, value:7
key:2, value:8
key:0, value:9

可能代码看的有点晕,看下官方给的图就会豁然开朗

不明白?这里走 再看看 官网的文档

scan

连续地对数据序列的每一项应用一个函数,然后连续发射结果

Observable.just(1,2,3,4,5).scan(new BiFunction, Integer, Integer>() {
    public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
        return integer+integer2;
  }
}).subscribe(RxUtils.getObserver());

以上代码实现了从 1 到 5 的累加

onSubscribe
Thread:Thread[main,5,main]
onNext:1
Thread:Thread[main,5,main]
onNext:3
Thread:Thread[main,5,main]
onNext:6
Thread:Thread[main,5,main]
onNext:10
Thread:Thread[main,5,main]
onNext:15
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]

window

定期将来自原始 Observable 的数据分解为一个 Observable 窗口,发射这些窗口,而不是每次发射一项数据

Observable.interval(1, TimeUnit.SECONDS).take(12)
        .window(3, TimeUnit.SECONDS)
        .subscribe(new Observer>() {
            public void onSubscribe(@NonNull Disposable d) {

            }

            public void onNext(@NonNull Observable longObservable) {
                    longObservable.subscribe(RxUtils.getObserver());
  }

            public void onError(@NonNull Throwable e) {

            }

            public void onComplete() {

            }
        });
try {
    Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
    e1.printStackTrace();
}

运行结果

	onSubscribe
	Thread:Thread[main,5,main]
	onNext:0
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:1
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:2
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onComplete
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onSubscribe
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:3
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:4
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:5
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onComplete
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onSubscribe
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:6
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:7
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:8
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onComplete
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onSubscribe
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:9
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onNext:10
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onComplete
	Thread:Thread[RxComputationThreadPool-1,5,main]
	onSubscribe
	Thread:Thread[RxComputationThreadPool-1,5,main]
	onNext:11
	Thread:Thread[RxComputationThreadPool-2,5,main]
	onComplete
	Thread:Thread[RxComputationThreadPool-2,5,main]

``
注意可能你的结果可能和我的可能不同,由于两个事件源不是不一个线程,不同时间运行会有些时间差

  总结:以上是RxJava的变换操作符做常用的莫过于map flatmap这两个,一定要理解两者的区别
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...