背景
RxJava2 中的类型,除了 Observable 和 Flowable 外,还有 Single、Completable、Maybe 三种类型。这些类型有什么区别呢?
简介
类型 | 含义 | ||
---|---|---|---|
io.reactivex.Observable |
0..N flows, supporting Reactive-Streams and backpressure | ||
io.reactivex.Flowable |
0..N flows, no backpressure | ||
io.reactivex.Single |
a flow of exactly 1 item or an error | ||
io.reactivex.Completable |
a flow without items but only a completion or error signal | ||
io.reactivex.Maybe |
a flow with no items, exactly one item or an error |
示例
下面的示例,来演示如何逐步简化代码。
示例一: Flowable
先来一个最基础的 Flowable 示例。
import java.io.IOException;
import org.apache.http.client.fluent.Request;
import org.reactivestreams.Subscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.FlowableSubscriber;
public class HelloFlowable {
public static void main(String[] args) {
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 模拟无限的上流数据
for (int i = 1;; ++i) {
emitter.onNext("http://www.abeffect.com/" + i);
}
}
}, BackpressureStrategy.BUFFER).subscribe(new FlowableSubscriber<String>() {
@Override
public void onComplete() {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onNext(String s) {
try {
System.out.println(Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
});
}
}
示例二: Consumer
如果在 FlowableSubscriber
中只关心 onNext
方法的话,可以使用 Consumer
import java.io.IOException;
import org.apache.http.client.fluent.Request;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.functions.Consumer;
public class HelloConsumer {
public static void main(String[] args) {
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 模拟无限的上流数据
for (int i = 1;; ++i) {
emitter.onNext("http://www.abeffect.com/" + i);
}
}
}, BackpressureStrategy.BUFFER).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
try {
System.out.println(Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode());
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
从 subscribe()中,可以看出其可用的方法有:
- subscribe(Consumer<? super T>)
- subscribe(Consumer<? super T>, Consumer<? super Throwable>)
- subscribe(Consumer<? super T>, Consumer<? super Throwable>, Action)
- subscribe(Consumer<? super T>, Consumer<? super Throwable>, Action, Consumer<? super Subscription>)
- subscribe(Subscriber<? super T>)
- subscribe(FlowableSubscriber<? super T>)
可以根据需要,逐步添加 onNext, onError, onComplete, onSubscribe 参数。
同时,可以支持双参数或者多参数类型
BiConsumer<T1, T2>
: 双参数类型Consumer<Obejct[]>
: 多参数类型
Observable 和 Flowable 的对比, 来自[1]
Observable |
Flowable |
||
---|---|---|---|
元素个数 | 不超过 1000 个 | 超过 10+ 个 | |
开销 | 开销低 | 开销高 |
示例三: Single
如果只有一个 onNext
事件时,即唯一事件(流),则可以用 Single。
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.disposables.Disposable;
public class HelloSingle {
public static void main(String[] args) {
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> singleEmitter) throws Exception {
singleEmitter.onSuccess("Hello Single");
}
}).subscribe(new SingleObserver<String>() {
@Override
public void onError(Throwable t) {
}
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
System.out.println(s);
}
});
}
}
结果
Hello Single
BiConsumer
这边也可以直接用 BiConsumer
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.functions.BiConsumer;
public class HelloSingle2 {
public static void main(String[] args) {
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> singleEmitter) throws Exception {
singleEmitter.onSuccess("Hello Single");
}
}).subscribe(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable t) throws Exception {
System.out.println(s);
}
});
}
}
结果
Hello Single
Consumer
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.functions.Consumer;
public class HelloSingle2 {
public static void main(String[] args) {
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> singleEmitter) throws Exception {
singleEmitter.onSuccess("Hello Single");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
}
结果
Hello Single
类型转换
Single 类型也可以转换成其它类型
- Single.toFlowable()
- Single.toObservable()
- Single.toMaybe()
示例四: Completable
如果也不关心 onNext 事件,只关心 onComplete 和 onError,则直接使用 Completable。
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.disposables.Disposable;
public class HelloCompletable {
public static void main(String[] args) {
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
e.onComplete();
}
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
System.out.println("Complete");
}
});
}
}
示例五: Maybe
Single 是发送一个事件, Completable 是不发送任何事件。
如果不知道会不会发送事件呢?即有可能发送一个事件,也有可能不发送任何事件。
那这种情况对应的就是 Maybe。
发送事件时,会响应 onSuccess,不发送事件时,会响应 onComplete,二者只有一个会响应。
import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOnSubscribe;
import io.reactivex.disposables.Disposable;
public class HelloMaybe {
public static void main(String[] args) {
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> e) throws Exception {
e.onSuccess("Success");
// 第二条是不会执行的
e.onComplete();
}
}).subscribe(new MaybeObserver<String>() {
@Override
public void onComplete() {
System.out.println("Complete");
}
@Override
public void onError(Throwable t) {
}
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
System.out.println(s);
}
});
}
}
结果
Success
参考
- RxJava 2.x 使用详解(一) 快速入门: 此文有一个系列,推荐阅读.
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于