前言
Rxjava
RxAndroid
本篇博客讲解的 Rxjava 的原理基于版本 2.1.4,RxAndroid 的原理的版本基于 2.0.2 .
基本框架
Rxjava 有四个基本的概念
- Observable (可观察者, 即被观察者)
- Observer (观察者)
subscribe (订阅) 通过该方法, 将 Observable 与 Observer 关联起来
事件 (包括 onNext,onComplete,onError 等事件)
简单来说: Observable 和 Observer 通过 subscribe() 方法实现订阅关系, 从而 Observable 可以在需要的时候发出事件来通知 Observer, 并且回调 Observer 的相应的方法.
用一张简单的图来描述大概如下
该图片来源于 给 Android 开发者的 RxJava 详解
- Observable
- public abstract class Observable<T> implements ObservableSource<T> {
- }
可以看到 Observable 是一个抽象类, 实现了 ObservableSource 接口
Observer
Observer 其实也是一个接口, 里面定义了若干方法, onSubscribe ,onNext,onError,onComplete 方法.
- public interface Observer<T> {
- void onSubscribe(@NonNull Disposable d);
- void onNext(@NonNull T t);
- void onError(@NonNull Throwable e);
- void onComplete();
- }
一个正常的事件序列的调用顺序会是这样的 onSubscribe> onNext> onComplete, 若中途出错了, 那调用顺序可能是这样的 onSubscribe> onNext> onError
onSubscribe 方法, 当我们调用 Observable 的 subscribe 方法的时候, 会先回调 Observer 的 onSubscribe 方法, 此方法的调用顺序先于 onNext,onError ,onComplete 方法.
onError 方法与 onComplete 方法可以说是互斥的, 调用了其中一个方法就不会调用另外一个方法
源码解析
基本使用
在讲解原理之前, 我们先来看一下 Rxjava 的一个基本使用.
- Observable
- .create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> emitter) throws Exception {
- emitter.onNext("a");
- emitter.onNext("b");
- emitter.onNext("c");
- emitter.onComplete();
- }
- })
- .subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e("TAG", "onSubscribe():");
- }
- @Override
- public void onNext(String s) {
- Log.e("TAG", "onNext():" + s);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- Log.e("TAG", "onComplete():");
- }
- });
- E/TAG: onSubscribe():
- E/TAG: onNext(): a
- E/TAG: onNext(): b
- E/TAG: onNext(): c
- E/TAG: onComplete():
首先我们先从上面简单的例子回顾起:
先来看 Observable 的 create 方法
- public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
- ObjectHelper.requireNonNull(source, "source is null");
- return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
- }
在 create 方法中, 其实很简单, 只是对 source 进行判空处理, 并将 source 用 ObservableCreate 包装起来, 并返回回去. 下面让我们一起来看一下 ObservableCreate 是什么东西?
- public final class ObservableCreate<T> extends Observable<T> {
- final ObservableOnSubscribe<T> source;
- public ObservableCreate(ObservableOnSubscribe<T> source) {
- this.source = source;
- }
- @Override
- protected void subscribeActual(Observer<? super T> observer) {
- CreateEmitter<T> parent = new CreateEmitter<T>(observer);
- observer.onSubscribe(parent);
- try {
- source.subscribe(parent);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
ObservableCreate 其实也很简单, 它是 Observable 的子类, 持有了上游 source 的引用, 并重写 subscribeActual 方法.
接下来我们来看重点了, 即 Observable 的 subscribe 方法, 在该方法中, 他会将 Observalble 与 observer 关联起来.
- @SchedulerSupport(SchedulerSupport.NONE)
- @Override
- public final void subscribe(Observer<? super T> observer) {
- // 检查 observer 是否为 null, 为 null 抛出异常
- ObjectHelper.requireNonNull(observer, "observer is null");
- try {
- // RxJavaPlugins 插件的, 暂时不管
- observer = RxJavaPlugins.onSubscribe(this, observer);
- // 检查 observer 是否为 null, 为 null 抛出异常
- ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
- subscribeActual(observer);
- } catch (NullPointerException e) { // NOPMD
- throw e;
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
- // can't call onError because no way to know if a Disposable has been set or not
- // can't call onSubscribe because the call might have set a Subscription already
- RxJavaPlugins.onError(e);
- NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
- npe.initCause(e);
- throw npe;
- }
- }
subscribe 方法也比较简单, 大概可以分为以下两步:
首先检查 observer 是否为空, 为 null 抛出异常
第二步, 调用 subscribeActual 方法, 而我们知道在 Observable 类中 subscribeActual 是抽象方法, 因此, 我们只需要关注其实现类的 subscribeActual 方法. 从上面的分析, 我们知道, 当我们调用 Observable<T> create(ObservableOnSubscribe<T> source) 方法的时候, 最终会返回 ObservableCreate 实例. 因此, 我们只需要关注 ObservableCreate 的 subscribeActual 方法
- public final class ObservableCreate<T> extends Observable<T> {
- final ObservableOnSubscribe<T> source;
- public ObservableCreate(ObservableOnSubscribe<T> source) {
- this.source = source;
- }
- @Override
- protected void subscribeActual(Observer<? super T> observer) {
- CreateEmitter<T> parent = new CreateEmitter<T>(observer);
- observer.onSubscribe(parent);
- try {
- source.subscribe(parent);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
- ----
- }
ObservableCreate 的核心代码主要也只有几行, source 是上游 ObservableOnSubscribe 的引用, 而 CreateEmitter 这个类, 它是 ObservableCreate 的一个静态内部类, 实现了 ObservableEmitter,Disposable 接口 它持有 observer 的引用, 当我们调用 CreateEmitter 的 next 方法的时候, 它会判断当前的 CreateEmitter 有没有被 dispose 掉, 如果没有, 调用他持有的 observer 的 onNext 方法, 同理 onComplete 方法一一样, 只不过执行完 onComplete 方法的时候, 还会执行 dispose 方法, dispose 当前的 CreateEmitter.(dispose 方法这里先记住以下, 下面会讲到)
- static final class CreateEmitter<T>
- extends AtomicReference<Disposable>
- implements ObservableEmitter<T>, Disposable {
- private static final long serialVersionUID = -3434801548987643227L;
- final Observer<? super T> observer;
- CreateEmitter(Observer<? super T> observer) {
- this.observer = observer;
- }
- @Override
- public void onNext(T t) {
- if (t == null) {
- onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
- return;
- }
- if (!isDisposed()) {
- observer.onNext(t);
- }
- }
- @Override
- public void onError(Throwable t) {
- if (!tryOnError(t)) {
- RxJavaPlugins.onError(t);
- }
- }
- @Override
- public boolean tryOnError(Throwable t) {
- if (t == null) {
- t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
- }
- if (!isDisposed()) {
- try {
- observer.onError(t);
- } finally {
- dispose();
- }
- return true;
- }
- return false;
- }
- @Override
- public void onComplete() {
- if (!isDisposed()) {
- try {
- observer.onComplete();
- } finally {
- dispose();
- }
- }
- }
- @Override
- public void setDisposable(Disposable d) {
- DisposableHelper.set(this, d);
- }
- @Override
- public void setCancellable(Cancellable c) {
- setDisposable(new CancellableDisposable(c));
- }
- @Override
- public ObservableEmitter<T> serialize() {
- return new SerializedEmitter<T>(this);
- }
- @Override
- public void dispose() {
- DisposableHelper.dispose(this);
- }
- @Override
- public boolean isDisposed() {
- return DisposableHelper.isDisposed(get());
- }
- }
好, 看完上面的代码, 我们回到 ObservableCreate 的 subscribeActual 方法, 我们调用 observer.onSubscribe 方法的时候, 会将 parent 对象作为方法参数暴露出去 (而这个 parent 正是我们的 CreateEmitter, 通过 CreateEmitter 的 dispose 方法可以取消订阅关系). 接着, 当我们调用 source.subscribe(parent) 的时候, 会调用 ObservableOnSubscribe 的 subscribe 方法.
- CreateEmitter<T> parent = new CreateEmitter<T>(observer);
- observer.onSubscribe(parent);
- try {
- source.subscribe(parent);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
因此, 在我们上面的例子中, 若不出错, 调用顺序
Observable subcrible> Observable subscribeActual> ObservableCreate subscribeActual> observer.onSubscribe> ObservableOnSubscribe subscribe(emitter 是 CreateEmitter 的实例, 包装了 observer, 调用 emitter 的相应方法 , 会进而调用 observer 的 onNext onComplete 方法, 而不会调用 onError 方法)
若在调用 onNext 方法的过程中出错, 那调用顺序可能是这样的
- Observable subcrible> Observable subscribeActual> ObservableCreate subscribeActual> observer.onSubscribe> ObservableOnSubscribe subscribe(@NonNull ObservableEmitter<T> emitter)
- (emitter 是 CreateEmitter 的实例, 包装了 observer, 调用 emitter 的相应方法 , 会进而调用 observer 的 onNext onError 方法, 而不会调用 onComplete 方法 )
observable 与 Observer 是如何取消订阅关系的
在上面讲解的时候, 其实我们已经有提到 CreateEmitter 的 dispose 方法, 该方法就是用来取消订阅关系的.
假设这样一个场景, 当我们收到的 value 的值大于等于 2 的时候, 这个时候认为是异常的, 解决两者之间的订阅关系
- Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter<Integer> e) throws Exception {
- e.onNext(1);
- e.onNext(2);
- e.onNext(3);
- e.onNext(4);
- e.onComplete();
- }
- });
- Observer<Integer> observer = new Observer<Integer>() {
- private Disposable disposable;
- @Override
- public void onSubscribe(Disposable d) {
- disposable = d;
- }
- @Override
- public void onNext(Integer value) {
- Log.d("xujun", value.toString());
- if (value>=2) { //>=2 时为异常数据, 解除订阅
- disposable.dispose();
- }
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- };
- observable.subscribe(observer); // 建立订阅关系
- D/xujun: 1 2
总结
Rxjava 的原理其实不难, Observable 和 Observer 通过 subscribe() 方法实现订阅关系, 从而 Observable 可以在需要的时候发出事件来通知 Observer, 并且回调 Observer 的相应的方法.
用一张简单的流程图描述如下:
来源: http://www.jianshu.com/p/448715e333eb