前言: 到现在这个阶段, 网上关于 RxJava2 源码分析的文章已经满天飞了, 我写这篇文章的目的并不是说我会分析的比他们好, 比他们透彻, 这篇文章的目的只是单纯的记录自己分析 RxJava2 源码的成功及收获.
概述
对于一个编程人的技术成长, 一般会经历三个阶段, 首先是学会使用开源库, 然后是知道开源库的原理, 最后就是自己写一个开源库. 虽然在日常的开发中使用 RxJava2 已经达到了得心应手的地步, 但是不了解具体的原理, 总感觉有点虚. 于是就想静下心来, 好好的分析一下 RxJava 源码, 达到不仅知其然更知其所以然的地步.
下图是分析 RxJava 基本流程后, 画的 UML 图, 对于已经分析过源码的大神, 可以看下图画的是否正确, 对于没有分析过源码的人, 可以看下, 先有个映像, 然后再跟着文章的内容, 一点点的理解.(点击图片查看大图)
源码分析
先看 RxJava2 基础用法的代码
- private void basicUseRxJava() {
- Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
- emitter.onNext(1);
- emitter.onNext(2);
- emitter.onNext(3);
- }
- }).subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onNext(String s) {
- Log.e("wizardev", "onNext:"+s);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
- }
以上代码, 只是 RxJava2 的基本使用, 并没有涉及任何的操作符代码, 下面我们就按方法顺序开始分析源码.
create 方法分析
看下 create() 方法的代码
- public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
- //1, 判空
- ObjectHelper.requireNonNull(source, "source is null");
- //2,
- return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
- }
从以上代码可以看出, create 方法的返回值类型是 Observable, 参数是 ObservableOnSubscribe<T>, 可以先看下这个 ObservableOnSubscribe 类, 源码如下
- public interface ObservableOnSubscribe<T> {
- /**
- * Called for each Observer that subscribes.
- * @param emitter the safe emitter instance, never null
- * @throws Exception on error
- */
- void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
- }
可以发现 ObservableOnSubscribe 类是一个接口, 里面有一个 subscribe 方法. 现在继续看 create 方法中的代码, 在 "1" 处代码是判断传入的参数是否为空. 这里主要看下 "2" 处, 这句 RxJavaPlugins.onAssembly 其实是一个 Hook 方法,**"2" 处代码实质就是 return new ObservableCreate<T>(source);,** 不信的话, 可以看下 onAssembly 方法, 如下
- public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
- Function<? super Observable, ? extends Observable> f = onObservableAssembly;
- if (f != null) {
- return apply(f, source);
- }
- return source;
- }
经调试, onObservableAssembly 为 null, 所以上面的代码就直接返回了 new ObservableCreate<T>(source).
现在看下 ObservableCreate 类, 如下
- public final class ObservableCreate<T> extends Observable<T> {
- //1, 全局变量
- final ObservableOnSubscribe<T> source;
- //2, 构造方法中将 source 赋值
- public ObservableCreate(ObservableOnSubscribe<T> source) {
- this.source = source;
- }
- //3, 这个方法是在调用 subscribe 方法才调用的
- @Override
- protected void subscribeActual(Observer<? super T> observer) {
- CreateEmitter<T> parent = new CreateEmitter<T>(observer);
- observer.onSubscribe(parent);
- try {
- source.subscribe(parent);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
- //... 省略部分代码
- }
从上面的代码可以知道, ObservableCreate 类继承自 Observable, 在实例化的时候将 create 方法中的 ObservableOnSubscribe<T> source 参数注入了进来, 作为成员变量 source.
结论
通过分析 Observable 类的 create 方法, 可以有以下结论:
create 方法的返回值类型是 Observable;
create 方法的参数的类型是接口;
create 方法实际返回的是 ObservableCreate 类, 而 ObservableCreate 类是 Observable 的子类;
在实例化 ObservableCreate 类的时候将 create 的方法的参数注入到了 ObservableCreate 类中, 作为它的成员变量 source.
这里重点看下第 4 个结论, 在这里 create 方法的参数实际就是下面的代码
- new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
- emitter.onNext(1);
- emitter.onNext(2);
- emitter.onNext(3);
- }
- }
subscribe 方法分析
分析完了 create 方法, 接着来分析 subscribe 方法, 其方法代码如下
- public final void subscribe(Observer<? super T> observer) {
- //1, 判空
- ObjectHelper.requireNonNull(observer, "observer is null");
- try {
- //2,Hook 方法, 实质就是 observer
- observer = RxJavaPlugins.onSubscribe(this, observer);
- // 判空
- ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
- //4, 重点,
- subscribeActual(observer);
- } catch (NullPointerException e) { // NOPMD
- throw e;
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
- // can't call onError because no way to know if a Disposable has been set or not
- // can't call onSubscribe because the call might have set a Subscription already
- RxJavaPlugins.onError(e);
- NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
- npe.initCause(e);
- throw npe;
- }
- }
这里重点看下 "4" 处, 这里调用了 Obeservable 的 subscribeActual 方法, 可以看下 Obeservable 类中的这个方法, 如下
protected abstract void subscribeActual(Observer<? super T> observer);
这个方法是抽象的, 实际调用的是它子类中的方法, 通过上文的分析, 我们知道 ObservableCreate 就 Obeservable 类的子类, 所以, 这里调用的实际就是 ObservableCreate 类中的 subscribeActual 方法. 现在, 我们再看下这个方法中的代码, 如下
- @Override
- protected void subscribeActual(Observer<? super T> observer) {
- //1, 实例化 CreateEmitter
- CreateEmitter<T> parent = new CreateEmitter<T>(observer);
- //2, 回调方法
- observer.onSubscribe(parent);
- try {
- //3, 回调方法
- source.subscribe(parent);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
我们一步步的分析这个方法中的代码, 先看 "1" 处的代码, 这里实例化了 CreateEmitter 这个类, 在实例化的同时将 observer 传了进去. 看下 CreateEmitter 这个类的代码, 如下
- static final class CreateEmitter<T>
- extends AtomicReference<Disposable>
- implements ObservableEmitter<T>, Disposable {
- //... 省略部分代码
- final Observer<? super T> observer;
- CreateEmitter(Observer<? super T> observer) {
- this.observer = observer;
- }
- @Override
- public void onNext(T t) {
- if (t == null) {
- onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
- return;
- }
- if (!isDisposed()) {
- observer.onNext(t);
- }
- }
- @Override
- public void onError(Throwable t) {
- if (!tryOnError(t)) {
- RxJavaPlugins.onError(t);
- }
- }
- @Override
- public boolean tryOnError(Throwable t) {
- if (t == null) {
- t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
- }
- if (!isDisposed()) {
- try {
- observer.onError(t);
- } finally {
- dispose();
- }
- return true;
- }
- return false;
- }
- @Override
- public void onComplete() {
- if (!isDisposed()) {
- try {
- observer.onComplete();
- } finally {
- dispose();
- }
- }
- }
- //... 省略部分代码
- }
通过上面的代码, 可以发现 CreateEmitter 这个类实现了 ObservableEmitter 这个接口, 而这个接口是 ObservableOnSubscribe 接口中 subscribe 方法的参数, 是不是发现什么了? 现在继续往下看, 看下 "2" 处的代码, 这里回调了 Observer 的 onSubscribe 方法, 分析到这里, 可以得出下面的结论
onSubscribe() 回调所在的线程是 ObservableCreate 执行 subscribe() 所在的线程, 和 subscribeOn()/observeOn() 无关!
重点来了, 这里看下 "3" 处的代码, 还记得 source 是谁吗?** 它就是执行 Observable.create 方法时, 我们注入给 ObservableCreate 类的成员变量, 是 ObservableOnSubscribe 接口的实例.** 这里调用的 subscribe 方法, 实际就是下面代码的 subscribe 方法,
- public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
- emitter.onNext(1);
- emitter.onNext(2);
- emitter.onNext(3);
- }
这段代码中的 subscribe 方法的参数实质就是 CreateEmitter, 调用的 onNext 方法就是 CreateEmitter 类中的 onNext 方法. 继续看下 CreateEmitter 类中的 onNext 方法, 代码如下
- @Override
- public void onNext(T t) {
- //1, 判断传入的参数是否为 null
- if (t == null) {
- onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
- return;
- }
- if (!isDisposed()) {
- //2, 调用 Observer 中的 onNext 方法
- observer.onNext(t);
- }
- }
分析到这里, 就可以得出以下结论了
subscribe 方法中发射器所调用的 onNext 方法, 如果代码没有出错的话, 最终调用的就是 Observer 中的 onNext 方法.
分析 CreateEmitter 中的其他方法, 还可以知道为什么 Observer 中的 onError 和 onComplete 方法只有一个会回调的原因了, 原因就是无论调用的是哪一个方法都会调用 dispose() 方法取消订阅.
结论
对 Observable.subscribe 方法的分析可以得出以下结论
subscribe 方法最终调用了 ObservableCreate 类中的 subscribeActual 方法.
subscribeActual 方法中, 实例化了发射器, 并开始发射数据.
subscribe 方法中发射器所调用的 onNext 方法, 如果代码没有出错的话, 最终调用的就是 Observer 接口中的 onNext 方法.
总结
通过对 RxJava 基本流程的源码分析, 是不是对 RxJava 的原理有了更清晰的认识呢? 分析完之后, 我们再看下这张图, 是不是感觉现在看起来就明白多了呢?
结束语
想要了解一些开源库的原理, 我们必须要阅读其源码, 只有从源码中才能得到想要的答案, 才能对库的原理有更清晰的认识.
再说下, 阅读开源库的注意事项, 阅读源码时, 我们最好带着问题来阅读, 阅读前先有个目标, 比如我这次阅读要搞懂什么问题, 然后再开始阅读, 不然就会很容易在茫茫代码中迷失. 还有就是不要想着每句代码都搞懂, 搞懂与自己想要获取的答案有关的代码即可.
来源: https://juejin.im/post/5c3185f5e51d45518d46a448