RxJava2 中的基本类型

本贴最后更新于 2371 天前,其中的信息可能已经斗转星移

背景

RxJava2 中的类型,除了 Observable 和 Flowable 外,还有 Single、Completable、Maybe 三种类型。这些类型有什么区别呢?

简介

类型 含义
io.reactivex.Observable 0..N flows, supporting Reactive-Streams and backpressure
io.reactivex.Flowable 0..N flows, no backpressure
io.reactivex.Single a flow of exactly 1 item or an error
io.reactivex.Completable a flow without items but only a completion or error signal
io.reactivex.Maybe a flow with no items, exactly one item or an error

示例

下面的示例,来演示如何逐步简化代码。

示例一: Flowable

先来一个最基础的 Flowable 示例。

import java.io.IOException;

import org.apache.http.client.fluent.Request;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.FlowableSubscriber;

public class HelloFlowable {
	public static void main(String[] args) {

		Flowable.create(new FlowableOnSubscribe<String>() {

			@Override
			public void subscribe(FlowableEmitter<String> emitter) throws Exception {
				// 模拟无限的上流数据
				for (int i = 1;; ++i) {
					emitter.onNext("http://www.abeffect.com/" + i);
				}
			}
		}, BackpressureStrategy.BUFFER).subscribe(new FlowableSubscriber<String>() {

			@Override
			public void onComplete() {

			}

			@Override
			public void onError(Throwable t) {

			}

			@Override
			public void onNext(String s) {
				try {
					System.out.println(Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode());
				} catch (IOException e) {
					e.printStackTrace();
				}
			}

			@Override
			public void onSubscribe(Subscription s) {
				s.request(Long.MAX_VALUE);
			}
		});
	}
}

示例二: Consumer

如果在 FlowableSubscriber 中只关心 onNext 方法的话,可以使用 Consumer

import java.io.IOException;

import org.apache.http.client.fluent.Request;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.functions.Consumer;

public class HelloConsumer {
	public static void main(String[] args) {

		Flowable.create(new FlowableOnSubscribe<String>() {

			@Override
			public void subscribe(FlowableEmitter<String> emitter) throws Exception {
				// 模拟无限的上流数据
				for (int i = 1;; ++i) {
					emitter.onNext("http://www.abeffect.com/" + i);
				}
			}
		}, BackpressureStrategy.BUFFER).subscribe(new Consumer<String>() {

			@Override
			public void accept(String s) throws Exception {
				try {
					System.out.println(Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode());
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		});
	}
}

从 subscribe()中,可以看出其可用的方法有:

  • subscribe(Consumer<? super T>)
  • subscribe(Consumer<? super T>, Consumer<? super Throwable>)
  • subscribe(Consumer<? super T>, Consumer<? super Throwable>, Action)
  • subscribe(Consumer<? super T>, Consumer<? super Throwable>, Action, Consumer<? super Subscription>)
  • subscribe(Subscriber<? super T>)
  • subscribe(FlowableSubscriber<? super T>)

可以根据需要,逐步添加 onNext, onError, onComplete, onSubscribe 参数。

同时,可以支持双参数或者多参数类型

  • BiConsumer<T1, T2>: 双参数类型
  • Consumer<Obejct[]>: 多参数类型

Observable 和 Flowable 的对比, 来自[1]

Observable Flowable
元素个数 不超过 1000 个 超过 10+ 个
开销 开销低 开销高

示例三: Single

如果只有一个 onNext 事件时,即唯一事件(流),则可以用 Single。

import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.disposables.Disposable;

public class HelloSingle {
	public static void main(String[] args) {
		Single.create(new SingleOnSubscribe<String>() {

			@Override
			public void subscribe(SingleEmitter<String> singleEmitter) throws Exception {
				singleEmitter.onSuccess("Hello Single");
			}
		}).subscribe(new SingleObserver<String>() {

			@Override
			public void onError(Throwable t) {
			}

			@Override
			public void onSubscribe(Disposable d) {
			}

			@Override
			public void onSuccess(String s) {
				System.out.println(s);
			}
		});
	}
}

结果

Hello Single

BiConsumer

这边也可以直接用 BiConsumer

import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.functions.BiConsumer;

public class HelloSingle2 {
	public static void main(String[] args) {
		Single.create(new SingleOnSubscribe<String>() {

			@Override
			public void subscribe(SingleEmitter<String> singleEmitter) throws Exception {
				singleEmitter.onSuccess("Hello Single");
			}
		}).subscribe(new BiConsumer<String, Throwable>() {

			@Override
			public void accept(String s, Throwable t) throws Exception {
				System.out.println(s);
			}
		});
	}
}

结果

Hello Single

Consumer

import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.functions.Consumer;

public class HelloSingle2 {
	public static void main(String[] args) {
		Single.create(new SingleOnSubscribe<String>() {

			@Override
			public void subscribe(SingleEmitter<String> singleEmitter) throws Exception {
				singleEmitter.onSuccess("Hello Single");
			}
		}).subscribe(new Consumer<String>() {

			@Override
			public void accept(String s) throws Exception {
				System.out.println(s);
			}
		});
	}
}

结果

Hello Single

类型转换

Single 类型也可以转换成其它类型

  • Single.toFlowable()
  • Single.toObservable()
  • Single.toMaybe()

示例四: Completable

如果也不关心 onNext 事件,只关心 onComplete 和 onError,则直接使用 Completable。

import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.disposables.Disposable;

public class HelloCompletable {
	public static void main(String[] args) {
		Completable.create(new CompletableOnSubscribe() {

			@Override
			public void subscribe(CompletableEmitter e) throws Exception {
				e.onComplete();
			}
		}).subscribe(new CompletableObserver() {

			@Override
			public void onSubscribe(Disposable d) {
			}

			@Override
			public void onError(Throwable t) {
			}

			@Override
			public void onComplete() {
				System.out.println("Complete");
			}
		});
	}
}

示例五: Maybe

Single 是发送一个事件, Completable 是不发送任何事件。

如果不知道会不会发送事件呢?即有可能发送一个事件,也有可能不发送任何事件。

那这种情况对应的就是 Maybe。

发送事件时,会响应 onSuccess,不发送事件时,会响应 onComplete,二者只有一个会响应。

import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOnSubscribe;
import io.reactivex.disposables.Disposable;

public class HelloMaybe {
	public static void main(String[] args) {
		Maybe.create(new MaybeOnSubscribe<String>() {

			@Override
			public void subscribe(MaybeEmitter<String> e) throws Exception {
				e.onSuccess("Success");
				// 第二条是不会执行的
				e.onComplete();
			}
		}).subscribe(new MaybeObserver<String>() {

			@Override
			public void onComplete() {
				System.out.println("Complete");
			}

			@Override
			public void onError(Throwable t) {
			}

			@Override
			public void onSubscribe(Disposable d) {
			}

			@Override
			public void onSuccess(String s) {
				System.out.println(s);
			}
		});
	}
}

结果

Success

参考

  1. RxJava 2.x 使用详解(一) 快速入门: 此文有一个系列,推荐阅读.
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3190 引用 • 8214 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...