Delay
Delays emitting the items from the Observable by a specified amount of time
Observable.just(1, 2, 3, 4).delay(5, TimeUnit.SECONDS).subscribe(RxUtils.getObserver());
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
Do
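Registers an action as a placeholder for a lifecycle event of the source Observable, which amounts to registering a callback
In RxJava these are implemented as the doXXX family. They are simple: the name tells you when the callback fires and how to use it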
//doOnEach
// Observable.just(1,2,3,4,5).doOnEach(RxUtils.getObserver()).subscribe();
/**
 * onNext:1
 * Thread:Thread[main,5,main]
 * onNext:2
 * Thread:Thread[main,5,main]
 * onNext:3
 * Thread:Thread[main,5,main]
 * onNext:4
 * Thread:Thread[main,5,main]
 * onNext:5
 * Thread:Thread[main,5,main]
 * onComplete
 * Thread:Thread[main,5,main]
 */
//doOnNext
// Observable.just(1,2,3,4,5).doOnNext(new Consumer<Integer>() {
//     @Override
//     public void accept(@NonNull Integer integer) throws Exception {
//         if (integer > 3)
//             throw new RuntimeException("error");
//     }
// }).subscribe(RxUtils.getObserver());
/**
 * onSubscribe
 * Thread:Thread[main,5,main]
 * onNext:1
 * Thread:Thread[main,5,main]
 * onNext:2
 * Thread:Thread[main,5,main]
 * onNext:3
 * Thread:Thread[main,5,main]
 * onError:java.lang.RuntimeException: error
 * Thread:Thread[main,5,main]
 */
//doOnSubscribe
// Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6).doOnSubscribe(new Consumer<Disposable>() {
//     @Override
//     public void accept(@NonNull Disposable disposable) throws Exception {
//         System.out.println("published");
//     }
// });
// // The callback runs once per subscription, so "published" is printed twice
// observable.subscribe(RxUtils.getObserver());
// observable.subscribe(RxUtils.getObserver());
/**
 * published
 * onSubscribe
 * Thread:Thread[main,5,main]
 * onNext:1
 * Thread:Thread[main,5,main]
 * onNext:2
 * Thread:Thread[main,5,main]
 * onNext:3
 * Thread:Thread[main,5,main]
 * onNext:4
 * Thread:Thread[main,5,main]
 * onNext:5
 * Thread:Thread[main,5,main]
 * onNext:6
 * Thread:Thread[main,5,main]
 * onComplete
 * Thread:Thread[main,5,main]
 * published
 * onSubscribe
 * Thread:Thread[main,5,main]
 * onNext:1
 * Thread:Thread[main,5,main]
 * onNext:2
 * Thread:Thread[main,5,main]
 * onNext:3
 * Thread:Thread[main,5,main]
 * onNext:4
 * Thread:Thread[main,5,main]
 * onNext:5
 * Thread:Thread[main,5,main]
 * onNext:6
 * Thread:Thread[main,5,main]
 * onComplete
 * Thread:Thread[main,5,main]
 */
//doOnUnsubscribe was removed in 2.0; use doOnCancel (Flowable) or doOnDispose (Observable) instead
DisposableSubscriber<Long> disposableSubscriber = new DisposableSubscriber<Long>() {
    @Override
    public void onNext(Long along) {
        System.out.println(along);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable);
    }
    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
};
Flowable.interval(1000, TimeUnit.MILLISECONDS).doOnCancel(new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("doOnCancel");
    }
}).subscribe(disposableSubscriber);
// Disposing the subscriber cancels the subscription, which triggers doOnCancel
disposableSubscriber.dispose();
Materialize/Dematerialize
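Materialize emits both the data items and the event notifications as data items; Dematerialize does exactly the opposite.
The Dematerialize operator is the inverse of Materialize: it restores the result of Materialize to its original form.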
Observable.just(1,2,3,4).materialize().subscribe(RxUtils.<Notification<Integer>>getObserver());
Observable.just(1,2,3,4).materialize().dematerialize().subscribe(RxUtils.getObserver());
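To make the round trip concrete, here is a minimal plain-JDK sketch of the idea. It is not RxJava's implementation: the class MaterializeSketch and its Note type are hypothetical names introduced only for illustration, and a finite list stands in for an Observable that completes normally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: models materialize/dematerialize on a finite list.
final class MaterializeSketch {

    // A notification is either a value ("onNext") or the terminal "onComplete" marker.
    static final class Note<T> {
        final T value;          // null for the completion marker
        final boolean complete;

        private Note(T value, boolean complete) {
            this.value = value;
            this.complete = complete;
        }

        static <T> Note<T> next(T value) { return new Note<>(value, false); }
        static <T> Note<T> done() { return new Note<>(null, true); }

        @Override
        public String toString() {
            return complete ? "OnComplete" : "OnNext[" + value + "]";
        }
    }

    // materialize: every item AND the completion event become ordinary items.
    static <T> List<Note<T>> materialize(List<T> items) {
        List<Note<T>> out = new ArrayList<>();
        for (T item : items) {
            out.add(Note.next(item));
        }
        out.add(Note.done());
        return out;
    }

    // dematerialize: unwrap the values and stop at the completion marker.
    static <T> List<T> dematerialize(List<Note<T>> notes) {
        List<T> out = new ArrayList<>();
        for (Note<T> note : notes) {
            if (note.complete) {
                break;
            }
            out.add(note.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Note<Integer>> notes = materialize(Arrays.asList(1, 2, 3, 4));
        System.out.println(notes);                // [OnNext[1], OnNext[2], OnNext[3], OnNext[4], OnComplete]
        System.out.println(dematerialize(notes)); // [1, 2, 3, 4]
    }
}
```

Four items go in and five notifications come out, because the completion event is now an item of its own. Dematerializing drops the wrapper again and yields the original four values.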
ObserveOn_SubscribeOn
ObserveOn: specifies the scheduler on which an observer observes this Observable
SubscribeOn: specifies the scheduler (thread) on which the Observable itself runs
// Observable.just(1,2,3,4,5).observeOn( Schedulers.newThread()).subscribeOn(Schedulers.computation()).subscribe(RxUtils.getObserver());
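// When subscribeOn is applied twice, only the call closest to the source (newThread here) decides where the source runs
Observable.just(1,2,3,4,5).subscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.computation()).subscribe(RxUtils.getObserver());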
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
Serialize
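Forces an Observable to make serialized calls and to be well-behaved
So far I have not found code in the documentation that demonstrates this, and my own test code has some problems
If you have a good approach, leave a comment and let me know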
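One way to picture the guarantee is the plain-JDK sketch below. It shows only what serialization promises (the downstream callback never runs on two threads at once, even when several threads push items concurrently), not how RxJava achieves it. SerializeSketch, serialize and maxOverlap are hypothetical names, and the simple lock used here is an assumption made for illustration; RxJava's own serialize() is implemented differently.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration only: a lock-based stand-in for a serialized observer.
final class SerializeSketch {

    // Wraps a callback so that concurrent callers are admitted one at a time.
    static <T> Consumer<T> serialize(Consumer<T> downstream) {
        Object lock = new Object();
        return item -> {
            synchronized (lock) {
                downstream.accept(item);
            }
        };
    }

    // Pushes items from several threads through the wrapper and returns the
    // largest number of threads ever observed inside the callback at once.
    static int maxOverlap(int threads, int itemsPerThread) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Consumer<Integer> observer = serialize(item -> {
            int now = inside.incrementAndGet();
            max.accumulateAndGet(now, Math::max);
            inside.decrementAndGet();
        });

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    observer.accept(i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max overlap: " + maxOverlap(4, 10_000)); // max overlap: 1
    }
}
```

Removing the synchronized block makes the overlap count free to exceed 1, which is the misbehavior that serialization is meant to rule out.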
Subscribe
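Operates on the emissions and notifications from an Observable
I use this method all the time and it is simple, so I won't paste any code
The differences between the 1.x and 2.x methods are as follows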
TimeInterval
Converts an Observable that emits items into one that emits objects describing the time elapsed between those emissions
Observable.interval(3, TimeUnit.SECONDS)
.timeInterval()
.subscribe(RxUtils.<Timed<Long>>getObserver());
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
onSubscribe
Thread:Thread[main,5,main]
onNext:Timed[time=3007, unit=MILLISECONDS, value=0]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3000, unit=MILLISECONDS, value=1]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3001, unit=MILLISECONDS, value=2]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3000, unit=MILLISECONDS, value=3]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3000, unit=MILLISECONDS, value=4]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=2999, unit=MILLISECONDS, value=5]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3000, unit=MILLISECONDS, value=6]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3001, unit=MILLISECONDS, value=7]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3000, unit=MILLISECONDS, value=8]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3000, unit=MILLISECONDS, value=9]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3000, unit=MILLISECONDS, value=10]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=3000, unit=MILLISECONDS, value=11]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=2999, unit=MILLISECONDS, value=12]
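The time values in the log above are gaps between consecutive emissions, not absolute times. A minimal plain-JDK sketch of that bookkeeping follows. TimeIntervalSketch and intervals are hypothetical names, and the emission times in main are made up so that the gaps reproduce the first three values of the log; this is an illustration of the arithmetic, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: computes the gaps that timeInterval would report.
final class TimeIntervalSketch {

    // For each emission time, returns the gap since the previous emission
    // (or since the subscription time for the first emission).
    static List<Long> intervals(long subscribedAtMs, List<Long> emittedAtMs) {
        List<Long> gaps = new ArrayList<>();
        long previous = subscribedAtMs;
        for (long at : emittedAtMs) {
            gaps.add(at - previous);
            previous = at;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Made-up emission times in milliseconds, measured from subscription at 0.
        List<Long> emittedAt = Arrays.asList(3007L, 6007L, 9008L);
        System.out.println(intervals(0L, emittedAt)); // [3007, 3000, 3001]
    }
}
```

Because each gap is measured from the previous emission, scheduling jitter does not accumulate in the reported values.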
Timeout
Mirrors the source Observable, but emits an error notification if a specified period elapses without any item being emitted
Observable.interval(3, TimeUnit.SECONDS)
.timeout(2,TimeUnit.SECONDS)
.subscribe(RxUtils.getObserver());
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
onSubscribe
Thread:Thread[main,5,main]
onError:java.util.concurrent.TimeoutException
Thread:Thread[RxComputationThreadPool-1,5,main]
Timestamp
Attaches a timestamp to each item emitted by the Observable
Observable.interval(3, TimeUnit.SECONDS)
.timestamp()
.subscribe(RxUtils.<Timed<Long>>getObserver());
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
onSubscribe
Thread:Thread[main,5,main]
onNext:Timed[time=1500432643307, unit=MILLISECONDS, value=0]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=1500432646306, unit=MILLISECONDS, value=1]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=1500432649306, unit=MILLISECONDS, value=2]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=1500432652306, unit=MILLISECONDS, value=3]
Thread:Thread[RxComputationThreadPool-1,5,main]
onNext:Timed[time=1500432655307, unit=MILLISECONDS, value=4]
Using
Creates a disposable resource that exists only for the lifetime of the Observable
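// using is a static factory, so it is called on Observable directly
Observable.using(new Callable<Long>() {
    @Override
    public Long call() throws Exception {
        // Resource factory: creates the resource when an observer subscribes
        return Long.valueOf(2);
    }
}, new Function<Long, ObservableSource<Long>>() {
    @Override
    public ObservableSource<Long> apply(@NonNull Long aLong) throws Exception {
        // Source factory: builds the Observable that uses the resource
        return new ObservableSource<Long>() {
            @Override
            public void subscribe(@NonNull Observer<? super Long> observer) {
                // Emits a single value and never terminates
                observer.onNext(Long.valueOf(2));
            }
        };
    }
}, new Consumer<Long>() {
    @Override
    public void accept(@NonNull Long aLong) throws Exception {
        // Disposer: releases the resource
        System.out.println(aLong);
    }
})
.subscribe(RxUtils.getObserver());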
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
onNext:2
Thread:Thread[main,5,main]
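The three arguments play the roles of resource factory, source factory and disposer. The plain-JDK sketch below shows how they relate. It is not RxJava's implementation: UsingSketch is a hypothetical name, and a finite list stands in for a source that emits its items and then completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration only: the resource lives exactly as long as the source.
final class UsingSketch {

    static <R, T> void using(Supplier<R> resourceFactory,
                             Function<R, List<T>> sourceFactory,
                             Consumer<R> disposer,
                             Consumer<T> observer) {
        R resource = resourceFactory.get();       // created at "subscribe" time
        try {
            for (T item : sourceFactory.apply(resource)) {
                observer.accept(item);            // items derived from the resource
            }
        } finally {
            disposer.accept(resource);            // released once the source is done
        }
    }

    // Records the order of events so the lifecycle is easy to inspect.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        UsingSketch.<Long, Long>using(
                () -> { log.add("create"); return 2L; },
                resource -> Arrays.asList(resource, resource + 1),
                resource -> log.add("dispose " + resource),
                item -> log.add("onNext:" + item));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [create, onNext:2, onNext:3, dispose 2]
    }
}
```

In this sketch the disposer runs because the stand-in source finishes. A source that emits one value and never terminates would leave the resource undisposed until the subscription is disposed.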