buffer
定期收集 Observable 的数据放进一个数据包裹(不懂为什么翻译为包裹,可能是谷歌翻译吧,我认为此处应该为集合),然后发射这些数据包裹(集合),而不是一次发射一个值。
List list=new ArrayList();
Observable.range(1,5).buffer(2).subscribe(RxUtils.getObserver(list));
运行结果
onSubscribe:[]
Thread:Thread[main,5,main]
onNext:[1, 2]
Thread:Thread[main,5,main]
onNext:[3, 4]
Thread:Thread[main,5,main]
onNext:[5]
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
flatMap
FlatMap 将一个发射数据的 Observable 变换为多个 Observables,然后将它们发射的数据合并后放进一个单独的 Observable
Observable.just("大","小").flatMap(new Function, ObservableSource>() {
public ObservableSource apply(@NonNull String s) throws Exception {
return Observable.just(s+"sb");
}
}).subscribe(RxUtils.getObserver("0"));
onSubscribe:0
Thread:Thread[main,5,main]
onNext:大sb
Thread:Thread[main,5,main]
onNext:小sb
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
map (注意区分 flatmap 和 map 的区别)
对 Observable 发射的每一项数据应用一个函数,执行变换操作
Observable.just("大","小").map(new Function, String>() {
public String apply(@NonNull String s) throws Exception {
return s+"sb";
}
}).subscribe(RxUtils.getObserver("0"));
onSubscribe:0
Thread:Thread[main,5,main]
onNext:大sb
Thread:Thread[main,5,main]
onNext:小sb
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
groupBy
将一个 Observable 分拆为一些 Observables 集合,它们中的每一个发射原始 Observable 的一个子序列
Observable,Long>> observable= Observable.interval(1, TimeUnit.SECONDS).take(10).groupBy(new Function, Long>() {
public Long apply(@NonNull Long aLong) throws Exception {
return aLong%3;
}
});
observable.subscribe(new Observer, Long>>() {
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe:");
}
public void onNext(@NonNull final GroupedObservable, Long> longLongGroupedObservable) {
longLongGroupedObservable.subscribe(new Consumer() {
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("key:" + longLongGroupedObservable.getKey() +", value:" + aLong);
}
});
}
public void onError(@NonNull Throwable e) {
System.out.println("onNext:"+e);
}
public void onComplete() {
}
});
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
运行结果
onSubscribe:
key:0, value:0
key:1, value:1
key:2, value:2
key:0, value:3
key:1, value:4
key:2, value:5
key:0, value:6
key:1, value:7
key:2, value:8
key:0, value:9
可能代码看的有点晕,看下官方给的图就会豁然开朗
不明白?这里走 再看看 官网的文档
scan
连续地对数据序列的每一项应用一个函数,然后连续发射结果
Observable.just(1,2,3,4,5).scan(new BiFunction, Integer, Integer>() {
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer+integer2;
}
}).subscribe(RxUtils.getObserver());
以上代码实现了从 1 到 5 的累加
onSubscribe
Thread:Thread[main,5,main]
onNext:1
Thread:Thread[main,5,main]
onNext:3
Thread:Thread[main,5,main]
onNext:6
Thread:Thread[main,5,main]
onNext:10
Thread:Thread[main,5,main]
onNext:15
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
window
定期将来自原始 Observable 的数据分解为一个 Observable 窗口,发射这些窗口,而不是每次发射一项数据
Observable.interval(1, TimeUnit.SECONDS).take(12)
.window(3, TimeUnit.SECONDS)
.subscribe(new Observer>() {
public void onSubscribe(@NonNull Disposable d) {
}
public void onNext(@NonNull Observable longObservable) {
longObservable.subscribe(RxUtils.getObserver());
}
public void onError(@NonNull Throwable e) {
}
public void onComplete() {
}
});
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
运行结果
onSubscribe
Thread:Thread[main,5,main]
onNext:0
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:1
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:2
Thread:Thread[RxComputationThreadPool-2,5,main]
onComplete
Thread:Thread[RxComputationThreadPool-2,5,main]
onSubscribe
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:3
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:4
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:5
Thread:Thread[RxComputationThreadPool-2,5,main]
onComplete
Thread:Thread[RxComputationThreadPool-2,5,main]
onSubscribe
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:6
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:7
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:8
Thread:Thread[RxComputationThreadPool-2,5,main]
onComplete
Thread:Thread[RxComputationThreadPool-2,5,main]
onSubscribe
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:9
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:10
Thread:Thread[RxComputationThreadPool-2,5,main]
onComplete
Thread:Thread[RxComputationThreadPool-1,5,main]
onSubscribe
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:11
Thread:Thread[RxComputationThreadPool-2,5,main]
onComplete
Thread:Thread[RxComputationThreadPool-2,5,main]
``
注意可能你的结果可能和我的可能不同,由于两个事件源不是不一个线程,不同时间运行会有些时间差
总结:以上是RxJava的变换操作符做常用的莫过于map flatmap这两个,一定要理解两者的区别
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于