RxJava2.0 操作符之 -- 过滤操作符

本贴最后更新于 2809 天前,其中的信息可能已经时移俗易

debounce

仅在过了一段指定的时间还没发射数据时才发射一个数据

Observable.create(new ObservableOnSubscribe() { public void subscribe(@NonNull ObservableEmitter e) throws Exception { try { //产生结果的间隔时间分别为100、200、300...900毫秒 for (int i = 1; i < 100; i++) { e.onNext(i); Thread.sleep(i * 100); } e.onComplete(); }catch(Exception ex){ e.onError(ex); } } }).debounce(500, TimeUnit.MILLISECONDS).subscribe(RxUtils.getObserver()); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e1) { e1.printStackTrace(); }

运行结果

onSubscribe Thread:Thread[main,5,main] onNext:6 Thread:Thread[RxComputationThreadPool-1,5,main] onNext:7 Thread:Thread[RxComputationThreadPool-1,5,main] onNext:8 Thread:Thread[RxComputationThreadPool-1,5,main] onNext:9

distinct

抑制(过滤掉)事件源发出的重复的数据项

Observable.just(1, 2, 1, 1, 2, 3,3,3,5).distinct().subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onNext:1 Thread:Thread[main,5,main] onNext:2 Thread:Thread[main,5,main] onNext:3 Thread:Thread[main,5,main] onNext:5 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

elementAt

只发射第 N 项数据

Observable observable= Observable.just(1,2,3,4,5,6).elementAt(2).toObservable(); observable.subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onNext:3 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

filter

只发射通过了谓词测试的数据项

Observable.just(1, 2, 3, 4, 5).filter(new Predicate() { public boolean test(@NonNull Integer integer) throws Exception { return integer>3; } }).subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onNext:4 Thread:Thread[main,5,main] onNext:5 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

first

只发射第一项(或者满足某个条件的第一项)数据

Observable.just(1, 2, 3).first(5).toObservable().subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onNext:1 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

ignoreElements

不发射任何数据,只发射 Observable 的终止通知

Observable.just(1,2,3).ignoreElements().toObservable().subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

last

只发射最后一项(或者满足某个条件的最后一项)数据

Observable.just(1, 2, 3).last(5).toObservable().subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onNext:3 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

sample

定期发射 Observable 最近发射的数据项

Observable.create(new ObservableOnSubscribe() { public void subscribe(@NonNull ObservableEmitter e) throws Exception { try { // 前8个数字产生的时间间隔为1秒,后一个间隔为3秒 for (int i = 1; i < 9; i++) { e.onNext(i); Thread.sleep(1000); } Thread.sleep(2000); e.onNext(9); e.onComplete(); } catch (Exception ex) { e.onError(ex); } } }).sample(2200, TimeUnit.MILLISECONDS) .subscribe(RxUtils.getObserver()); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e1) { e1.printStackTrace(); }
Thread:Thread[main,5,main] onNext:3 Thread:Thread[RxComputationThreadPool-1,5,main] onNext:5 Thread:Thread[RxComputationThreadPool-1,5,main] onNext:7 Thread:Thread[RxComputationThreadPool-1,5,main] onNext:8 Thread:Thread[RxComputationThreadPool-1,5,main] onComplete

skip

抑制 Observable 发射的前 N 项数据

Observable.just(1,2,3,4,5,6,7,8).skip(3).subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onNext:4 Thread:Thread[main,5,main] onNext:5 Thread:Thread[main,5,main] onNext:6 Thread:Thread[main,5,main] onNext:7 Thread:Thread[main,5,main] onNext:8 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

skipLast

suppress the final n items emitted by an Observable

Observable.just(1, 2, 3, 4, 5, 6, 7).skipLast(3).subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onNext:1 Thread:Thread[main,5,main] onNext:2 Thread:Thread[main,5,main] onNext:3 Thread:Thread[main,5,main] onNext:4 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

Take

emit only the first n items emitted by an Observable

Observable.just(1,2,3,4,5,6,7,8).take(3).subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onNext:1 Thread:Thread[main,5,main] onNext:2 Thread:Thread[main,5,main] onNext:3 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]

takeLast

发射 Observable 发射的最后 N 项数据

Observable.just(1,2,3,4,5,6,7,8).takeLast(3).subscribe(RxUtils.getObserver());
onSubscribe Thread:Thread[main,5,main] onNext:6 Thread:Thread[main,5,main] onNext:7 Thread:Thread[main,5,main] onNext:8 Thread:Thread[main,5,main] onComplete Thread:Thread[main,5,main]
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3194 引用 • 8214 回帖
  • 线程
    122 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...