And/Then/When
使用 Pattern 和 Plan 作为中介,将两个或多个 Observable 发射的数据集合并到一起
它们属于 rxjava-joins 模块,不是核心 RxJava 包的一部分。
combineLatest
当两个 Observables 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。
英文解释,翻译的不是太明确 when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit ## items based on the results ##of this function
Observable o1 = Observable.just(1, 2, 3);
Observable o2 = Observable.just(4, 5, 6);
Observable o3 = Observable.just(7, 8, 9);
Observable.combineLatest(o1, o2, o3, new Function3, Integer, Integer, Integer>() {
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2, @NonNull Integer integer3) throws Exception {
return integer + integer2 + integer3;
}
}).subscribe(RxUtils.getObserver());
运行结果
onSubscribe
Thread:Thread[main,5,main]
onNext:16
Thread:Thread[main,5,main]
onNext:17
Thread:Thread[main,5,main]
onNext:18
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
join
任何时候,只要在另一个 Observable 发射的数据定义的时间窗口内,这个 Observable 发射了一条数据,就结合两个 Observable 发射的数据
当时找到一篇别人写的文章,对 join 的理解还是比较深刻的如何理解 RxJava 中的 join 操作
//产生0,5,10,15,20数列
Observable observable1 = Observable.interval(500,TimeUnit.MILLISECONDS)
// .delay(600,TimeUnit.MILLISECONDS)
.map(new Function, Long>() {
public Long apply(@NonNull Long aLong) throws Exception {
return aLong*5;
}
});
// 产生0,10,20,30,40数列
Observable observable2=Observable.interval(500,TimeUnit.MILLISECONDS)
// .delay(1000,TimeUnit.MILLISECONDS)
.map(new Function, Long>() {
public Long apply(@NonNull Long aLong) throws Exception {
return aLong*10;
}
});
// observable2.subscribe(RxUtils.getObserver());
observable1.join(observable2, new Function, ObservableSource>() {
public ObservableSource apply(@NonNull Long aLong) throws Exception {
return Observable.just(String.valueOf(aLong)).delay(600, TimeUnit.MILLISECONDS);
}
}, new Function, ObservableSource>() {
public ObservableSource apply(@NonNull Long aLong) throws Exception {
return Observable.just(String.valueOf(aLong));
}
}, new BiFunction, Long, String>() {
public String apply(@NonNull Long aLong, @NonNull Long aLong2) throws Exception {
return aLong + ":" + aLong2;
}
})
.subscribe(RxUtils.getObserver());
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
onSubscribe
Thread:Thread[main,5,main]
onNext:0:10
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:5:10
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:5:20
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:10:20
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:10:30
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:15:30
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:15:40
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:20:40
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:20:50
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:25:50
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:25:60
Thread:Thread[RxComputationThreadPool-2,5,main]
merge
合并多个 Observables 的发射物
//产生0,5,10,15,20数列
Observable observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
// .delay(600,TimeUnit.MILLISECONDS)
.map(new Function, Long>() {
public Long apply(@NonNull Long aLong) throws Exception {
return aLong*5;
}
});
// 产生0,10,20,30,40数列
Observable observable2=Observable.interval(500,TimeUnit.MILLISECONDS)
.delay(1000,TimeUnit.MILLISECONDS)
.map(new Function, Long>() {
public Long apply(@NonNull Long aLong) throws Exception {
return aLong*10;
}
});
Observable.merge(observable1,observable2).subscribe(RxUtils.getObserver());
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
onSubscribe
Thread:Thread[main,5,main]
onNext:0
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:5
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:10
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:0
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:15
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:10
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:20
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:20
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:25
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:30
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:30
StartWith
在数据序列的开头插入一条指定的项
Observable.just(10, 20, 30).startWith(2).subscribe(RxUtils.getObserver());
onSubscribe
Thread:Thread[main,5,main]
onNext:2
Thread:Thread[main,5,main]
onNext:10
Thread:Thread[main,5,main]
onNext:20
Thread:Thread[main,5,main]
onNext:30
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
switchOnNext
将一个发射多个 Observables 的 Observable 转换成另一个单独的 Observable,后者发射那些 Observables 最近发射的数据项
很绕口是吧 看代码 很简单 画个图 会更高理解,有时间 我会把图补上
// 每隔500毫秒产生一个observable
Observable> observable= Observable.interval(500, TimeUnit.MILLISECONDS).map(new Function, Observable>() {
public Observable apply(@NonNull Long aLong) throws Exception {
// 每隔250毫秒产生一组数据(0,10,20,30,40)
return Observable.interval(250,TimeUnit.MILLISECONDS).map(new Function,Long>() {
public Long apply(@NonNull Long l) throws Exception {
return l * 10;
}
}).take(5);
}
}).take(2);
// observable.subscribe(RxUtils.>getObserver());
Observable.switchOnNext(observable).subscribe(RxUtils.getObserver(Long.valueOf(1)));
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
onSubscribe:1
Thread:Thread[main,5,main]
onNext:0
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:0
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:10
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:20
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:30
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:40
Thread:Thread[RxComputationThreadPool-3,5,main]
onComplete
Thread:Thread[RxComputationThreadPool-3,5,main]
## zip
## 通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。
```java
Observable observable1 = Observable.just(10,20,30);
Observable observable2 = Observable.just(4, 8, 12, 16);
Observable.zip(observable1, observable2, new BiFunction, Integer, Integer>() {
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer+integer2;
}
}).subscribe(RxUtils.getObserver());
onSubscribe
Thread:Thread[main,5,main]
onNext:14
Thread:Thread[main,5,main]
onNext:28
Thread:Thread[main,5,main]
onNext:42
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于