RxJava 操作符之 -- 创建操作符

本贴最后更新于 2793 天前,其中的信息可能已经时移世易

create

使用一个函数从头开始创建一个 Observable

Observable.create(new Observable.OnSubscribe<Integer>() {

				@Override
				public void call(Subscriber<? super Integer> t) {
					 try {
				            if (!t.isUnsubscribed()) {
				                for (int i = 1; i < 5; i++) {
				                    t.onNext(i);
				                }
				                t.onCompleted();
				            }
				        } catch (Exception e) {
				            t.onError(e);
				        }

				}
			}).subscribe(new Subscriber<Integer>() {

				@Override
				public void onCompleted() {
					 System.out.println("Sequence complete.");
				}

				@Override
				public void onError(Throwable e) {
					 System.err.println("Error: " + e.getMessage());
				}

				@Override
				public void onNext(Integer t) {
					System.out.println("Next: " + t);
				}
			});

运行结果:

Next: 2
Next: 3
Next: 4
Sequence complete.

defer

直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个新的 Observable

i = 10;
		Observable<Integer> justObservable = Observable.just(i);
		i = 12;
		Observable<Integer> deferObservable = Observable
				.defer(new Func0<Observable<Integer>>() {

					@Override
					public Observable<Integer> call() {
						return Observable.just(i);
					}
				});
		i=15;
		justObservable.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {

			}

			@Override
			public void onError(Throwable e) {

			}

			@Override
			public void onNext(Integer t) {
				 System.out.println("just result:" + t.toString());
			}

		} );
		deferObservable.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {

			}

			@Override
			public void onError(Throwable e) {

			}

			@Override
			public void onNext(Integer t) {
				 System.out.println("defer result:" + t.toString());
			}
		});

输出结果

just result:10
defer result:15

empty

创建一个不发射任何数据但是正常终止的 Observable

Observable<Integer> emptyObservable=Observable.empty();
		emptyObservable.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");
			}

			@Override
			public void onError(Throwable e) {

			}

			@Override
			public void onNext(Integer t) {
				 System.out.println("empty result:" + t.toString());
			}
		});

结果

onCompleted

never

创建一个不发射数据也不终止的 Observable

Observable<Integer> nerverObservable=Observable.never();
		nerverObservable.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("onError"+e);
			}

			@Override
			public void onNext(Integer t) {
				 System.out.println("empty result:" + t.toString());
			}
		});

结果


error

创建一个不发射数据以一个错误终止的 Observable

Observable<Integer> errorObservable=Observable.error(new Throwable());
		errorObservable.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");
			}

			@Override
			public void onError(Throwable e) {
				e.printStackTrace();
			}

			@Override
			public void onNext(Integer t) {
				 System.out.println("empty result:" + t.toString());
			}
		});

结果

java.lang.Throwable
	at hiquanta.rxjava.operators.create.Empty_Never_Throw.main(Empty_Never_Throw.java:49)

from

将其它种类的对象和数据类型转换为 Observable

Integer[] items = { 0, 1, 2, 3, 4, 5 };
		Observable<Integer> myObservable = Observable.from(items);
		myObservable.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer t) {
				 System.out.println(t);
			}
		},
	    new Action1<Throwable>() {
	        @Override
	        public void call(Throwable error) {
	            System.out.println("Error encountered: " + error.getMessage());
	        }
	    },
	    new Action0() {
	        @Override
	        public void call() {
	            System.out.println("Sequence complete");
	        }
	    });

结果:

0
1
2
3
4
5
Sequence complete

interval

创建一个按固定时间间隔发射整数序列的 Observable

Observable<Long> observable=Observable.interval(1, TimeUnit.SECONDS);
  observable.subscribe(new Action1<Long>() {

    @Override
    public void call(Long t) {
      System.out.println("interval:"+t);
    }
  });
  try {
    Thread.sleep(Integer.MAX_VALUE);
  } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  }

结果

interval:0
interval:1
interval:2
interval:3
interval:4
interval:5
interval:6
interval:7
interval:8
...

just

创建一个发射指定值的 Observable

Observable.just(1, 2, 3)
    .subscribe(new Subscriber<Integer>() {
  @Override
  public void onNext(Integer item) {
      System.out.println("Next: " + item);
  }

  @Override
  public void onError(Throwable error) {
      System.err.println("Error: " + error.getMessage());
  }

  @Override
  public void onCompleted() {
      System.out.println("Sequence complete.");
  }
});

结果

Next: 1
Next: 2
Next: 3
Sequence complete.

range

创建一个发射特定整数序列的 Observable

Observable.range(3, 10).subscribe(new Subscriber<Integer>() {
M
			@Override
			public void onCompleted() {
				 System.out.println("Sequence complete.");

			}

			@Override
			public void onError(Throwable e) {
				 System.out.println("Sequence complete.");

			}

			@Override
			public void onNext(Integer t) {
			    System.out.println("Next:" + t.toString());

			}
		});

结果

Next:3
Next:4
Next:5
Next:6
Next:7
Next:8
Next:9
Next:10
Next:11
Next:12
Sequence complete.

repeat

repeat 操作符就是对某一个 Observable 重复产生多次结果,当 repeat()
接收到 onComplete()会触发重订阅,默认情况下运行在一个新的线程上

Observable.range(1, 5).repeat(5).subscribe(new Subscriber<Integer>()
    {

    @Override
    public void onCompleted() {
    System.out.println("onCompleted");

    }

    @Override
    public void onError(Throwable e) {
    System.out.println("onError");
    }

    @Override
    public void onNext(Integer t) {
    System.out.println("onNext:"+t);
    }
    });

结果

onNext:1
onNext:2
onNext:3
onNext:4
onNext:5
onNext:1
onNext:2
onNext:3
onNext:4
onNext:5
onNext:1
onNext:2
onNext:3
onNext:4
onNext:5
onNext:1
onNext:2
onNext:3
onNext:4
onNext:5
onNext:1
onNext:2
onNext:3
onNext:4
onNext:5
onCompleted

repeatWhen

它不是缓存和重放原始 Observable 的数据序列,而是有条件的重新订阅和发射原来的 Observable。
将原始 Observable 的终止通知(完成或错误)当做一个 void 数据传递给一个通知处理器,它以此来决定是否要重新订阅和发射原来的 Observable。这个通知处理器就像一个 Observable 操作符,接受一个发射 void 通知的 Observable 为输入,返回一个发射 void 数据(意思是,重新订阅和发射原始 Observable)或者直接终止(意思是,使用 repeatWhen 终止发射数据)的 Observable。

Observable
				.range(1, 5)
				.repeatWhen(
						new Func1<Observable<? extends Void>, Observable<?>>() {
							@Override
							public Observable<?> call(
									Observable<? extends Void> observable) {
								return Observable.timer(6, TimeUnit.SECONDS);
							}
						}).subscribe(new Observer<Integer>() {
					@Override
					public void onCompleted() {
						System.out.println("onCompleted");
					}

					@Override
					public void onError(Throwable e) {
						System.out.println("onError:" + e);
					}

					@Override
					public void onNext(Integer integer) {
						System.out.println("onNext:" + integer);
					}
				});
		try {
			Thread.sleep(Integer.MAX_VALUE);
		} catch (InterruptedException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}

结果

onNext:1
onNext:2
onNext:3
onNext:4
onNext:5
onNext:1
onNext:2
onNext:3
onNext:4
onNext:5
onCompleted

Start

非标准模块.
返回一个 Observable,它发射一个类似于函数声明的值.

timer

创建一个 Observable,它在一个给定的延迟后发射一个特殊的值。

Observable.timer(4, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                System.out.println(aLong+"");
            }
        });
		try {
			Thread.sleep(Integer.MAX_VALUE);
		} catch (InterruptedException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}

结果

0

代码

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...