RxJava2 中的 Backpressure

本贴最后更新于 2385 天前,其中的信息可能已经时异事殊

背景

反压(背压, Backpressure)是什么呢?

被观察者生产事件的速度太快,超过了观察者处理事件的速度时,这里系统就会变的不稳定。

如果不做处理,事件会堆积到内存中,最终导致 OOM。

反压提供了一种方式,来制定这种情况发生时,可以采用的策略。

原理

响应式拉取(reactive pull)

在 Observable 中,是被观察者生产数据后,主动将数据推向观察者,观察者来消费。

在 Flowable 中,是观察者将数据消费掉后,主动从被观察者那边来拉取数据,被观察者等待通知再发送数据。

具体在代码中,是通过 Subscription.request(number) 来拉取指定个数的数据的。

如果不想用反压策略,可以设置 Subscription.request(LONG.MAX_VALUE)

异步线程

观察者线程和被观察者线程是不同的线程。在同一样线程中,被观察者需要等待观察者将事件处理完毕后才会继续发送下面的事件。

示例

Observable 无反压

import java.util.Date;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class HelloObservable {
	public static void main(String[] args) throws InterruptedException {
		Observable.create(new ObservableOnSubscribe<Long>() {

			@Override
			public void subscribe(ObservableEmitter<Long> e) throws Exception {
				for (long i = 0;; ++i) {
					e.onNext(i);
				}
			}
		}).observeOn(Schedulers.newThread()).subscribe(new Observer<Long>() {

			@Override
			public void onSubscribe(Disposable d) {

			}

			@Override
			public void onNext(Long t) {
				System.out.println((new Date()) + ": " + t);
				try {
					Thread.sleep(1000L);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}

			@Override
			public void onError(Throwable e) {
				e.printStackTrace();
			}

			@Override
			public void onComplete() {
				System.out.println("Complete");
			}
		});

		Thread.sleep(10000L);
	}
}

结果

Wed Jun 27 10:08:41 CST 2018: 0
Wed Jun 27 10:08:42 CST 2018: 1
Wed Jun 27 10:08:44 CST 2018: 2
Wed Jun 27 10:08:48 CST 2018: 3
Wed Jun 27 10:08:49 CST 2018: 4
Wed Jun 27 10:08:55 CST 2018: 5
Wed Jun 27 10:08:56 CST 2018: 6
Wed Jun 27 10:08:59 CST 2018: 7

如果正常的话,应该每 1 秒输出一个值。但是由于 GC 的影响,输出时间在逐渐增加。

Flowable 有反压

Flowable 支持反压,有五种策略

策略 原文描述 备注
BUFFER Buffers all onNext values until the downstream consumes it. 缓存所有的值
DROP Drops the most recent onNext value if the downstream can't keep up 丢弃最新的值
ERROR Signals a MissingBackpressureException in case the downstream can't keep up 触发异常
MISSING OnNext events are written without any buffering or dropping. Downstream has to deal with any overflow. Useful when one applies one of the custom-parameter onBackpressureXXX operators. 不丢弃, 不缓存, 触发异常
LATEST Keeps only the latest onNext value, overwriting any previous value if the downstream can't keep up.

Drop 和 Latest 都不会 Buffer 数据,详见参考中的[4]。

示例: Flowable Buffer 策略

使用的通用的测试模板,可以测试多种策略,需要注意的点是:

  1. 策略对应于模板中的 BackpressureStrategy.BUFFER,调整策略即调整这块儿的参数。
  2. 通过 observeOn(Schedulers.newThread()) 来保证异步。如果没有这段的话,会在同一个线程内运行。

下面的示例,先快速生产 3 秒的数据,然后停止生产数据;消费时头 3 秒先缓慢消费,然后快速消费完剩余的数据。

import java.util.Date;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.schedulers.Schedulers;

public class HelloFlowableBuffer {
	public static void main(String[] args) throws InterruptedException {

		Flowable.create(new FlowableOnSubscribe<Long>() {

			@Override
			public void subscribe(FlowableEmitter<Long> e) throws Exception {
				// 仅持续产生3秒数据
				long begin = System.currentTimeMillis();
				for (long i = 0;; ++i) {
					e.onNext(i);
					// 到3秒,则退出.
					if (System.currentTimeMillis() - begin > 3000L) {
						break;
					}
				}
			}
		}, BackpressureStrategy.BUFFER).observeOn(Schedulers.newThread()).subscribe(new Subscriber<Long>() {

			private Subscription s;

			private long begin = System.currentTimeMillis();

			@Override
			public void onComplete() {
				System.out.println("Complete");
			}

			@Override
			public void onError(Throwable t) {
				t.printStackTrace();
			}

			@Override
			public void onNext(Long l) {
				System.out.println((new Date()) + ": " + l);

				// 前3秒时有停留, 之后不停留.
				if (System.currentTimeMillis() - begin < 3000) {
					try {
						Thread.sleep(1000L);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				this.s.request(1);
			}

			@Override
			public void onSubscribe(Subscription s) {
				this.s = s;
				s.request(1);
			}
		});

		Thread.sleep(5000L);
	}
}

Buffer 策略就是都保存在内存中,其实和上面的情况是一样的。

结果

Wed Jun 27 10:38:40 CST 2018: 0
Wed Jun 27 10:38:41 CST 2018: 1
Wed Jun 27 10:38:44 CST 2018: 2
............
Wed Jun 27 10:38:49 CST 2018: 567521
Wed Jun 27 10:38:49 CST 2018: 567522
Wed Jun 27 10:38:49 CST 2018: 567523

示例: Flowable Drop 策略

使用策略: BackpressureStrategy.DROP

结果

Wed Jun 27 10:50:04 CST 2018: 0
Wed Jun 27 10:50:06 CST 2018: 1
Wed Jun 27 10:50:07 CST 2018: 2
............
Wed Jun 27 10:50:08 CST 2018: 125
Wed Jun 27 10:50:08 CST 2018: 126
Wed Jun 27 10:50:08 CST 2018: 127

示例: Flowable Latest 策略

使用策略: BackpressureStrategy.LATEST

结果

Wed Jun 27 10:58:08 CST 2018: 0
Wed Jun 27 10:58:09 CST 2018: 1
Wed Jun 27 10:58:10 CST 2018: 2
............
Wed Jun 27 10:58:11 CST 2018: 126
Wed Jun 27 10:58:11 CST 2018: 127
Wed Jun 27 10:58:11 CST 2018: 50155889

可见 LATEST 策略的处理方式为只保留最新的一条数据。

示例: Flowable Error 策略

使用策略: BackpressureStrategy.ERROR

结果

Wed Jun 27 11:01:49 CST 2018: 0
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
	at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:442)
	at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:408)
	at backpressure.HelloFlowableError$1.subscribe(HelloFlowableError.java:24)
	at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
	at io.reactivex.Flowable.subscribe(Flowable.java:14349)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn.subscribeActual(FlowableObserveOn.java:56)
	at io.reactivex.Flowable.subscribe(Flowable.java:14349)
	at io.reactivex.Flowable.subscribe(Flowable.java:14298)
	at backpressure.HelloFlowableError.main(HelloFlowableError.java:31)

示例: Flowable Missing 策略

使用策略: BackpressureStrategy.MISSING

结果

Wed Jun 27 11:02:53 CST 2018: 0
io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:114)
	at io.reactivex.internal.operators.flowable.FlowableCreate$MissingEmitter.onNext(FlowableCreate.java:369)
	at backpressure.HelloFlowableMissing$1.subscribe(HelloFlowableMissing.java:24)
	at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
	at io.reactivex.Flowable.subscribe(Flowable.java:14349)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn.subscribeActual(FlowableObserveOn.java:56)
	at io.reactivex.Flowable.subscribe(Flowable.java:14349)
	at io.reactivex.Flowable.subscribe(Flowable.java:14298)
	at backpressure.HelloFlowableMissing.main(HelloFlowableMissing.java:31)

附录

调度器类型

调度器类型有

调度器类型 效果
computation 计算任务
io IO 任务
newThread 新启动一个线程
from(executor) 指定线程池
single 单线程串行执行
trampoline 放入当前线程队列

自由切换调度器示例

import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class HelloThread {
	public static void main(String[] args) throws InterruptedException {

		Flowable.range(1, 3).observeOn(Schedulers.computation()) // 下面的computation线程
				.map(new Function<Integer, String>() {

					@Override
					public String apply(Integer t) throws Exception {
						System.out.println("map 1: " + Thread.currentThread());
						return String.valueOf(t);
					}
				}).observeOn(Schedulers.io()) // 下面的是io线程
				.map(new Function<String, Integer>() {

					@Override
					public Integer apply(String t) throws Exception {
						System.out.println("map 2: " + Thread.currentThread());
						return Integer.valueOf(t);
					}
				}).subscribeOn(Schedulers.single()) // 下面的在严格的单线程串行
				.subscribe(new Consumer<Integer>() {
					@Override
					public void accept(Integer t) throws Exception {
						System.out.println("final: " + Thread.currentThread() + ", t");
					}
				});

		Thread.sleep(3000L);
	}
}

结果

map 1: Thread[RxComputationThreadPool-1,5,main]
map 1: Thread[RxComputationThreadPool-1,5,main]
map 1: Thread[RxComputationThreadPool-1,5,main]
map 2: Thread[RxCachedThreadScheduler-1,5,main]
final: Thread[RxCachedThreadScheduler-1,5,main], t
map 2: Thread[RxCachedThreadScheduler-1,5,main]
final: Thread[RxCachedThreadScheduler-1,5,main], t
map 2: Thread[RxCachedThreadScheduler-1,5,main]
final: Thread[RxCachedThreadScheduler-1,5,main], t

减少数据量的方法

采样: Sample 算子

使用 Sample 算子,可以采样数据

import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.schedulers.Schedulers;

public class HelloFlowableBufferSample {
	public static void main(String[] args) throws InterruptedException {

		Flowable.create(new FlowableOnSubscribe<Long>() {

			@Override
			public void subscribe(FlowableEmitter<Long> e) throws Exception {
				// 仅持续产生3秒数据
				long begin = System.currentTimeMillis();
				for (long i = 0;; ++i) {
					e.onNext(i);
					// 到3秒,则退出.
					if (System.currentTimeMillis() - begin > 3000L) {
						break;
					}
				}
			}
		}, BackpressureStrategy.BUFFER).sample(1, TimeUnit.SECONDS) // 采样: 每秒一条数据
				.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Long>() {

					private Subscription s;

					private long begin = System.currentTimeMillis();

					@Override
					public void onComplete() {
						System.out.println("Complete");
					}

					@Override
					public void onError(Throwable t) {
						t.printStackTrace();
					}

					@Override
					public void onNext(Long l) {
						System.out.println((new Date()) + ": " + l);

						// 前3秒时有停留, 之后不停留.
						if (System.currentTimeMillis() - begin < 3000) {
							try {
								Thread.sleep(1000L);
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
						this.s.request(1);
					}

					@Override
					public void onSubscribe(Subscription s) {
						this.s = s;
						s.request(1);
					}
				});

		Thread.sleep(5000L);
	}
}

结果

Wed Jun 27 15:13:10 CST 2018: 14425216
Wed Jun 27 15:13:11 CST 2018: 29448431
Wed Jun 27 15:13:12 CST 2018: 44742890

可见每秒采样一条数据。

过滤: 过滤事件

import java.util.Date;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.schedulers.Schedulers;

public class HelloFlowableBufferFilter {
	public static void main(String[] args) throws InterruptedException {

		Flowable.create(new FlowableOnSubscribe<Long>() {

			@Override
			public void subscribe(FlowableEmitter<Long> e) throws Exception {
				// 仅持续产生3秒数据
				long begin = System.currentTimeMillis();
				for (long i = 0;; ++i) {
					e.onNext(i);
					// 到3秒,则退出.
					if (System.currentTimeMillis() - begin > 3000L) {
						break;
					}
				}
			}
		}, BackpressureStrategy.BUFFER).filter(t -> t % 5000000 == 0) // 过滤: 每500万条取1条
				.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Long>() {

					private Subscription s;

					private long begin = System.currentTimeMillis();

					@Override
					public void onComplete() {
						System.out.println("Complete");
					}

					@Override
					public void onError(Throwable t) {
						t.printStackTrace();
					}

					@Override
					public void onNext(Long l) {
						System.out.println((new Date()) + ": " + l);

						// 前3秒时有停留, 之后不停留.
						if (System.currentTimeMillis() - begin < 3000) {
							try {
								Thread.sleep(1000L);
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
						this.s.request(1);
					}

					@Override
					public void onSubscribe(Subscription s) {
						this.s = s;
						s.request(1);
					}
				});

		Thread.sleep(5000L);
	}
}

结果

Wed Jun 27 15:18:28 CST 2018: 0
Wed Jun 27 15:18:29 CST 2018: 5000000
Wed Jun 27 15:18:30 CST 2018: 10000000
Wed Jun 27 15:18:31 CST 2018: 15000000
Wed Jun 27 15:18:31 CST 2018: 20000000
Wed Jun 27 15:18:31 CST 2018: 25000000

ParallelFlowable

ParallelFlowable 提供了一种并行执行的方式.

import io.reactivex.Flowable;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.schedulers.Schedulers;

public class HelloParallelFlowableBufferSample {
	public static void main(String[] args) throws InterruptedException {

		ParallelFlowable.from(Flowable.range(1, 10), 4) // 并行度为4
				.runOn(Schedulers.computation()) // 运行在计算线程池上
				.map(i -> i * 2) // 简单map
				.sequential() // 将结果merge
				.subscribe(System.out::println); // 输出到控制台

		Thread.sleep(3000L);
	}
}

结果

2
4
6
8
12
14
16
20
10
18

参考

  1. 关于 RxJava 最友好的文章——背压(Backpressure)
  2. 关于 RxJava 最友好的文章(初级篇)
  3. 关于 RxJava 背压
  4. Reactive Streams and RxJava2
  5. RxJava 中 backpressure 这个概念的理解
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3190 引用 • 8214 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...