给初学者的 RxJava2.0 教程

本贴最后更新于 3196 天前,其中的信息可能已经沧海桑田

可能很多看过其他人写的文章的朋友都会觉得只有 Flowable 才能解决 Backpressure , 所以大家对这个 Flowable 都抱有很大的期许 , 其实呐 , 你们毕竟图样图森破 , 今天我们先抛开 Flowable, 仅仅依靠我们自己的 双手和智慧 , 来看看我们如何去治理 BackPressure , 通过本节的学习之后我们再来看 Flowable, 你会发现它其实并没有想象中那么牛叉, 它只是被其他人过度神化了.
看代码

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { //无限循环发送事件 emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .filter(new Predicate() { @Override public boolean test(Integer integer) throws Exception { return integer % 10 == 0; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });

在这段代码中我们增加了一个 filter, 只允许能被 10 整除的事件通过, 再来看看运行结果:

filter.gif

可以看到, 虽然内存依然在增长, 但是增长速度相比之前, 已经减少了太多了, 至少在我录完 GIF 之前还没有爆掉内存, 大家可以试着改成能被 100 整除试试.

可以看到, 通过减少进入水缸的事件数量的确可以缓解 BackPressure, 但是力度还不够, 我们再来看一段代码:

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .sample(2, TimeUnit.SECONDS) //sample取样 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });

这里用了一个 sample 操作符, 简单做个介绍, 这个操作符每隔指定的时间就从上游中取出一个事件发送给下游. 这里我们让它每隔 2 秒取一个事件给下游, 来看看这次的运行结果吧:

sample.gif

这次我们可以看到, 虽然上游仍然一直在不停的发事件, 但是我们只是 每隔一定时间 取一个放进水缸里, 并没有全部放进水缸里, 因此这次内存仅仅只占用了 5M.

大家以后可以出去吹牛逼了: 我曾经通过技术手段去优化一个程序, 最终使得内存占用从 300 多 M 变成不到 5M. ~(≧▽≦)/~啦啦啦

前面这两种方法归根到底其实就是减少放进水缸的事件的数量, 是以 数量 取胜, 但是这个方法有个 缺点, 就是 丢失了大部分的事件.

那么我们换一个角度来思考, 既然上游发送事件的速度太快, 那我们就适当减慢发送事件的速度, 从 速度 上取胜, 听上去不错, 我们来试试:

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); Thread.sleep(2000); //每次发送完事件延时2秒 } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });

这次我们让上游每次发送完事件后都延时了 2 秒, 来看看运行结果:

sleep.gif

完美 ! 一切都是那么完美 !

可以看到, 我们给上游加上延时了之后, 瞬间一头发情的公牛就变得跟只小绵羊一样, 如此温顺, 如此平静, 如此平稳的内存线, 美妙极了. 而且 事件也没有丢失, 上游 通过适当的 延时, 不但 减缓了 事件进入水缸的 速度, 也可以让 下游充足的时间 从水缸里取出事件来处理 , 这样一来, 就不至于导致大量的事件涌进水缸, 也就不会 OOM 啦.

到目前为止, 我们没有依靠任何其他的工具, 就轻易解决了 Backpressure 的问题.

因此我们总结一下, 治理 Backpressure 的办法就两种:

  • 一是从数量上进行治理, 减少发送进水缸里的事件
  • 二是从速度上进行治理, 减缓事件发送进水缸的速度

大家一定没忘记, 在上一节还有个 Zip 的例子, 这个例子也爆了我们的内存, 现学现用, 我们用刚学到的办法来试试能不能惩奸除恶, 先来看看第一种办法.

先来减少进入水缸的事件的数量:

Observable observable1 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()).sample(2, TimeUnit.SECONDS); //进行sample采样 Observable observable2 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("A"); } }).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } }, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { Log.w(TAG, throwable); } });

来试试运行结果吧:

zip_sample.gif

哈哈, 成功了吧, 再来用第二种办法试试.

这次我们来减缓速度:

Observable observable1 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); Thread.sleep(2000); //发送事件之后延时2秒 } } }).subscribeOn(Schedulers.io()); Observable observable2 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("A"); } }).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } }, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { Log.w(TAG, throwable); } });

来看看运行结果吧:

zip_sleep.gif

果然也成功了, 这里只打印出了下游收到的事件, 所以只有一个. 如果你对这个结果看不懂, 请自觉掉头看前面几篇文章.

通过本节的学习, 大家应该对如何处理 Backpressure 已经有了基本的认识了, 大家也可以看到, 我们并没有使用 Flowable, 就轻易的解决了背压的问题, 所以很多时候仔细去分析问题, 找到问题的原因, 从源头去解决才是最根本的办法. 后面我们讲到 Flowable 的时候, 大家就会发现它其实没什么神秘的, 它用到的办法和我们本节所讲的基本上是一样的, 只是它稍微做了点封装.

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3203 引用 • 8217 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
cyd
愿你在平凡琐碎的生活里找到意义 深圳

推荐标签 标签

  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    108 引用 • 153 回帖 • 2 关注
  • Sillot

    Insights(注意当前设置 master 为默认分支)

    汐洛彖夲肜矩阵(Sillot T☳Converbenk Matrix),致力于服务智慧新彖乄,具有彖乄驱动、极致优雅、开发者友好的特点。其中汐洛绞架(Sillot-Gibbet)基于自思源笔记(siyuan-note),前身是思源笔记汐洛版(更早是思源笔记汐洛分支),是智慧新录乄终端(多端融合,移动端优先)。

    主仓库地址:Hi-Windom/Sillot

    文档地址:sillot.db.sc.cn

    注意事项:

    1. ⚠️ 汐洛仍在早期开发阶段,尚不稳定
    2. ⚠️ 汐洛并非面向普通用户设计,使用前请了解风险
    3. ⚠️ 汐洛绞架基于思源笔记,开发者尽最大努力与思源笔记保持兼容,但无法实现 100% 兼容
    29 引用 • 25 回帖 • 146 关注
  • Vditor

    Vditor 是一款浏览器端的 Markdown 编辑器,支持所见即所得、即时渲染(类似 Typora)和分屏预览模式。它使用 TypeScript 实现,支持原生 JavaScript、Vue、React 和 Angular。

    381 引用 • 1874 回帖 • 3 关注
  • 小薇

    小薇是一个用 Java 写的 QQ 聊天机器人 Web 服务,可以用于社群互动。

    由于 Smart QQ 从 2019 年 1 月 1 日起停止服务,所以该项目也已经停止维护了!

    35 引用 • 468 回帖 • 758 关注
  • 智能合约

    智能合约(Smart contract)是一种旨在以信息化方式传播、验证或执行合同的计算机协议。智能合约允许在没有第三方的情况下进行可信交易,这些交易可追踪且不可逆转。智能合约概念于 1994 年由 Nick Szabo 首次提出。

    1 引用 • 11 回帖 • 2 关注
  • RYMCU

    RYMCU 致力于打造一个即严谨又活泼、专业又不失有趣,为数百万人服务的开源嵌入式知识学习交流平台。

    4 引用 • 6 回帖 • 65 关注
  • webpack

    webpack 是一个用于前端开发的模块加载器和打包工具,它能把各种资源,例如 JS、CSS(less/sass)、图片等都作为模块来使用和处理。

    42 引用 • 130 回帖 • 252 关注
  • RESTful

    一种软件架构设计风格而不是标准,提供了一组设计原则和约束条件,主要用于客户端和服务器交互类的软件。基于这个风格设计的软件可以更简洁,更有层次,更易于实现缓存等机制。

    30 引用 • 114 回帖 • 7 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3203 引用 • 8217 回帖
  • OnlyOffice
    4 引用 • 26 关注
  • Ngui

    Ngui 是一个 GUI 的排版显示引擎和跨平台的 GUI 应用程序开发框架,基于
    Node.js / OpenGL。目标是在此基础上开发 GUI 应用程序可拥有开发 WEB 应用般简单与速度同时兼顾 Native 应用程序的性能与体验。

    7 引用 • 9 回帖 • 415 关注
  • 链滴

    链滴是一个记录生活的地方。

    记录生活,连接点滴

    192 引用 • 3959 回帖 • 1 关注
  • 支付宝

    支付宝是全球领先的独立第三方支付平台,致力于为广大用户提供安全快速的电子支付/网上支付/安全支付/手机支付体验,及转账收款/水电煤缴费/信用卡还款/AA 收款等生活服务应用。

    29 引用 • 347 回帖
  • Telegram

    Telegram 是一个非盈利性、基于云端的即时消息服务。它提供了支持各大操作系统平台的开源的客户端,也提供了很多强大的 APIs 给开发者创建自己的客户端和机器人。

    5 引用 • 35 回帖 • 2 关注
  • JVM

    JVM(Java Virtual Machine)Java 虚拟机是一个微型操作系统,有自己的硬件构架体系,还有相应的指令系统。能够识别 Java 独特的 .class 文件(字节码),能够将这些文件中的信息读取出来,使得 Java 程序只需要生成 Java 虚拟机上的字节码后就能在不同操作系统平台上进行运行。

    180 引用 • 120 回帖
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    12 引用 • 59 回帖
  • WordPress

    WordPress 是一个使用 PHP 语言开发的博客平台,用户可以在支持 PHP 和 MySQL 数据库的服务器上架设自己的博客。也可以把 WordPress 当作一个内容管理系统(CMS)来使用。WordPress 是一个免费的开源项目,在 GNU 通用公共许可证(GPLv2)下授权发布。

    46 引用 • 114 回帖 • 141 关注
  • OpenResty

    OpenResty 是一个基于 NGINX 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。

    17 引用 • 46 关注
  • 房星科技

    房星网,我们不和没有钱的程序员谈理想,我们要让程序员又有理想又有钱。我们有雄厚的房地产行业线下资源,遍布昆明全城的 100 家门店、四千地产经纪人是我们坚实的后盾。

    6 引用 • 141 回帖 • 618 关注
  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 517 关注
  • Electron

    Electron 基于 Chromium 和 Node.js,让你可以使用 HTML、CSS 和 JavaScript 构建应用。它是一个由 GitHub 及众多贡献者组成的活跃社区共同维护的开源项目,兼容 Mac、Windows 和 Linux,它构建的应用可在这三个操作系统上面运行。

    16 引用 • 143 回帖 • 1 关注
  • 工具

    子曰:“工欲善其事,必先利其器。”

    303 引用 • 772 回帖
  • DNSPod

    DNSPod 建立于 2006 年 3 月份,是一款免费智能 DNS 产品。 DNSPod 可以为同时有电信、网通、教育网服务器的网站提供智能的解析,让电信用户访问电信的服务器,网通的用户访问网通的服务器,教育网的用户访问教育网的服务器,达到互联互通的效果。

    6 引用 • 26 回帖 • 536 关注
  • Solidity

    Solidity 是一种智能合约高级语言,运行在 [以太坊] 虚拟机(EVM)之上。它的语法接近于 JavaScript,是一种面向对象的语言。

    3 引用 • 18 回帖 • 453 关注
  • gRpc
    11 引用 • 9 回帖 • 102 关注
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖
  • Log4j

    Log4j 是 Apache 开源的一款使用广泛的 Java 日志组件。

    20 引用 • 18 回帖 • 52 关注