create
使用一个函数从头开始创建一个 Observable
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> t) { try { if (!t.isUnsubscribed()) { for (int i = 1; i < 5; i++) { t.onNext(i); } t.onCompleted(); } } catch (Exception e) { t.onError(e); } } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer t) { System.out.println("Next: " + t); } });
运行结果:
Next: 2 Next: 3 Next: 4 Sequence complete.
defer
直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个新的 Observable
i = 10; Observable<Integer> justObservable = Observable.just(i); i = 12; Observable<Integer> deferObservable = Observable .defer(new Func0<Observable<Integer>>() { @Override public Observable<Integer> call() { return Observable.just(i); } }); i=15; justObservable.subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer t) { System.out.println("just result:" + t.toString()); } } ); deferObservable.subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer t) { System.out.println("defer result:" + t.toString()); } });
输出结果
just result:10 defer result:15
empty
创建一个不发射任何数据但是正常终止的 Observable
Observable<Integer> emptyObservable=Observable.empty(); emptyObservable.subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer t) { System.out.println("empty result:" + t.toString()); } });
结果
onCompleted
never
创建一个不发射数据也不终止的 Observable
Observable<Integer> nerverObservable=Observable.never(); nerverObservable.subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("onError"+e); } @Override public void onNext(Integer t) { System.out.println("empty result:" + t.toString()); } });
结果
error
创建一个不发射数据以一个错误终止的 Observable
Observable<Integer> errorObservable=Observable.error(new Throwable()); errorObservable.subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(Integer t) { System.out.println("empty result:" + t.toString()); } });
结果
java.lang.Throwable at hiquanta.rxjava.operators.create.Empty_Never_Throw.main(Empty_Never_Throw.java:49)
from
将其它种类的对象和数据类型转换为 Observable
Integer[] items = { 0, 1, 2, 3, 4, 5 }; Observable<Integer> myObservable = Observable.from(items); myObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer t) { System.out.println(t); } }, new Action1<Throwable>() { @Override public void call(Throwable error) { System.out.println("Error encountered: " + error.getMessage()); } }, new Action0() { @Override public void call() { System.out.println("Sequence complete"); } });
结果:
0 1 2 3 4 5 Sequence complete
interval
创建一个按固定时间间隔发射整数序列的 Observable
Observable<Long> observable=Observable.interval(1, TimeUnit.SECONDS); observable.subscribe(new Action1<Long>() { @Override public void call(Long t) { System.out.println("interval:"+t); } }); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }
结果
interval:0 interval:1 interval:2 interval:3 interval:4 interval:5 interval:6 interval:7 interval:8 ...
just
创建一个发射指定值的 Observable
Observable.just(1, 2, 3) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
结果
Next: 1 Next: 2 Next: 3 Sequence complete.
range
创建一个发射特定整数序列的 Observable
Observable.range(3, 10).subscribe(new Subscriber<Integer>() { M @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.out.println("Sequence complete."); } @Override public void onNext(Integer t) { System.out.println("Next:" + t.toString()); } });
结果
Next:3 Next:4 Next:5 Next:6 Next:7 Next:8 Next:9 Next:10 Next:11 Next:12 Sequence complete.
repeat
repeat 操作符就是对某一个 Observable 重复产生多次结果,当 repeat()
接收到 onComplete()会触发重订阅,默认情况下运行在一个新的线程上
Observable.range(1, 5).repeat(5).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("onError"); } @Override public void onNext(Integer t) { System.out.println("onNext:"+t); } });
结果
onNext:1 onNext:2 onNext:3 onNext:4 onNext:5 onNext:1 onNext:2 onNext:3 onNext:4 onNext:5 onNext:1 onNext:2 onNext:3 onNext:4 onNext:5 onNext:1 onNext:2 onNext:3 onNext:4 onNext:5 onNext:1 onNext:2 onNext:3 onNext:4 onNext:5 onCompleted
repeatWhen
它不是缓存和重放原始 Observable 的数据序列,而是有条件的重新订阅和发射原来的 Observable。
将原始 Observable 的终止通知(完成或错误)当做一个 void 数据传递给一个通知处理器,它以此来决定是否要重新订阅和发射原来的 Observable。这个通知处理器就像一个 Observable 操作符,接受一个发射 void 通知的 Observable 为输入,返回一个发射 void 数据(意思是,重新订阅和发射原始 Observable)或者直接终止(意思是,使用 repeatWhen 终止发射数据)的 Observable。
Observable .range(1, 5) .repeatWhen( new Func1<Observable<? extends Void>, Observable<?>>() { @Override public Observable<?> call( Observable<? extends Void> observable) { return Observable.timer(6, TimeUnit.SECONDS); } }).subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("onError:" + e); } @Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); } }); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); }
结果
onNext:1 onNext:2 onNext:3 onNext:4 onNext:5 onNext:1 onNext:2 onNext:3 onNext:4 onNext:5 onCompleted
Start
非标准模块.
返回一个 Observable,它发射一个类似于函数声明的值.
timer
创建一个 Observable,它在一个给定的延迟后发射一个特殊的值。
Observable.timer(4, TimeUnit.SECONDS).subscribe(new Action1<Long>() { @Override public void call(Long aLong) { System.out.println(aLong+""); } }); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); }
结果
0
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于