给初学者的 RxJava2.0 教程

本贴最后更新于 3068 天前,其中的信息可能已经沧海桑田

可能很多看过其他人写的文章的朋友都会觉得只有 Flowable 才能解决 Backpressure , 所以大家对这个 Flowable 都抱有很大的期许 , 其实呐 , 你们毕竟图样图森破 , 今天我们先抛开 Flowable, 仅仅依靠我们自己的 双手和智慧 , 来看看我们如何去治理 BackPressure , 通过本节的学习之后我们再来看 Flowable, 你会发现它其实并没有想象中那么牛叉, 它只是被其他人过度神化了.
看代码

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { //无限循环发送事件 emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .filter(new Predicate() { @Override public boolean test(Integer integer) throws Exception { return integer % 10 == 0; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });

在这段代码中我们增加了一个 filter, 只允许能被 10 整除的事件通过, 再来看看运行结果:

filter.gif

可以看到, 虽然内存依然在增长, 但是增长速度相比之前, 已经减少了太多了, 至少在我录完 GIF 之前还没有爆掉内存, 大家可以试着改成能被 100 整除试试.

可以看到, 通过减少进入水缸的事件数量的确可以缓解 BackPressure, 但是力度还不够, 我们再来看一段代码:

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .sample(2, TimeUnit.SECONDS) //sample取样 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });

这里用了一个 sample 操作符, 简单做个介绍, 这个操作符每隔指定的时间就从上游中取出一个事件发送给下游. 这里我们让它每隔 2 秒取一个事件给下游, 来看看这次的运行结果吧:

sample.gif

这次我们可以看到, 虽然上游仍然一直在不停的发事件, 但是我们只是 每隔一定时间 取一个放进水缸里, 并没有全部放进水缸里, 因此这次内存仅仅只占用了 5M.

大家以后可以出去吹牛逼了: 我曾经通过技术手段去优化一个程序, 最终使得内存占用从 300 多 M 变成不到 5M. ~(≧▽≦)/~啦啦啦

前面这两种方法归根到底其实就是减少放进水缸的事件的数量, 是以 数量 取胜, 但是这个方法有个 缺点, 就是 丢失了大部分的事件.

那么我们换一个角度来思考, 既然上游发送事件的速度太快, 那我们就适当减慢发送事件的速度, 从 速度 上取胜, 听上去不错, 我们来试试:

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); Thread.sleep(2000); //每次发送完事件延时2秒 } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });

这次我们让上游每次发送完事件后都延时了 2 秒, 来看看运行结果:

sleep.gif

完美 ! 一切都是那么完美 !

可以看到, 我们给上游加上延时了之后, 瞬间一头发情的公牛就变得跟只小绵羊一样, 如此温顺, 如此平静, 如此平稳的内存线, 美妙极了. 而且 事件也没有丢失, 上游 通过适当的 延时, 不但 减缓了 事件进入水缸的 速度, 也可以让 下游充足的时间 从水缸里取出事件来处理 , 这样一来, 就不至于导致大量的事件涌进水缸, 也就不会 OOM 啦.

到目前为止, 我们没有依靠任何其他的工具, 就轻易解决了 Backpressure 的问题.

因此我们总结一下, 治理 Backpressure 的办法就两种:

  • 一是从数量上进行治理, 减少发送进水缸里的事件
  • 二是从速度上进行治理, 减缓事件发送进水缸的速度

大家一定没忘记, 在上一节还有个 Zip 的例子, 这个例子也爆了我们的内存, 现学现用, 我们用刚学到的办法来试试能不能惩奸除恶, 先来看看第一种办法.

先来减少进入水缸的事件的数量:

Observable observable1 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()).sample(2, TimeUnit.SECONDS); //进行sample采样 Observable observable2 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("A"); } }).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } }, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { Log.w(TAG, throwable); } });

来试试运行结果吧:

zip_sample.gif

哈哈, 成功了吧, 再来用第二种办法试试.

这次我们来减缓速度:

Observable observable1 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); Thread.sleep(2000); //发送事件之后延时2秒 } } }).subscribeOn(Schedulers.io()); Observable observable2 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("A"); } }).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } }, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { Log.w(TAG, throwable); } });

来看看运行结果吧:

zip_sleep.gif

果然也成功了, 这里只打印出了下游收到的事件, 所以只有一个. 如果你对这个结果看不懂, 请自觉掉头看前面几篇文章.

通过本节的学习, 大家应该对如何处理 Backpressure 已经有了基本的认识了, 大家也可以看到, 我们并没有使用 Flowable, 就轻易的解决了背压的问题, 所以很多时候仔细去分析问题, 找到问题的原因, 从源头去解决才是最根本的办法. 后面我们讲到 Flowable 的时候, 大家就会发现它其实没什么神秘的, 它用到的办法和我们本节所讲的基本上是一样的, 只是它稍微做了点封装.

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3201 引用 • 8216 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
cyd
愿你在平凡琐碎的生活里找到意义 深圳

推荐标签 标签

  • 正则表达式

    正则表达式(Regular Expression)使用单个字符串来描述、匹配一系列遵循某个句法规则的字符串。

    31 引用 • 94 回帖
  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖
  • Electron

    Electron 基于 Chromium 和 Node.js,让你可以使用 HTML、CSS 和 JavaScript 构建应用。它是一个由 GitHub 及众多贡献者组成的活跃社区共同维护的开源项目,兼容 Mac、Windows 和 Linux,它构建的应用可在这三个操作系统上面运行。

    15 引用 • 136 回帖 • 4 关注
  • GitBook

    GitBook 使您的团队可以轻松编写和维护高质量的文档。 分享知识,提高团队的工作效率,让用户满意。

    3 引用 • 8 回帖
  • Sandbox

    如果帖子标签含有 Sandbox ,则该帖子会被视为“测试帖”,主要用于测试社区功能,排查 bug 等,该标签下内容不定期进行清理。

    433 引用 • 1250 回帖 • 595 关注
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 48 关注
  • Google

    Google(Google Inc.,NASDAQ:GOOG)是一家美国上市公司(公有股份公司),于 1998 年 9 月 7 日以私有股份公司的形式创立,设计并管理一个互联网搜索引擎。Google 公司的总部称作“Googleplex”,它位于加利福尼亚山景城。Google 目前被公认为是全球规模最大的搜索引擎,它提供了简单易用的免费服务。不作恶(Don't be evil)是谷歌公司的一项非正式的公司口号。

    49 引用 • 192 回帖 • 2 关注
  • Gitea

    Gitea 是一个开源社区驱动的轻量级代码托管解决方案,后端采用 Go 编写,采用 MIT 许可证。

    5 引用 • 16 回帖 • 2 关注
  • SEO

    发布对别人有帮助的原创内容是最好的 SEO 方式。

    35 引用 • 200 回帖 • 30 关注
  • MyBatis

    MyBatis 本是 Apache 软件基金会 的一个开源项目 iBatis,2010 年这个项目由 Apache 软件基金会迁移到了 google code,并且改名为 MyBatis ,2013 年 11 月再次迁移到了 GitHub。

    173 引用 • 414 回帖 • 364 关注
  • CodeMirror
    2 引用 • 17 回帖 • 162 关注
  • 职场

    找到自己的位置,萌新烦恼少。

    127 引用 • 1708 回帖
  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    107 引用 • 153 回帖
  • 区块链

    区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。所谓共识机制是区块链系统中实现不同节点之间建立信任、获取权益的数学算法 。

    92 引用 • 752 回帖
  • QQ

    1999 年 2 月腾讯正式推出“腾讯 QQ”,在线用户由 1999 年的 2 人(马化腾和张志东)到现在已经发展到上亿用户了,在线人数超过一亿,是目前使用最广泛的聊天软件之一。

    45 引用 • 557 回帖 • 1 关注
  • BookxNote

    BookxNote 是一款全新的电子书学习工具,助力您的学习与思考,让您的大脑更高效的记忆。

    笔记整理交给我,一心只读圣贤书。

    1 引用 • 1 回帖
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖
  • 禅道

    禅道是一款国产的开源项目管理软件,她的核心管理思想基于敏捷方法 scrum,内置了产品管理和项目管理,同时又根据国内研发现状补充了测试管理、计划管理、发布管理、文档管理、事务管理等功能,在一个软件中就可以将软件研发中的需求、任务、bug、用例、计划、发布等要素有序的跟踪管理起来,完整地覆盖了项目管理的核心流程。

    6 引用 • 15 回帖 • 10 关注
  • CAP

    CAP 指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可兼得。

    12 引用 • 5 回帖 • 631 关注
  • 知乎

    知乎是网络问答社区,连接各行各业的用户。用户分享着彼此的知识、经验和见解,为中文互联网源源不断地提供多种多样的信息。

    10 引用 • 66 回帖 • 1 关注
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 609 关注
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    268 引用 • 666 回帖
  • RYMCU

    RYMCU 致力于打造一个即严谨又活泼、专业又不失有趣,为数百万人服务的开源嵌入式知识学习交流平台。

    4 引用 • 6 回帖 • 58 关注
  • Linux

    Linux 是一套免费使用和自由传播的类 Unix 操作系统,是一个基于 POSIX 和 Unix 的多用户、多任务、支持多线程和多 CPU 的操作系统。它能运行主要的 Unix 工具软件、应用程序和网络协议,并支持 32 位和 64 位硬件。Linux 继承了 Unix 以网络为核心的设计思想,是一个性能稳定的多用户网络操作系统。

    952 引用 • 944 回帖
  • 倾城之链
    23 引用 • 66 回帖 • 168 关注
  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 499 关注
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 563 关注