RxJava2 入门和基本使用

本贴最后更新于 2343 天前,其中的信息可能已经时移世异

背景

响应式编程(Reactive Programming, RP)是一种面对数据流的编程框架。这个框架在 Java 语言上的实现为 RxJava。

特点

RP 解决了什么问题呢?本文中只介绍最基础的部分,一个是在线数据流计算,另一个是 backpressure。

在线数据流计算

如果说 Java 8 中的 Stream 语法是支持离线异步数据流计算的话,那 RxJava 可以支持到在线异步数据流计算。

就是说 RxJava 可以接收无限的源源不断的上游数据流,即在线数据流。而不是只能接受预定义好的有限的一个数据流集合。

backpressure

RxJava 的另一个特点是 backpressure。

backpressure 指在异步场景中,事件产生速度,远远大于事件消费速度时,来告诉 Publisher 慢点产生事件的策略。

观察者模式

RP 是怎么来解决这个问题的呢?

RP 有一个自己的模式,称为观察者模式,如下图所示。

imagepng

图片来自[3],图中的 Observeable 单词拼写有误,应该为 Observable

RxJava 2.x 中,有两种方式:

  • 不支持背压:Observable / Observer
  • 支持背压:Flowable / Subscriber

例子

理论介绍就这么多,下面开始用实例来介绍吧。

示例一: 最基本用法

第一个例子,来说明 ObservableFlowable 怎么用。

步骤

RxJava 开发步骤:

  1. 初始化 Observable/Flowable
  2. 初始化 Consumer
  3. 建立 subscribe 关系

依赖

依赖

		<dependency>
			<groupId>io.reactivex.rxjava2</groupId>
			<artifactId>rxjava</artifactId>
			<version>2.1.14</version>
		</dependency>

代码

import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;

public class HelloWorld {
	public static void main(String[] args) {
		helloObservable();
		helloFlowable();
	}

	private static void helloObservable() {

		Observable.just("Hello world Observable!").subscribe(new Consumer<String>() {

			@Override
			public void accept(String s) throws Exception {
				System.out.println(s);
			}
		});

		// 更简洁的写法
		Observable.just("Hello world, Simple Observable!").subscribe(s -> System.out.println(s));

	}

	private static void helloFlowable() {
		Flowable.just("Hello world Flowable!").subscribe(new Consumer<String>() {

			@Override
			public void accept(String s) throws Exception {
				System.out.println(s);
			}
		});

		// 更简洁的写法
		Flowable.just("Hello world, Simple Flowable!").subscribe(s -> System.out.println(s));
	}
}

结果

Hello world Observable!
Hello world, Simple Observable!
Hello world Flowable!
Hello world, Simple Flowable!

示例二: 高级的数据产生和消费的方法

第二个例子,演示一下 Observable 和 Flowable 中,更高级的数据产生和消费的方法。

  • 可以一次发射多个元素
  • 可以发射结束事件
  • 可以响应结束事件
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.FlowableSubscriber;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class HelloWorld2 {
	public static void main(String[] args) {
		helloObservable();
		helloFlowable();
	}

	private static void helloObservable() {

		Observable.create(new ObservableOnSubscribe<String>() {

			@Override
			public void subscribe(ObservableEmitter<String> emitter) throws Exception {
				emitter.onNext("Hello Observable 1");
				emitter.onNext("Hello Observable 2");
				emitter.onNext("Hello Observable 3");
				emitter.onNext("Hello Observable 4");
				emitter.onComplete();
			}
		}).subscribe(new Observer<String>() {

			@Override
			public void onComplete() {
				System.out.println("Complete!");
			}

			@Override
			public void onError(Throwable e) {
				e.printStackTrace();
			}

			@Override
			public void onNext(String s) {
				// 如果只使用onNext的话,可以将Observer换成更简洁的Consumer
				System.out.println(s);
			}

			@Override
			public void onSubscribe(Disposable disposable) {
			}
		});
	}

	private static void helloFlowable() {
		Flowable.create(new FlowableOnSubscribe<String>() {

			@Override
			public void subscribe(FlowableEmitter<String> emitter) throws Exception {
				emitter.onNext("Hello Flowable 1");
				emitter.onNext("Hello Flowable 2");
				emitter.onNext("Hello Flowable 3");
				emitter.onNext("Hello Flowable 4");
				emitter.onComplete();
			}
		}, BackpressureStrategy.LATEST).subscribe(new FlowableSubscriber<String>() {

			@Override
			public void onComplete() {
				System.out.println("Complete!");
			}

			@Override
			public void onError(Throwable e) {
				e.printStackTrace();
			}

			@Override
			public void onNext(String s) {
				System.out.println(s);
			}

			@Override
			public void onSubscribe(Subscription subscription) {
				// 这个参数,下文再讲. 
				subscription.request(Long.MAX_VALUE);
			}
		});
	}
}

结果

Hello Observable 1
Hello Observable 2
Hello Observable 3
Hello Observable 4
Complete!
Hello Flowable 1
Hello Flowable 2
Hello Flowable 3
Hello Flowable 4
Complete!

示例三: 单线程同步执行

前两个示例只是简单的输出字符串,这个示例演示如何将自定义方法通过 Observable 中来调用。

这里以访问 http 页面为例,来演示自定义方法。

import java.io.IOException;

import org.apache.http.client.fluent.Request;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class HelloRequest {
	public static void main(String[] args) {

		Observable.create(new ObservableOnSubscribe<String>() {

			@Override
			public void subscribe(ObservableEmitter<String> emitter) throws Exception {
				emitter.onNext("http://www.abeffect.com/1");
				emitter.onNext("http://www.abeffect.com/2");
				emitter.onNext("http://www.abeffect.com/3");
				emitter.onNext("http://www.abeffect.com/4");
			}
		}).subscribe(new Observer<String>() {

			@Override
			public void onComplete() {
			}

			@Override
			public void onError(Throwable e) {
				e.printStackTrace();
			}

			@Override
			public void onNext(String s) {
				try {
					System.out.println(Thread.currentThread() + ": " + s);
					System.out.println(Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode());
				} catch (IOException e) {
					e.printStackTrace();
				}
				try {
					Thread.sleep(5000L);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}

			@Override
			public void onSubscribe(Disposable disposable) {
			}
		});
	}
}

结果

Thread[main,5,main]: http://www.abeffect.com/1
200
Thread[main,5,main]: http://www.abeffect.com/2
200
Thread[main,5,main]: http://www.abeffect.com/3
200
Thread[main,5,main]: http://www.abeffect.com/4
200

可见默认使用了主线程顺序执行。

这里也可以不用 Request,而直接使用 http client 的线程池。原理是一样的,所以本文没有展开写。

示例四: 线程池并行执行

示例三是单线程串行执行的。如何进行并行执行呢?

并行执行有多种实现方式,本示例通过 flatMap,将原 Observable 的数据拆为多个新的 Observable 来并发执行。

代码中有两个细节:

  1. 指定了并行执行的外部线程池
  2. 执行结束后,在 doFinally 方法中手动关闭了线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.http.client.fluent.Request;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Scheduler;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.schedulers.ExecutorScheduler;

public class HelloAsyncRequest {
	public static void main(String[] args) {

		ExecutorService executor = Executors.newFixedThreadPool(10);
		Scheduler scheduler = new ExecutorScheduler(executor);

		Observable.create(new ObservableOnSubscribe<String>() {

			@Override
			public void subscribe(ObservableEmitter<String> emitter) throws Exception {
				emitter.onNext("http://www.abeffect.com/1");
				emitter.onNext("http://www.abeffect.com/2");
				emitter.onNext("http://www.abeffect.com/3");
				emitter.onNext("http://www.abeffect.com/4");
				emitter.onComplete();
			}
		}).flatMap(new Function<String, ObservableSource<Integer>>() {

			@Override
			public ObservableSource<Integer> apply(String s) throws Exception {
				// 通过subscribeOn指定线程池来处理
				return Observable.just(s).subscribeOn(scheduler).map(new Function<String, Integer>() {

					@Override
					public Integer apply(String s) throws Exception {
						return Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode();
					}
				});
			}
		}).doFinally(new Action() {

			@Override
			public void run() throws Exception {
				// 所有执行完成后, 关闭线程池.
				executor.shutdown();
			}
		}).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer statusCode) throws Exception {
				System.out.println(Thread.currentThread());
				System.out.println(statusCode);
			}
		});
	}
}

示例五:无限的上游数据流

上个示例中处理的数据流还是有限多个,本示例模拟了上游有无限的数据流。

这点即为 RxJava2 的特点之一。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.http.client.fluent.Request;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Scheduler;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.schedulers.ExecutorScheduler;

public class HelloOnlineAsyncRequest {
	public static void main(String[] args) {

		// 调节成为三个并发
		ExecutorService executor = Executors.newFixedThreadPool(3);
		Scheduler scheduler = new ExecutorScheduler(executor);

		Observable.create(new ObservableOnSubscribe<String>() {

			@Override
			public void subscribe(ObservableEmitter<String> emitter) throws Exception {
				// 模拟无限的上流数据
				for (int i = 1;; ++i) {
					emitter.onNext("http://www.abeffect.com/" + i);
				}
			}
		}).flatMap(new Function<String, ObservableSource<Integer>>() {

			@Override
			public ObservableSource<Integer> apply(String s) throws Exception {
				// 通过subscribeOn指定线程池来处理
				return Observable.just(s).subscribeOn(scheduler).map(new Function<String, Integer>() {

					@Override
					public Integer apply(String s) throws Exception {
						return Request.Get(s).execute().returnResponse().getStatusLine().getStatusCode();
					}
				});
			}
		}).doFinally(new Action() {

			@Override
			public void run() throws Exception {
				// 所有执行完成后, 关闭线程池.
				executor.shutdown();
			}
		}).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer statusCode) throws Exception {
				System.out.println(Thread.currentThread());
				System.out.println(statusCode);
			}
		});
	}
}

示例六:延时操作

import java.util.Date;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;

public class HelloTimer {
	public static void main(String[] args) {

		System.out.println("1: " + new Date());
		Observable.timer(3, TimeUnit.SECONDS).subscribe(t -> {
			System.out.println("2: " + new Date());
			Observable.just("Hello world, Simple Observable!").subscribe(s -> System.out.println(s));
			System.out.println("3: " + new Date());
		});
		System.out.println("4: " + new Date());

		try {
			Thread.sleep(10000L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

结果

1: Tue Jun 26 10:57:11 CST 2018
4: Tue Jun 26 10:57:11 CST 2018
2: Tue Jun 26 10:57:14 CST 2018
Hello world, Simple Observable!
3: Tue Jun 26 10:57:14 CST 2018

可以看到,延迟 3 秒输出了文字。

示例七: 定时器

import java.util.Date;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;

public class HelloTimer {
	public static void main(String[] args) {

		System.out.println("1: " + new Date());

		Observable.intervalRange(0, 10, 3, 2, TimeUnit.SECONDS).subscribe(t -> {
			System.out.println("准备: " + new Date());
			Observable.just("Hello world, Simple Observable!").subscribe(s -> System.out.println(s));
			System.out.println("结束: " + new Date() + "\n");
		});
		System.out.println("4: " + new Date());

		try {
			Thread.sleep(10000L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

输出

1: Tue Jun 26 11:01:10 CST 2018
4: Tue Jun 26 11:01:11 CST 2018
准备: Tue Jun 26 11:01:14 CST 2018
Hello world, Simple Observable!
结束: Tue Jun 26 11:01:14 CST 2018

准备: Tue Jun 26 11:01:16 CST 2018
Hello world, Simple Observable!
结束: Tue Jun 26 11:01:16 CST 2018

准备: Tue Jun 26 11:01:18 CST 2018
Hello world, Simple Observable!
结束: Tue Jun 26 11:01:18 CST 2018

可见,每 2 秒执行一次。

后记

在 Java9 中也已经有了对 RP 的支持,随后将进行专门的学习。

参考

  1. RxJava@github
  2. 这可能是最好的 RxJava 2.x 入门教程(一)
  3. 这可能是最好的 RxJava 2.x 教程(完结版)
  4. 从零开始的 RxJava 之旅(4)---- RxJava2 总结
  5. RxJava2 - Zip: 图解什么是 RxJava2, 推荐阅读.
  6. Reactive Thinking in Java with RxJava2
  7. RxJava 并行操作: @fengzhizi715 的简书,推荐阅读.
  8. 从零开始的 RxJava2.0 教程(一)基础
  9. rxjava2 简单入门用例(一): 更丰富的例子
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...