给初学者的 RxJava2.0 教程

本贴最后更新于 2894 天前,其中的信息可能已经沧海桑田

可能很多看过其他人写的文章的朋友都会觉得只有 Flowable 才能解决 Backpressure , 所以大家对这个 Flowable 都抱有很大的期许 , 其实呐 , 你们毕竟图样图森破 , 今天我们先抛开 Flowable, 仅仅依靠我们自己的 双手和智慧 , 来看看我们如何去治理 BackPressure , 通过本节的学习之后我们再来看 Flowable, 你会发现它其实并没有想象中那么牛叉, 它只是被其他人过度神化了.
看代码

Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                for (int i = 0; ; i++) {  //无限循环发送事件
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });
Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .filter(new Predicate() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer % 10 == 0;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

在这段代码中我们增加了一个 filter, 只允许能被 10 整除的事件通过, 再来看看运行结果:

filter.gif

可以看到, 虽然内存依然在增长, 但是增长速度相比之前, 已经减少了太多了, 至少在我录完 GIF 之前还没有爆掉内存, 大家可以试着改成能被 100 整除试试.

可以看到, 通过减少进入水缸的事件数量的确可以缓解 BackPressure, 但是力度还不够, 我们再来看一段代码:

Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .sample(2, TimeUnit.SECONDS)  //sample取样
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

这里用了一个 sample 操作符, 简单做个介绍, 这个操作符每隔指定的时间就从上游中取出一个事件发送给下游. 这里我们让它每隔 2 秒取一个事件给下游, 来看看这次的运行结果吧:

sample.gif

这次我们可以看到, 虽然上游仍然一直在不停的发事件, 但是我们只是 每隔一定时间 取一个放进水缸里, 并没有全部放进水缸里, 因此这次内存仅仅只占用了 5M.

大家以后可以出去吹牛逼了: 我曾经通过技术手段去优化一个程序, 最终使得内存占用从 300 多 M 变成不到 5M. ~(≧▽≦)/~啦啦啦

前面这两种方法归根到底其实就是减少放进水缸的事件的数量, 是以 数量 取胜, 但是这个方法有个 缺点, 就是 丢失了大部分的事件.

那么我们换一个角度来思考, 既然上游发送事件的速度太快, 那我们就适当减慢发送事件的速度, 从 速度 上取胜, 听上去不错, 我们来试试:

Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //每次发送完事件延时2秒
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

这次我们让上游每次发送完事件后都延时了 2 秒, 来看看运行结果:

sleep.gif

完美 ! 一切都是那么完美 !

可以看到, 我们给上游加上延时了之后, 瞬间一头发情的公牛就变得跟只小绵羊一样, 如此温顺, 如此平静, 如此平稳的内存线, 美妙极了. 而且 事件也没有丢失, 上游 通过适当的 延时, 不但 减缓了 事件进入水缸的 速度, 也可以让 下游充足的时间 从水缸里取出事件来处理 , 这样一来, 就不至于导致大量的事件涌进水缸, 也就不会 OOM 啦.

到目前为止, 我们没有依靠任何其他的工具, 就轻易解决了 Backpressure 的问题.

因此我们总结一下, 治理 Backpressure 的办法就两种:

  • 一是从数量上进行治理, 减少发送进水缸里的事件
  • 二是从速度上进行治理, 减缓事件发送进水缸的速度

大家一定没忘记, 在上一节还有个 Zip 的例子, 这个例子也爆了我们的内存, 现学现用, 我们用刚学到的办法来试试能不能惩奸除恶, 先来看看第一种办法.

先来减少进入水缸的事件的数量:

Observable observable1 = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).sample(2, TimeUnit.SECONDS); //进行sample采样

        Observable observable2 = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });

来试试运行结果吧:

zip_sample.gif

哈哈, 成功了吧, 再来用第二种办法试试.

这次我们来减缓速度:

Observable observable1 = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //发送事件之后延时2秒
                }
            }
        }).subscribeOn(Schedulers.io());

        Observable observable2 = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });

来看看运行结果吧:

zip_sleep.gif

果然也成功了, 这里只打印出了下游收到的事件, 所以只有一个. 如果你对这个结果看不懂, 请自觉掉头看前面几篇文章.

通过本节的学习, 大家应该对如何处理 Backpressure 已经有了基本的认识了, 大家也可以看到, 我们并没有使用 Flowable, 就轻易的解决了背压的问题, 所以很多时候仔细去分析问题, 找到问题的原因, 从源头去解决才是最根本的办法. 后面我们讲到 Flowable 的时候, 大家就会发现它其实没什么神秘的, 它用到的办法和我们本节所讲的基本上是一样的, 只是它稍微做了点封装.

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
cyd
愿你在平凡琐碎的生活里找到意义 深圳

推荐标签 标签

  • FFmpeg

    FFmpeg 是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。

    23 引用 • 32 回帖
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    944 引用 • 1459 回帖 • 18 关注
  • PHP

    PHP(Hypertext Preprocessor)是一种开源脚本语言。语法吸收了 C 语言、 Java 和 Perl 的特点,主要适用于 Web 开发领域,据说是世界上最好的编程语言。

    179 引用 • 407 回帖 • 487 关注
  • Flutter

    Flutter 是谷歌的移动 UI 框架,可以快速在 iOS 和 Android 上构建高质量的原生用户界面。 Flutter 可以与现有的代码一起工作,它正在被越来越多的开发者和组织使用,并且 Flutter 是完全免费、开源的。

    39 引用 • 92 回帖
  • InfluxDB

    InfluxDB 是一个开源的没有外部依赖的时间序列数据库。适用于记录度量,事件及实时分析。

    2 引用 • 72 关注
  • iOS

    iOS 是由苹果公司开发的移动操作系统,最早于 2007 年 1 月 9 日的 Macworld 大会上公布这个系统,最初是设计给 iPhone 使用的,后来陆续套用到 iPod touch、iPad 以及 Apple TV 等产品上。iOS 与苹果的 Mac OS X 操作系统一样,属于类 Unix 的商业操作系统。

    85 引用 • 139 回帖
  • ReactiveX

    ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的 API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。

    1 引用 • 2 回帖 • 156 关注
  • Maven

    Maven 是基于项目对象模型(POM)、通过一小段描述信息来管理项目的构建、报告和文档的软件项目管理工具。

    186 引用 • 318 回帖 • 302 关注
  • Gitea

    Gitea 是一个开源社区驱动的轻量级代码托管解决方案,后端采用 Go 编写,采用 MIT 许可证。

    4 引用 • 16 回帖 • 1 关注
  • SEO

    发布对别人有帮助的原创内容是最好的 SEO 方式。

    35 引用 • 200 回帖 • 22 关注
  • JVM

    JVM(Java Virtual Machine)Java 虚拟机是一个微型操作系统,有自己的硬件构架体系,还有相应的指令系统。能够识别 Java 独特的 .class 文件(字节码),能够将这些文件中的信息读取出来,使得 Java 程序只需要生成 Java 虚拟机上的字节码后就能在不同操作系统平台上进行运行。

    180 引用 • 120 回帖
  • Sublime

    Sublime Text 是一款可以用来写代码、写文章的文本编辑器。支持代码高亮、自动完成,还支持通过插件进行扩展。

    10 引用 • 5 回帖
  • OpenStack

    OpenStack 是一个云操作系统,通过数据中心可控制大型的计算、存储、网络等资源池。所有的管理通过前端界面管理员就可以完成,同样也可以通过 Web 接口让最终用户部署资源。

    10 引用 • 3 关注
  • frp

    frp 是一个可用于内网穿透的高性能的反向代理应用,支持 TCP、UDP、 HTTP 和 HTTPS 协议。

    20 引用 • 7 回帖
  • V2Ray
    1 引用 • 15 回帖
  • Logseq

    Logseq 是一个隐私优先、开源的知识库工具。

    Logseq is a joyful, open-source outliner that works on top of local plain-text Markdown and Org-mode files. Use it to write, organize and share your thoughts, keep your to-do list, and build your own digital garden.

    6 引用 • 63 回帖
  • 锤子科技

    锤子科技(Smartisan)成立于 2012 年 5 月,是一家制造移动互联网终端设备的公司,公司的使命是用完美主义的工匠精神,打造用户体验一流的数码消费类产品(智能手机为主),改善人们的生活质量。

    4 引用 • 31 回帖 • 4 关注
  • PostgreSQL

    PostgreSQL 是一款功能强大的企业级数据库系统,在 BSD 开源许可证下发布。

    22 引用 • 22 回帖
  • 生活

    生活是指人类生存过程中的各项活动的总和,范畴较广,一般指为幸福的意义而存在。生活实际上是对人生的一种诠释。生活包括人类在社会中与自己息息相关的日常活动和心理影射。

    230 引用 • 1454 回帖
  • Pipe

    Pipe 是一款小而美的开源博客平台。Pipe 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    132 引用 • 1114 回帖 • 124 关注
  • Solidity

    Solidity 是一种智能合约高级语言,运行在 [以太坊] 虚拟机(EVM)之上。它的语法接近于 JavaScript,是一种面向对象的语言。

    3 引用 • 18 回帖 • 398 关注
  • SSL

    SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS 与 SSL 在传输层对网络连接进行加密。

    70 引用 • 193 回帖 • 431 关注
  • 域名

    域名(Domain Name),简称域名、网域,是由一串用点分隔的名字组成的 Internet 上某一台计算机或计算机组的名称,用于在数据传输时标识计算机的电子方位(有时也指地理位置)。

    43 引用 • 208 回帖 • 1 关注
  • 负能量

    上帝为你关上了一扇门,然后就去睡觉了....努力不一定能成功,但不努力一定很轻松 (° ー °〃)

    88 引用 • 1235 回帖 • 411 关注
  • Facebook

    Facebook 是一个联系朋友的社交工具。大家可以通过它和朋友、同事、同学以及周围的人保持互动交流,分享无限上传的图片,发布链接和视频,更可以增进对朋友的了解。

    4 引用 • 15 回帖 • 455 关注
  • 创业

    你比 99% 的人都优秀么?

    84 引用 • 1399 回帖 • 1 关注
  • Gzip

    gzip (GNU zip)是 GNU 自由软件的文件压缩程序。我们在 Linux 中经常会用到后缀为 .gz 的文件,它们就是 Gzip 格式的。现今已经成为互联网上使用非常普遍的一种数据压缩格式,或者说一种文件格式。

    9 引用 • 12 回帖 • 135 关注