delaySubscription
延迟一段指定的时间再发射来自 Observable 的发射物
Observable.just(1,2,3,4).delaySubscription(5, TimeUnit.SECONDS).subscribe(new Action1<Integer>() {
@Override
public void call(Integer t) {
System.out.println(t+"");
}
});
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
结果
1
2
3
4
doOnEach
注册一个动作作为原始 Observable 生命周期事件的一种占位符
doOnEach 操作符让你可以注册一个回调,它产生的 Observable 每发射一项数据就会调用它一次。你可以以 Action 的形式传递参数给它,这个 Action 接受一个 onNext 的变体 Notification 作为它的唯一参数,你也可以传递一个 Observable 给 doOnEach,这个 Observable 的 onNext 会被调用,就好像它订阅了原始的 Observable 一样。
Observable.just(1,2,3,4).doOnEach(new Action1<Notification<? super Integer>>() {
@Override
public void call(Notification<? super Integer> t) {
System.out.println("doOnEach"+(Integer)t.getValue());
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer t) {
System.out.println("OnNext"+t);
}
});
结果
doOnEach1
OnNext1
doOnEach2
OnNext2
doOnEach3
OnNext3
doOnEach4
OnNext4
doOnEachnull
doOnNext
doOnSubscribe
doOnUnsubscribe
doOnCompleted
doOnError
doOnTerminate
finallyDo
Materialize/Dematerialize
Materialize 将数据项和事件通知都当做数据项发射,Dematerialize 刚好相反。
Observable.just(1, 2, 3).materialize().subscribe(i->System.out.println(i.getValue()));
结果
1
2
3
null
ObserveOn_SubscribOn
ObserveOn:指定一个观察者在哪个调度器上观察这个 Observable
SubscribOn:用来指定 Observable 在哪个线程上运行
Observable.just(1, 2, 3).observeOn(Schedulers.newThread()).
subscribeOn(Schedulers.newThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer t) {
System.out.println(t);
}
});
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
结果
1
2
3
Serialize
强制一个 Observable 连续调用并保证行为正确
Subscribe
操作来自 Observable 的发射物和通知
TimeInterval
将一个发射数据的 Observable 转换为发射那些数据发射时间间隔的 Observable
timeout
对原始 Observable 的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> t) {
for (int i = 0; i <= 3; i++) {
try {
Thread.sleep(i * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
t.onNext(i);
}
}
}).timeout(200, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer t) {
System.out.println(t);
}
});
结果
Timestamp
给 Observable 发射的数据项附加一个时间戳
using
创建一个只在 Observable 生命周期内存在的一次性资源
to
将 Observable 转换为另一个对象或数据结构
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于