上一篇文章 Android 进阶: 四, RxJava2 https://blog.51cto.com/14295695/2384637 里我们讲到 Rxjava2 从创建一个事件到事件被观察的过程原理, 这篇文章我们讲 Rxjava2 中链式调用的原理. 本文不讲用法, 仍然需要读者熟悉 Rxjava 基本的用法.
一. Rxjava2 的基本用法
Rxjava 是解决异步问题的, 它的链式调用让代码看起来非常流畅优雅. 现在我们带上线程切换以及链式调用来看看. 下面代码是示例:
- Observable
- .create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- e.onNext("a");
- }
- })
- .subscribeOn(Schedulers.io())
- .unsubscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .map(new Function<String, Integer>() {
- @Override
- public Integer apply(String s) throws Exception {
- return 1;
- }
- })
- .subscribe(new Observer<Object>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onNext(Object o) {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
我们创建一个事件 (观察者), 想输出一个字符串 "a". 这个事件发生在 IO 线程, 结束也在 IO 线程, 事件的状态回调发生在主线程. 示例的用法大家应该都能懂, 我们主要讨论这个链式的原理流程. 为什么这么说呢? 因为这个链式跟一般的链式不太一样
二. create 方法
这个方法我们之前看过, 返回一个 ObservableCreate 对象, ObservableCreate 继承自 Observable, 里面的 source 存着我们创建的 ObservableOnSubscribe 匿名对象.
三. subscribeOn 方法
这是 Obserbvable 的方法, 先看源码:
- public final Observable<T> subscribeOn(Scheduler scheduler) {
- ObjectHelper.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
- }
- public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
- Function<? super Observable, ? extends Observable> f = onObservableAssembly;
- if (f != null) {
- return apply(f, source);
- }
- return source;
- }
代码结构跟 create 的差不多, 在钩子函数里直接返回我们创建的对象 ObservableSubscribeOn<T>(this, scheduler), 并传入当前的 Observable 也就是 ObservableCreate 对象. 所以我们看一下这个类的代码:
- public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
- final Scheduler scheduler;
- public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
- super(source);
- this.scheduler = scheduler;
- }
- }
这个类继承自 AbstractObservableWithUpstream 类, 构造函数的参数是 ObservableSource, 所以这里我们需要介绍两个类:
ObservableSource
ObservableSource 是一个接口, 所有的 Observable 都实现了这个接口, 它里面只有:
void subscribe(@NonNull Observer<? super T> observer);
这一个方法. 很明显这个方法是为了让 Observer 订阅 Observable 的, 或者说为了 Observable 把事件状态传递给 Observer 的.
AbstractObservableWithUpstream
这个类继承了 Observbable
- abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {
- protected final ObservableSource<T> source;
- AbstractObservableWithUpstream(ObservableSource<T> source) {
- this.source = source;
- }
- }
从源码可以看出这个类有变量 source, 它在构造函数里传入值, 存储 ObservableSource 对象.
所以当我们调用 Observable 的 subscribeOn 方法的时候会创建一个 ObservableSubscribeOn 对象, 并用变量 source 存储当前的 Observable 对象, 然后返回 ObservableSubscribeOn 对象.
四. unsubscribeOn 方法
- public final Observable<T> unsubscribeOn(Scheduler scheduler) {
- ObjectHelper.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new ObservableUnsubscribeOn<T>(this, scheduler));
- }
- public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
- Function<? super Observable, ? extends Observable> f = onObservableAssembly;
- if (f != null) {
- return apply(f, source);
- }
- return source;
- }
这个方法跟上面的方法是一个模子刻的. 所以我们主要看 ObservableUnsubscribeOn 这个类就好.
- public final class ObservableUnsubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
- final Scheduler scheduler;
- public ObservableUnsubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
- super(source);
- this.scheduler = scheduler;
- }
- }
这个类跟刚才的 ObservableSubscribeOn 也几乎一模一样, 继承自 AbstractObservableWithUpstream 类, 使用 source 存了当前 Observable 对象. 而此时的 Observbvable 对象是上一个方法创建的对象, 也就是 ObservableSubscribeOn 对象.
五. observeOn 方法和 map 方法
由于这些方法的内容基本一样我就省略代码的解释.
observeOn 方法是创建了 ObservableObserveOn 对象, 并保存上一个方法创建的 Observable.map 方法是创建 ObservableMap 对象, 并保存上一个方法创建的 Observable
所以总结一下可知: 链式调用这些方法的时候, 都会创建一个相关的对象, 然后用变量 source 存储上一个方法创建的 Observable 子类对象.
六. subscribe 方法
上次文章讲到, 这个方法内部会调用一个抽象方法, subscribeActual 方法, 作为真实的订阅. 而这个方法的逻辑需要看子类如何实现.
而第一次调用该这个 subscribe 方法的对象是 ObservableMap 对象. 所以我们看看它内部如何实现的.
ObservableMap 的 subscribeActual 方法实现:
- public void subscribeActual(Observer<? super U> t) {
- source.subscribe(new MapObserver<T, U>(t, function));
- }
内部调用了 source 的 subscribe 方法. 此时 ObservableMap 对象里存的 source 是上一个方法创建的 observable, 也就是 ObservableObserveOn 对象. 所以我们要看看 ObservableObserveOn 是如何实现 subscribeActual 方法的:
- protected void subscribeActual(Observer<? super T> observer) {
- if (scheduler instanceof TrampolineScheduler) {
- source.subscribe(observer);
- } else {
- Scheduler.Worker w = scheduler.createWorker();
- source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
- }
- }
同理他最终也是调用了上一个 Observable 的 subscribe.
于是我们知道当我们调用 subscribe 方法的时候, 会递归式的调用 source 存储的上一个方法创建的 Observable 的 subscribeActual 方法, 一直到 ObsservableCreate 的 subscribeActual 的方法, 把事件状态传递给观察者. 这个上一篇文章已经讲过.
七. 总结
我们常见的普通的链式调用一般都会返回当前同一个对象. 和普通的链式调用不同当我们调用 Rxjava2 的链式调用时, 他们会返回自己对应的 Observable 子类对象, 每个对象都不一样, 然后在 subscribeActual 方法中递归式的调用每个对象的 subscribeActual 方法, 完成一个链式的调用.
写在最后
很多人在刚接触这个行业的时候或者是在遇到瓶颈期的时候, 总会遇到一些问题, 比如学了一段时间感觉没有方向感, 不知道该从那里入手去学习, 对此我整理了一些资料, 需要的可以免费分享给大家, 后面也会整理比较新比较火的技术分享的 flutter - 性能优化 - 移动架构 - 资深 UI 工程师 -NDK
来源: http://www.bubuko.com/infodetail-3035530.html