背景
响应式编程(Reactive Programming, RP)是一种面对数据流的编程框架。这个框架在 Java 语言上的实现为 RxJava。
特点
RP 解决了什么问题呢?本文中只介绍最基础的部分,一个是在线数据流计算,另一个是 backpressure。
在线数据流计算
如果说 Java 8 中的 Stream 语法是支持离线异步数据流计算的话,那 RxJava 可以支持到在线异步数据流计算。
就是说 RxJava 可以接收无限的源源不断的上游数据流,即在线数据流。而不是只能接受预定义好的有限的一个数据流集合。
backpressure
RxJava 的另一个特点是 backpressure。
backpressure 指在异步场景中,事件产生速度,远远大于事件消费速度时,来告诉 Publisher 慢点产生事件的策略。
观察者模式
RP 是怎么来解决这个问题的呢?
RP 有一个自己的模式,称为观察者模式,如下图所示。
图片来自[3],图中的 Observeable
单词拼写有误,应该为 Observable
。
RxJava 2.x 中,有两种方式:
- 不支持背压:Observable / Observer
- 支持背压:Flowable / Subscriber
例子
理论介绍就这么多,下面开始用实例来介绍吧。
示例一: 最基本用法
第一个例子,来说明 Observable
和 Flowable
怎么用。
步骤
RxJava 开发步骤:
- 初始化 Observable/Flowable
- 初始化 Consumer
- 建立 subscribe 关系
依赖
依赖
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.14</version>
</dependency>
代码
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
public class HelloWorld {
public static void main(String[] args) {
helloObservable();
helloFlowable();
}
private static void helloObservable() {
Observable.just("Hello world Observable!").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
// 更简洁的写法
Observable.just("Hello world, Simple Observable!").subscribe(s -> System.out.println(s));
}
private static void helloFlowable() {
Flowable.just("Hello world Flowable!").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
// 更简洁的写法
Flowable.just("Hello world, Simple Flowable!").subscribe(s -> System.out.println(s));
}
}
结果
Hello world Observable!
Hello world, Simple Observable!
Hello world Flowable!
Hello world, Simple Flowable!
示例二: 高级的数据产生和消费的方法
第二个例子,演示一下 Observable 和 Flowable 中,更高级的数据产生和消费的方法。
- 可以一次发射多个元素
- 可以发射结束事件
- 可以响应结束事件
import org.reactivestreams.Subscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.FlowableSubscriber;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class HelloWorld2 {
public static void main(String[] args) {
helloObservable();
helloFlowable();
}
private static void helloObservable() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello Observable 1");
emitter.onNext("Hello Observable 2");
emitter.onNext("Hello Observable 3");
emitter.onNext("Hello Observable 4");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onComplete() {
System.out.println("Complete!");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
// 如果只使用onNext的话,可以将Observer换成更简洁的Consumer
System.out.println(s);
}
@Override
public void onSubscribe(Disposable disposable) {
}
});
}
private static void helloFlowable() {
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello Flowable 1");
emitter.onNext("Hello Flowable 2");
emitter.onNext("Hello Flowable 3");
emitter.onNext("Hello Flowable 4");
emitter.onComplete();
}
}, BackpressureStrategy.LATEST).subscribe(new FlowableSubscriber<String>() {
@Override
public void onComplete() {
System.out.println("Complete!");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onSubscribe(Subscription subscription) {
// 这个参数,下文再讲.
subscription.request(Long.MAX_VALUE);
}
});
}
}
结果
Hello Observable 1
Hello Observable 2
Hello Observable 3
Hello Observable 4
Complete!
Hello Flowable 1
Hello Flowable 2
Hello Flowable 3
Hello Flowable 4
Complete!
示例三: 单线程同步执行
前两个示例只是简单的输出字符串,这个示例演示如何将自定义方法通过 Observable 中来调用。
这里以访问 http 页面为例,来演示自定义方法。
import java.io.IOException;
import org.apache.http.client.fluent.Request;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class HelloRequest {
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("http://www.abeffect.com/1");
emitter.onNext("http://www.abeffect.com/2");
emitter.onNext("http://www.abeffect.com/3");
emitter.onNext("http://www.abeffect.com/4");
}
}).subscribe(new Observer<String>() {
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
try {
System.out.println(Thread.currentThread() + ": " + s);
System.out.println(Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode());
} catch (IOException e) {
e.printStackTrace();
}
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onSubscribe(Disposable disposable) {
}
});
}
}
结果
Thread[main,5,main]: http://www.abeffect.com/1
200
Thread[main,5,main]: http://www.abeffect.com/2
200
Thread[main,5,main]: http://www.abeffect.com/3
200
Thread[main,5,main]: http://www.abeffect.com/4
200
可见默认使用了主线程顺序执行。
这里也可以不用 Request,而直接使用 http client 的线程池。原理是一样的,所以本文没有展开写。
示例四: 线程池并行执行
示例三是单线程串行执行的。如何进行并行执行呢?
并行执行有多种实现方式,本示例通过 flatMap
,将原 Observable 的数据拆为多个新的 Observable 来并发执行。
代码中有两个细节:
- 指定了并行执行的外部线程池
- 执行结束后,在
doFinally
方法中手动关闭了线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.http.client.fluent.Request;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Scheduler;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.schedulers.ExecutorScheduler;
public class HelloAsyncRequest {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
Scheduler scheduler = new ExecutorScheduler(executor);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("http://www.abeffect.com/1");
emitter.onNext("http://www.abeffect.com/2");
emitter.onNext("http://www.abeffect.com/3");
emitter.onNext("http://www.abeffect.com/4");
emitter.onComplete();
}
}).flatMap(new Function<String, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(String s) throws Exception {
// 通过subscribeOn指定线程池来处理
return Observable.just(s).subscribeOn(scheduler).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode();
}
});
}
}).doFinally(new Action() {
@Override
public void run() throws Exception {
// 所有执行完成后, 关闭线程池.
executor.shutdown();
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer statusCode) throws Exception {
System.out.println(Thread.currentThread());
System.out.println(statusCode);
}
});
}
}
示例五:无限的上游数据流
上个示例中处理的数据流还是有限多个,本示例模拟了上游有无限的数据流。
这点即为 RxJava2 的特点之一。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.http.client.fluent.Request;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Scheduler;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.schedulers.ExecutorScheduler;
public class HelloOnlineAsyncRequest {
public static void main(String[] args) {
// 调节成为三个并发
ExecutorService executor = Executors.newFixedThreadPool(3);
Scheduler scheduler = new ExecutorScheduler(executor);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// 模拟无限的上流数据
for (int i = 1;; ++i) {
emitter.onNext("http://www.abeffect.com/" + i);
}
}
}).flatMap(new Function<String, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(String s) throws Exception {
// 通过subscribeOn指定线程池来处理
return Observable.just(s).subscribeOn(scheduler).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode();
}
});
}
}).doFinally(new Action() {
@Override
public void run() throws Exception {
// 所有执行完成后, 关闭线程池.
executor.shutdown();
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer statusCode) throws Exception {
System.out.println(Thread.currentThread());
System.out.println(statusCode);
}
});
}
}
示例六:延时操作
import java.util.Date;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
public class HelloTimer {
public static void main(String[] args) {
System.out.println("1: " + new Date());
Observable.timer(3, TimeUnit.SECONDS).subscribe(t -> {
System.out.println("2: " + new Date());
Observable.just("Hello world, Simple Observable!").subscribe(s -> System.out.println(s));
System.out.println("3: " + new Date());
});
System.out.println("4: " + new Date());
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果
1: Tue Jun 26 10:57:11 CST 2018
4: Tue Jun 26 10:57:11 CST 2018
2: Tue Jun 26 10:57:14 CST 2018
Hello world, Simple Observable!
3: Tue Jun 26 10:57:14 CST 2018
可以看到,延迟 3 秒输出了文字。
示例七: 定时器
import java.util.Date;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
public class HelloTimer {
public static void main(String[] args) {
System.out.println("1: " + new Date());
Observable.intervalRange(0, 10, 3, 2, TimeUnit.SECONDS).subscribe(t -> {
System.out.println("准备: " + new Date());
Observable.just("Hello world, Simple Observable!").subscribe(s -> System.out.println(s));
System.out.println("结束: " + new Date() + "\n");
});
System.out.println("4: " + new Date());
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出
1: Tue Jun 26 11:01:10 CST 2018
4: Tue Jun 26 11:01:11 CST 2018
准备: Tue Jun 26 11:01:14 CST 2018
Hello world, Simple Observable!
结束: Tue Jun 26 11:01:14 CST 2018
准备: Tue Jun 26 11:01:16 CST 2018
Hello world, Simple Observable!
结束: Tue Jun 26 11:01:16 CST 2018
准备: Tue Jun 26 11:01:18 CST 2018
Hello world, Simple Observable!
结束: Tue Jun 26 11:01:18 CST 2018
可见,每 2 秒执行一次。
后记
在 Java9 中也已经有了对 RP 的支持,随后将进行专门的学习。
参考
- RxJava@github
- 这可能是最好的 RxJava 2.x 入门教程(一)
- 这可能是最好的 RxJava 2.x 教程(完结版)
- 从零开始的 RxJava 之旅(4)---- RxJava2 总结
- RxJava2 - Zip: 图解什么是 RxJava2, 推荐阅读.
- Reactive Thinking in Java with RxJava2
- RxJava 并行操作: @fengzhizi715 的简书,推荐阅读.
- 从零开始的 RxJava2.0 教程(一)基础
- rxjava2 简单入门用例(一): 更丰富的例子
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于