RxAndroid 其实就是对 Android 的 handler, looper 及 Message 的封装,使替变为基于观察者模式的调用。理解其源码并不困难,关键在于要先弄清 Andriod 中 handler,looper 及 Message 的关系,才能理清 RxAndroid2.x 的源码。这三者的关系网上的资料一大堆,我就不重复了。
由于 Android UI 的操作是单线程且非线程安全的,因此不可以把耗时的操作放入主线程即 UI 线程中去,否则会引发 ANR 异常,Android 提供了 AsyncTask,及 handler 机制来进行线程切换,RxAndroid 对 handler,looper,Message 进行了封装提供了另外一种思路
在分析源码之前还是先看下如何使用 RxAndroid2.x,2.x 和 1.x 试用方式基本上没啥变化,直接拿官方示例:
public class MainActivity extends Activity { private static final String TAG = "RxAndroidSamples"; //一个存放事件源或被观察者的容器 private final CompositeDisposable disposables = new CompositeDisposable(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.main_activity); findViewById(R.id.button_run_scheduler).setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { onRunSchedulerExampleButtonClicked(); } }); } @Override protected void onDestroy() { super.onDestroy(); disposables.clear(); } void onRunSchedulerExampleButtonClicked() { disposables.add(sampleObservable() // Run on a background thread //耗时的操作放在io线程中 .subscribeOn(Schedulers.io()) // Be notified on the main thread //回调操作放在主线程 .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver() { @Override public void onComplete() { Log.d(TAG, "onComplete()"); } @Override public void onError(Throwable e) { Log.e(TAG, "onError()", e); } @Override public void onNext(String string) { Log.d(TAG, "onNext(" + string + ")"); } })); } //事件源,即被观察者 static Observable sampleObservable() { return Observable.defer(new Callableextends String>>() { @Override public ObservableSourceextends String> call() throws Exception { // Do some long running operation SystemClock.sleep(5000); return Observable.just("one", "two", "three", "four", "five"); } }); } }
界面布局我就不放了,就一个布局文件,而且官方实例当中都有。
核心代码就两行
// Run on a background thread //耗时的操作放在io线程中 .subscribeOn(Schedulers.io()) // Be notified on the main thread //回调操作放在主线程 .observeOn(AndroidSchedulers.mainThread())
怎么样,和原来的代码书写方式比起来是不是简单了很多,只用两行代码!
直接看 RxAndroid2.x 的源码目录结构:
可以看到类不多只有四个类
RxAndroidPlugins,AndroidSchedulers,HandlerScheduler,MainThreadDisposable
这里根据流程来跟踪下源代码,
先看 AndroidSchedulers 中的 mainThread()
public static Scheduler mainThread() { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); }
可以看到其返回一个 Scheduler,其中 MAIN_THREAD 为主线程的调度器,
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
内部调用了 RxAndroidPlugins 的 onMainThreadScheduler 方法,其代码如下
public static Scheduler onMainThreadScheduler(Scheduler scheduler) { if (scheduler == null) { throw new NullPointerException("scheduler == null"); } Function<Scheduler, Scheduler> f = onMainThreadHandler; if (f == null) { return scheduler; } return apply(f, scheduler); }
可以看到 onMainThreadHandler 是一个 Fuction 类型,并对齐应用了主线程的调度器,返回给 RxJava 调度
static <T, R> R apply(Function<T, R> f, T t) { try { return f.apply(t); } catch (Throwable ex) { throw Exceptions.propagate(ex); } }
这样就完成的线程的切换,是不是很简单!
HandlerScheduler 提供了在其它线程中刷新 ui 的方法,具体调用看 AndroidSchedulers 的 from(Looper looper)方法,具体操作和 mainThread 方法类似
MainThreadDisposable 提供了资源释放的方法及对主线程的检验方法,也比较简单
总结:其使用核心代码
//耗时的操作放在io线程中 .subscribeOn(Schedulers.io()) // Be notified on the main thread //回调操作放在主线程 .observeOn(AndroidSchedulers.mainThread())
仅需两行代码即可完成线程的切换
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于