RxJava Operators: Filtering Operators

This post was last updated 2793 days ago; some of the information in it may be out of date.

debounce

Emits an item only after a specified timespan has passed without the source emitting another item.

Observable.create(new Observable.OnSubscribe<Integer>() {

			@Override
			public void call(Subscriber<? super Integer> t) {
				try {
                    // Each item is followed by a pause of 100, 200, 300 ... 900 ms
                    for (int i = 1; i < 10; i++) {
                        t.onNext(i);
                        Thread.sleep(i * 100);
                    }
                    t.onCompleted();
                }catch(Exception e){
                    t.onError(e);
                }
			}
		}).subscribeOn(Schedulers.newThread())
		.debounce(400,TimeUnit.MILLISECONDS)
		.subscribe(  new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("Next:" + integer);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("Error:" + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("completed!");
            }
        });
		// Block the main thread so the JVM stays alive while the background thread emits
		try {
			Thread.sleep(Integer.MAX_VALUE);
		} catch (InterruptedException e1) {
			e1.printStackTrace();
		}

Result

Next:5
Next:6
Next:7
Next:8
Next:9
completed!
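
The output follows from the pauses in the example: only items followed by more than 400 ms of silence get through. Below is a minimal plain-Java sketch of that rule, not RxJava's implementation. `DebounceModel` and its parameters are hypothetical names. It assumes the last item is flushed on completion, and it treats a pause exactly equal to the window (item 4 here) as dropped, although in a real run that boundary is a timing race.

```java
import java.util.ArrayList;
import java.util.List;

class DebounceModel {
    /**
     * gapsAfterMs[i] is the quiet time that follows items[i] before the
     * next emission (or, for the last item, before completion).
     */
    static List<Integer> debounce(int[] items, long[] gapsAfterMs, long windowMs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            boolean last = (i == items.length - 1);
            // Assumption: the pending item is flushed when the source completes.
            if (last || gapsAfterMs[i] > windowMs) {
                out.add(items[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] items = new int[9];
        long[] gaps = new long[9];
        for (int i = 1; i < 10; i++) {
            items[i - 1] = i;
            gaps[i - 1] = i * 100L; // same pauses as the example above
        }
        System.out.println(debounce(items, gaps, 400)); // [5, 6, 7, 8, 9]
    }
}
```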

distinct

Suppresses (filters out) duplicate items.

Observable.just(1, 2, 1, 1, 2, 3).distinct()
				.subscribe(new Subscriber<Integer>() {
					@Override
					public void onNext(Integer item) {
						System.out.println("Next: " + item);
					}

					@Override
					public void onError(Throwable error) {
						System.err.println("Error: " + error.getMessage());
					}

					@Override
					public void onCompleted() {
						System.out.println("Sequence complete.");
					}
				});

Result

Next: 1
Next: 2
Next: 3
Sequence complete.
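
One way to picture distinct is as a set of already-seen items: an item is forwarded only the first time it appears. The sketch below models that idea in plain Java. `DistinctModel` is a hypothetical name, and this is an illustration rather than the library's code. It also hints at the cost: the set grows with the number of distinct items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DistinctModel {
    static <T> List<T> distinct(List<T> source) {
        Set<T> seen = new HashSet<>(); // compares items via equals()/hashCode()
        List<T> out = new ArrayList<>();
        for (T item : source) {
            // Set.add returns false when the item was already present.
            if (seen.add(item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList(1, 2, 1, 1, 2, 3))); // [1, 2, 3]
    }
}
```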

elementAt

Emits only the item at index N. The index is zero-based, so elementAt(2) emits the third item.

Observable.just(1, 2, 3, 4, 5, 6).elementAt(2)
				.subscribe(new Action1<Integer>() {
					@Override
					public void call(Integer integer) {
						System.out.println("Next:" + integer);
					}
				}, new Action1<Throwable>() {
					@Override
					public void call(Throwable throwable) {
						System.out.println("Error:" + throwable.getMessage());
					}
				}, new Action0() {
					@Override
					public void call() {
						System.out.println("completed!");
					}
				});

Result

Next:3
completed!

filter

Emits only the items that pass a predicate test.

Observable.just(1, 2, 3, 4, 5).filter(new Func1<Integer, Boolean>() {
			@Override
			public Boolean call(Integer item) {
				return item < 4;
			}
		}).subscribe(new Subscriber<Integer>() {
			@Override
			public void onNext(Integer item) {
				System.out.println("Next: " + item);
			}

			@Override
			public void onError(Throwable error) {
				System.err.println("Error: " + error.getMessage());
			}

			@Override
			public void onCompleted() {
				System.out.println("Sequence complete.");
			}
		});

Result

Next: 1
Next: 2
Next: 3
Sequence complete.

first

Emits only the first item (or the first item that satisfies a condition).

Observable.just(1, 2, 3).first().subscribe(new Subscriber<Integer>() {
			@Override
			public void onNext(Integer item) {
				System.out.println("Next: " + item);
			}

			@Override
			public void onError(Throwable error) {
				System.err.println("Error: " + error.getMessage());
			}

			@Override
			public void onCompleted() {
				System.out.println("Sequence complete.");
			}
		});

Result

Next: 1
Sequence complete.

ignoreElements

Emits no items at all; only the Observable's termination notification is passed on.

Observable.just(1, 2, 3).ignoreElements()
				.subscribe(new Subscriber<Integer>() {
					@Override
					public void onNext(Integer item) {
						System.out.println("Next: " + item);
					}

					@Override
					public void onError(Throwable error) {
						System.err.println("Error: " + error.getMessage());
					}

					@Override
					public void onCompleted() {
						System.out.println("Sequence complete.");
					}
				});

Result

Sequence complete.

last

Emits only the last item (or the last item that satisfies a condition).

Observable.just(1, 2, 3).last().subscribe(new Subscriber<Integer>() {
			@Override
			public void onNext(Integer item) {
				System.out.println("Next: " + item);
			}

			@Override
			public void onError(Throwable error) {
				System.err.println("Error: " + error.getMessage());
			}

			@Override
			public void onCompleted() {
				System.out.println("Sequence complete.");
			}
		});

Result

Next: 3
Sequence complete.

sample

Periodically emits the most recent item emitted by the Observable.

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
      if (subscriber.isUnsubscribed())
        return;
      try {
        // Items 1-8 are emitted 1 second apart; item 9 follows 3 seconds after item 8
        for (int i = 1; i < 9; i++) {
          subscriber.onNext(i);
          Thread.sleep(1000);
        }
        Thread.sleep(2000);
        subscriber.onNext(9);
        subscriber.onCompleted();
      } catch (Exception e) {
        subscriber.onError(e);
      }
    }
  }).subscribeOn(Schedulers.newThread())
      .sample(2200, TimeUnit.MILLISECONDS) // sample every 2200 ms
      .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
          System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
          System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
          System.out.println("Sequence complete.");
        }
      });
  try {
    Thread.sleep(Integer.MAX_VALUE);
  } catch (InterruptedException e1) {
    e1.printStackTrace();
  }

Result

Next: 3
Next: 5
Next: 7
Next: 8
Sequence complete.
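
The result is easier to follow on a timeline: items 1 to 8 arrive at 0, 1000, ..., 7000 ms, item 9 at 10000 ms, and the sampler ticks at 2200, 4400, 6600 and 8800 ms. The plain-Java sketch below replays that timeline. `SampleModel` and its parameters are hypothetical names, not RxJava's implementation. It assumes that an item still pending when the source completes is dropped, which matches the output shown here; whether a real run prints 9 may depend on the RxJava version.

```java
import java.util.ArrayList;
import java.util.List;

class SampleModel {
    static List<Integer> sample(int[] items, long[] emitAtMs, long completeAtMs, long periodMs) {
        List<Integer> out = new ArrayList<>();
        int next = 0; // index of the first source item not yet consumed
        for (long tick = periodMs; tick < completeAtMs; tick += periodMs) {
            Integer latest = null;
            // Consume everything emitted up to this tick, remembering the newest.
            while (next < items.length && emitAtMs[next] <= tick) {
                latest = items[next++];
            }
            if (latest != null) {
                out.add(latest); // nothing is emitted for a tick with no new item
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] items = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        long[] emitAt = {0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 10000};
        System.out.println(sample(items, emitAt, 10000, 2200)); // [3, 5, 7, 8]
    }
}
```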

skip

Suppresses the first N items emitted by the Observable.

Observable.just(1,2,3,4,5,6,7).skip(3)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }

            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });

Result

Next: 4
Next: 5
Next: 6
Next: 7
Sequence complete.

skipLast

Suppresses the last N items emitted by the Observable.

Observable.just(1, 2, 3, 4, 5, 6, 7).skipLast(3)
				.subscribe(new Subscriber<Integer>() {
					@Override
					public void onNext(Integer item) {
						System.out.println("Next: " + item);
					}

					@Override
					public void onError(Throwable error) {
						System.err.println("Error: " + error.getMessage());
					}

					@Override
					public void onCompleted() {
						System.out.println("Sequence complete.");
					}
				});

Result

Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
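
Unlike skip, skipLast cannot know which items are "the last N" until the source completes, so it has to hold items back. A minimal way to model this, assuming a simple FIFO buffer of size N (`SkipLastModel` is a hypothetical name, not the library's code), is to release an item only once N newer items have arrived:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class SkipLastModel {
    static <T> List<T> skipLast(List<T> source, int n) {
        Deque<T> buffer = new ArrayDeque<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            buffer.addLast(item);
            // Once more than n items are held, the oldest one is safe to emit.
            if (buffer.size() > n) {
                out.add(buffer.removeFirst());
            }
        }
        return out; // the n items still buffered at completion are dropped
    }

    public static void main(String[] args) {
        System.out.println(skipLast(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3)); // [1, 2, 3, 4]
    }
}
```

In this model each surviving item is emitted N items late, which is the price of not knowing the sequence length in advance.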

take

Emits only the first N items.

Observable.just(1, 2, 3, 4, 5, 6, 7, 8).take(4)
				.subscribe(new Subscriber<Integer>() {
					@Override
					public void onNext(Integer item) {
						System.out.println("Next: " + item);
					}

					@Override
					public void onError(Throwable error) {
						System.err.println("Error: " + error.getMessage());
					}

					@Override
					public void onCompleted() {
						System.out.println("Sequence complete.");
					}
				});

Result

Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

takeFirst

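takeFirst resembles both take and first: it emits only the first item from the source Observable that satisfies the given predicate. The difference from first shows up when no item qualifies: first signals a NoSuchElementException through onError, whereas takeFirst yields an empty Observable that delivers onCompleted without any onNext.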

Observable.just(1,2,3,4,5,6,7).takeFirst(new Func1<Integer, Boolean>() {
	            @Override
	            public Boolean call(Integer integer) {
	                // match values greater than 3
	                return integer>3;
	            }
	        })
	          .subscribe(new Subscriber<Integer>() {
	              @Override
	              public void onNext(Integer item) {
	                  System.out.println("Next: " + item);
	              }

	              @Override
	              public void onError(Throwable error) {
	                  System.err.println("Error: " + error.getMessage());
	              }

	              @Override
	              public void onCompleted() {
	                  System.out.println("Sequence complete.");
	              }
	          });

Result

Next: 4
Sequence complete.
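
The contrast between the two operators when nothing matches can be sketched with plain Java streams as a loose analogy (this is not RxJava code, and `FirstVsTakeFirst` is a hypothetical name): an empty `Optional` stands in for "onCompleted without onNext", and a thrown `NoSuchElementException` stands in for the error that first reports.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Predicate;

class FirstVsTakeFirst {
    // Analogue of takeFirst(predicate): an empty result when nothing matches.
    static <T> Optional<T> takeFirst(List<T> source, Predicate<T> predicate) {
        return source.stream().filter(predicate).findFirst();
    }

    // Analogue of first(predicate): a failure when nothing matches.
    static <T> T first(List<T> source, Predicate<T> predicate) {
        return takeFirst(source, predicate).orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(takeFirst(source, i -> i > 3));  // Optional[4]
        System.out.println(takeFirst(source, i -> i > 10)); // Optional.empty
        try {
            first(source, i -> i > 10);
        } catch (NoSuchElementException e) {
            System.out.println("first: no matching element");
        }
    }
}
```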

takeLast

Emits only the last N items emitted by the Observable.

Observable.just(1, 2, 3, 4, 5, 6, 7).takeLast(2)
				.subscribe(new Subscriber<Integer>() {
					@Override
					public void onNext(Integer item) {
						System.out.println("Next: " + item);
					}

					@Override
					public void onError(Throwable error) {
						System.err.println("Error: " + error.getMessage());
					}

					@Override
					public void onCompleted() {
						System.out.println("Sequence complete.");
					}
				});

Result

Next: 6
Next: 7
Sequence complete.
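
takeLast can emit nothing until the source completes, because only then is it known which items are the last N. A minimal sketch of that idea, assuming a sliding buffer that keeps at most N items (`TakeLastModel` is a hypothetical name, not the library's code):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class TakeLastModel {
    static <T> List<T> takeLast(List<T> source, int n) {
        Deque<T> buffer = new ArrayDeque<>();
        for (T item : source) {
            buffer.addLast(item);
            // Keep only the n newest items; older ones can never be in the result.
            if (buffer.size() > n) {
                buffer.removeFirst();
            }
        }
        // Reaching the end of the list plays the role of onCompleted.
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(takeLast(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 2)); // [6, 7]
    }
}
```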
