RxAndroid 其实就是对 Android 的 handler, looper 及 Message 的封装,使替变为基于观察者模式的调用。理解其源码并不困难,关键在于要先弄清 Andriod 中 handler,looper 及 Message 的关系,才能理清 RxAndroid2.x 的源码。这三者的关系网上的资料一大堆,我就不重复了。
由于 Android UI 的操作是单线程且非线程安全的,因此不可以把耗时的操作放入主线程即 UI 线程中去,否则会引发 ANR 异常,Android 提供了 AsyncTask,及 handler 机制来进行线程切换,RxAndroid 对 handler,looper,Message 进行了封装提供了另外一种思路
在分析源码之前还是先看下如何使用 RxAndroid2.x,2.x 和 1.x 试用方式基本上没啥变化,直接拿官方示例:
public class MainActivity extends Activity {
private static final String TAG = "RxAndroidSamples";
//一个存放事件源或被观察者的容器
private final CompositeDisposable disposables = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.main_activity);
findViewById(R.id.button_run_scheduler).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
onRunSchedulerExampleButtonClicked();
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear();
}
void onRunSchedulerExampleButtonClicked() {
disposables.add(sampleObservable()
// Run on a background thread
//耗时的操作放在io线程中
.subscribeOn(Schedulers.io())
// Be notified on the main thread
//回调操作放在主线程 .observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver() {
@Override
public void onComplete() {
Log.d(TAG, "onComplete()");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError()", e);
}
@Override
public void onNext(String string) {
Log.d(TAG, "onNext(" + string + ")");
}
}));
}
//事件源,即被观察者
static Observable sampleObservable() {
return Observable.defer(new Callableextends String>>() {
@Override
public ObservableSourceextends String> call() throws Exception {
// Do some long running operation
SystemClock.sleep(5000);
return Observable.just("one", "two", "three", "four", "five");
}
});
}
}
界面布局我就不放了,就一个布局文件,而且官方实例当中都有。
核心代码就两行
// Run on a background thread
//耗时的操作放在io线程中
.subscribeOn(Schedulers.io())
// Be notified on the main thread
//回调操作放在主线程
.observeOn(AndroidSchedulers.mainThread())
怎么样,和原来的代码书写方式比起来是不是简单了很多,只用两行代码!
直接看 RxAndroid2.x 的源码目录结构:
可以看到类不多只有四个类
RxAndroidPlugins,AndroidSchedulers,HandlerScheduler,MainThreadDisposable
这里根据流程来跟踪下源代码,
先看 AndroidSchedulers 中的 mainThread()
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
可以看到其返回一个 Scheduler,其中 MAIN_THREAD 为主线程的调度器,
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
内部调用了 RxAndroidPlugins 的 onMainThreadScheduler 方法,其代码如下
public static Scheduler onMainThreadScheduler(Scheduler scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
Function<Scheduler, Scheduler> f = onMainThreadHandler;
if (f == null) {
return scheduler;
}
return apply(f, scheduler);
}
可以看到 onMainThreadHandler 是一个 Fuction 类型,并对齐应用了主线程的调度器,返回给 RxJava 调度
static <T, R> R apply(Function<T, R> f, T t) {
try {
return f.apply(t);
} catch (Throwable ex) {
throw Exceptions.propagate(ex);
}
}
这样就完成的线程的切换,是不是很简单!
HandlerScheduler 提供了在其它线程中刷新 ui 的方法,具体调用看 AndroidSchedulers 的 from(Looper looper)方法,具体操作和 mainThread 方法类似
MainThreadDisposable 提供了资源释放的方法及对主线程的检验方法,也比较简单
总结:其使用核心代码
//耗时的操作放在io线程中
.subscribeOn(Schedulers.io())
// Be notified on the main thread
//回调操作放在主线程
.observeOn(AndroidSchedulers.mainThread())
仅需两行代码即可完成线程的切换
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于