RxJava2.0 操作符之 -- 结合操作符

本贴最后更新于 2542 天前,其中的信息可能已经时过境迁

And/Then/When

使用 Pattern 和 Plan 作为中介,将两个或多个 Observable 发射的数据集合并到一起

它们属于 rxjava-joins 模块,不是核心 RxJava 包的一部分。

combineLatest

当两个 Observables 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。

英文解释,翻译的不是太明确 when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit ## items based on the results ##of this function

Observable o1 = Observable.just(1, 2, 3);
Observable o2 = Observable.just(4, 5, 6);
Observable o3 = Observable.just(7, 8, 9);

Observable.combineLatest(o1, o2, o3, new Function3, Integer, Integer, Integer>() {
    public Integer apply(@NonNull Integer integer, @NonNull Integer integer2, @NonNull Integer integer3) throws Exception {
        return integer + integer2 + integer3;
  }
}).subscribe(RxUtils.getObserver());

运行结果

  onSubscribe
  Thread:Thread[main,5,main]
  onNext:16
  Thread:Thread[main,5,main]
  onNext:17
  Thread:Thread[main,5,main]
  onNext:18
  Thread:Thread[main,5,main]
  onComplete
  Thread:Thread[main,5,main]

join

任何时候,只要在另一个 Observable 发射的数据定义的时间窗口内,这个 Observable 发射了一条数据,就结合两个 Observable 发射的数据

当时找到一篇别人写的文章,对 join 的理解还是比较深刻的如何理解 RxJava 中的 join 操作

//产生0,5,10,15,20数列
  Observable observable1 = Observable.interval(500,TimeUnit.MILLISECONDS)
               // .delay(600,TimeUnit.MILLISECONDS)
  .map(new Function, Long>() {
            public Long apply(@NonNull Long aLong) throws Exception {
                return aLong*5;
  }
        });
  // 产生0,10,20,30,40数列
  Observable observable2=Observable.interval(500,TimeUnit.MILLISECONDS)
            // .delay(1000,TimeUnit.MILLISECONDS)
  .map(new Function, Long>() {
            public Long apply(@NonNull Long aLong) throws Exception {
                return aLong*10;
  }
        });
//        observable2.subscribe(RxUtils.getObserver());
  observable1.join(observable2, new Function, ObservableSource>() {
            public ObservableSource apply(@NonNull Long aLong) throws Exception {
                return Observable.just(String.valueOf(aLong)).delay(600, TimeUnit.MILLISECONDS);
  }
        }, new Function, ObservableSource>() {
            public ObservableSource apply(@NonNull Long aLong) throws Exception {
                return Observable.just(String.valueOf(aLong));
  }
        }, new BiFunction, Long, String>() {
            public String apply(@NonNull Long aLong, @NonNull Long aLong2) throws Exception {
                return aLong + ":" + aLong2;
  }
        })
                .subscribe(RxUtils.getObserver());
 try {
            Thread.sleep(Integer.MAX_VALUE);
  } catch (InterruptedException e1) {
            e1.printStackTrace();
  }
onSubscribe
Thread:Thread[main,5,main]
onNext:0:10
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:5:10
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:5:20
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:10:20
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:10:30
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:15:30
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:15:40
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:20:40
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:20:50
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:25:50
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:25:60
Thread:Thread[RxComputationThreadPool-2,5,main]

merge

合并多个 Observables 的发射物

//产生0,5,10,15,20数列
Observable observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
        // .delay(600,TimeUnit.MILLISECONDS)
  .map(new Function, Long>() {
            public Long apply(@NonNull Long aLong) throws Exception {
                return aLong*5;
  }
        });
// 产生0,10,20,30,40数列
Observable observable2=Observable.interval(500,TimeUnit.MILLISECONDS)
         .delay(1000,TimeUnit.MILLISECONDS)
        .map(new Function, Long>() {
            public Long apply(@NonNull Long aLong) throws Exception {
                return aLong*10;
  }
        });
Observable.merge(observable1,observable2).subscribe(RxUtils.getObserver());
try {
    Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
    e1.printStackTrace();
}
  onSubscribe
Thread:Thread[main,5,main]
onNext:0
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:5
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:10
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:0
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:15
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:10
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:20
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:20
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:25
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:30
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:30

StartWith

在数据序列的开头插入一条指定的项

  Observable.just(10, 20, 30).startWith(2).subscribe(RxUtils.getObserver());
	onSubscribe
  Thread:Thread[main,5,main]
  onNext:2
  Thread:Thread[main,5,main]
  onNext:10
  Thread:Thread[main,5,main]
  onNext:20
  Thread:Thread[main,5,main]
  onNext:30
  Thread:Thread[main,5,main]
  onComplete
  Thread:Thread[main,5,main]

switchOnNext

将一个发射多个 Observables 的 Observable 转换成另一个单独的 Observable,后者发射那些 Observables 最近发射的数据项

很绕口是吧 看代码 很简单 画个图 会更高理解,有时间 我会把图补上

// 每隔500毫秒产生一个observable
  Observable> observable=  Observable.interval(500, TimeUnit.MILLISECONDS).map(new Function, Observable>() {
            public Observable apply(@NonNull Long aLong) throws Exception {
                // 每隔250毫秒产生一组数据(0,10,20,30,40)

  return Observable.interval(250,TimeUnit.MILLISECONDS).map(new Function,Long>() {
                    public Long apply(@NonNull Long l) throws Exception {
                        return l * 10;
  }
                }).take(5);
  }
        }).take(2);
//      observable.subscribe(RxUtils.>getObserver());
  Observable.switchOnNext(observable).subscribe(RxUtils.getObserver(Long.valueOf(1)));
 try {
            Thread.sleep(Integer.MAX_VALUE);
  } catch (InterruptedException e1) {
            e1.printStackTrace();
  }
onSubscribe:1
Thread:Thread[main,5,main]
onNext:0
Thread:Thread[RxComputationThreadPool-2,5,main]
onNext:0
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:10
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:20
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:30
Thread:Thread[RxComputationThreadPool-3,5,main]
onNext:40
Thread:Thread[RxComputationThreadPool-3,5,main]
onComplete
Thread:Thread[RxComputationThreadPool-3,5,main]
## zip
## 通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。
```java
Observable observable1 = Observable.just(10,20,30);
Observable observable2 = Observable.just(4, 8, 12, 16);
Observable.zip(observable1, observable2, new BiFunction, Integer, Integer>() {
    public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
        return integer+integer2;
  }
}).subscribe(RxUtils.getObserver());
onSubscribe
Thread:Thread[main,5,main]
onNext:14
Thread:Thread[main,5,main]
onNext:28
Thread:Thread[main,5,main]
onNext:42
Thread:Thread[main,5,main]
onComplete
Thread:Thread[main,5,main]
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3169 引用 • 8208 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...