Rxjava 的诞生背景
首先要从异步编程说起, 最开始的原生的系统中, 如果 UI 系统处理耗时任务, 会引发 ANR, 所以都是放在子线程做耗时任务, 比如网络请求或者 IO 操作, 再来更新 UI 界面, 这需要在主线程来完成, 这样就涉及到了异步编程.
最开始的异步编程主要有:
使用 Java 自身提供的 Future 模型
但这种异步结果获取比较困难, 必须调用 Future.get(), 回去查看异步是否完成, 如果完成, 就返回结果, 否则继续等待. 当然在 JDK8 后, 提供了 completabelFuture, 简化了异步编程
Android 系统提供的异步模型 --AsyncTask. 相比于 Java 提供的方法, 此模型无主线程阻塞风险, 但是最大的问题是有可能陷入层层嵌套的回调.
Rxjava 源码中链式调用
多说也无益, 先看源码.
分析问题时, 我们可以从特殊到普通来分析, 有时候会有意想不到的效果, 所以这次源码由 Single 开始分析, 我们最简单的用法:
先在 App 的 gradle 中
- implementation "io.reactivex.rxjava2:rxjava:2.2.9"
- implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
最简单的实现
- Single.just(1)
- .subscribe(new SingleObserver<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onSuccess(Integer integer) {
- }
- @Override
- public void onError(Throwable e) {
- }
- });
- }
这是最简单的用法, 上游发送一个 1 的事件, 下游接到, 不牵涉线程切换.
创建被观察者
我们先直接进 Just 的源码
- @CheckReturnValue
- @SchedulerSupport(SchedulerSupport.NONE)
- @NonNull
- public static <T> Single<T> just(final T item) {
- // 判空
- ObjectHelper.requireNonNull(item, "item is null");
- //HOOK 方法
- return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
- }
第一行, 其实看方法名我们也能看出来, 是判空的, 源码如下
- public static <T> T requireNonNull(T object, String message) {
- if (object == null) {
- throw new NullPointerException(message);
- }
- return object;
- }
果然不出所料, 忽略
第二行, 先看外层的 RxJavaPlugins.onAssembly, 进它的源码
- /**
- * Calls the associated hook function.
- * @param <T> the value type
- * @param source the hook's input value
- * @return the value returned by the hook
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @NonNull
- public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
- Function<? super Single, ? extends Single> f = onSingleAssembly;
- if (f != null) {
- return apply(f, source);
- }
- return source;
- }
注意看注释, 说明了这是一个 hook 方法, 可以看到直接 return 的说是传入进来的 source, 所以, 我们可以得出, Single.just(item) 就相当于 new SingleJust<T>(item).
订阅过程
再来看. subscribe(new SingleObserver<Integer>) 的源码
- @SchedulerSupport(SchedulerSupport.NONE)
- @Override
- public final void subscribe(SingleObserver<? super T> observer) {
- // 判空
- ObjectHelper.requireNonNull(observer, "observer is null");
- //HOOK
- observer = RxJavaPlugins.onSubscribe(this, observer);
- // 继续判空
- ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
- try {
- // 执行当前类的 subscribeActual
- subscribeActual(observer);
- } catch (NullPointerException ex) {
- throw ex;
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- NullPointerException npe = new NullPointerException("subscribeActual failed");
- npe.initCause(ex);
- throw npe;
- }
- }
代码里有做注释, 其实真正调用的方法是 subscribeActual(observer); 方法
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
可以发现, 这是一个抽象方法, 那么我们要找到它的实现.
我们回到来看上面的方法其实可以发现, Single.just() 调用的 subscribe, 而 Single.just 我们在上面讲到, 就相当于 new SingleJust(), 所以我们只要看 SingleJust 里的 subscribeActual 方法就可以了.
- public final class SingleJust<T> extends Single<T> {
- final T value;
- public SingleJust(T value) {
- this.value = value;
- }
- @Override
- protected void subscribeActual(SingleObserver<? super T> observer) {
- observer.onSubscribe(Disposables.disposed());
- observer.onSuccess(value);
- }
- }
这个类超级简单, 就是把上游的事件发送到下游 SingleObserver, 比如我们在实例中, Single.just(1) 就相当于 new SingleJust(1), 所以在这儿, value=1, 然后调用 subscribeActual 方法, SingleObserver 是一个接口, 有三个方法, 也是我们回调里的三个方法
- public interface SingleObserver<T> {
- void onSubscribe(@NonNull Disposable d);
- void onSuccess(@NonNull T t);
- void onError(@NonNull Throwable e);
- }
在 subscribeActual 方法中, 先 observer.onSubscribe(Disposables.disposed());, 需要注意的是, 这也是 just 方法独有的, 它直接在 onSubscribe 方法里就 Disposables.disposed 了, 这个方法在后面讲, 这是取消了事件订阅, 因为它只会发一次, 到了这就意味着已经不用订阅了. 然后再调用 observer.onSuccess 方法, 直接把 value 传递了过去.
Map 操作符的源码
再来看增加一个操作符的源码, 就用最常用的 map, 其实操作符一通百通
- Single.just(1)
- .map(new Function<Integer, Integer>() {
- @Override
- public Integer apply(Integer integer) throws Exception {
- return integer+2;
- }
- })
- .subscribe(...);
直接看 map 的源码
- public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
- // 判空
- ObjectHelper.requireNonNull(mapper, "mapper is null");
- //hook, 就相当于 new SingleMap
- return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
- }
可以看到, 这就相当于 new SingleMap(this,mapper); 返回值依然是 Single
我们看 SingleMap 的源码
- public final class SingleMap<T, R> extends Single<R> {
- final SingleSource<? extends T> source;
- final Function<? super T, ? extends R> mapper;
- public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
- // 这就是刚刚传进来的 this, 也就是上游的被观察者
- this.source = source;
- // 这是我们自己在 map 中写的 new function 方法
- this.mapper = mapper;
- }
- // 由上文 subscribe 方法分析可知, 当调用 subscribe 时, 这个回调是会被调用的
- @Override
- protected void subscribeActual(final SingleObserver<? super R> t) {
- // 可以看到, 就是相当于是把上游的被观察者 source, 直接调用了它的 subscribe 方法
- // 我们主要的精力只要集中看 new MapSingleObserver 方法就行
- source.subscribe(new MapSingleObserver<T, R>(t, mapper));
- }
- // 此 observer 观察者中, 把处理后的数据都传递给了下游, 但是, 只提供了事件的流向, 因为事件是在上游产生的
- static final class MapSingleObserver<T, R> implements SingleObserver<T> {
- final SingleObserver<? super R> t;
- final Function<? super T, ? extends R> mapper;
- MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
- this.t = t;
- this.mapper = mapper;
- }
- @Override
- public void onSubscribe(Disposable d) {
- t.onSubscribe(d);
- }
- @Override
- public void onSuccess(T value) {
- R v;
- try {
- // 外面是判空, 相当于就是 mapper.apply(value), 这个方法其实就是我们自己的 map 方法
- v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
- onError(e);
- return;
- }
- // 将 map 方法处理后的事件, 传递给下游
- t.onSuccess(v);
- }
- @Override
- public void onError(Throwable e) {
- t.onError(e);
- }
- }
- }
看到这儿我们可以发现, 事件流向是上游的被观察者流向观察者, 在操作符中, 因为操作符自身是继承了被观察者 (在此处为 Single), 而在其自身中, 有一个内部类是观察者 (在此处为实现了 SingleObserver 的 MapSingleObserver), 事件由上游的被观察者, 流向下游的观察者, 而所有的操作符的结构都是一样的, 每个操作符都只需要给上游操作符提供 Observer, 并给下游提供一个 Observable, 内部结构就是, 从上游流向下游内部的 observer 被观察者, 然后此下游的观察者 observable 会调用它自己下游的内部 observer, 这样, 整条链就能运行了.
由此可知, Rxjava 中, 每个操作符内部都实现了一整套 PUSH 模型的接口体系.
由特殊到普通
现在回到最普通的 Rxjava 写法
- Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
- emitter.onNext(1);
- emitter.onComplete();
- }
- }).map(new Function<Integer, Integer>() {
- @Override
- public Integer apply(Integer integer) throws Exception {
- return integer+1;
- }
- }).subscribe(new Observer<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onNext(Integer integer) {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
先看 create 方法的源码
- public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
- ObjectHelper.requireNonNull(source, "source is null");
- return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
- }
通过上面的分析, 我们一眼可以看出, 就相当于 new ObservableCreate(source)
- public final class ObservableCreate<T> extends Observable<T> {
- final ObservableOnSubscribe<T> source;
- public ObservableCreate(ObservableOnSubscribe<T> source) {
- this.source = source;
- }
- @Override
- protected void subscribeActual(Observer<? super T> observer) {
- CreateEmitter<T> parent = new CreateEmitter<T>(observer);//1
- observer.onSubscribe(parent);//2
- try {
- source.subscribe(parent);//3
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
- static final class CreateEmitter<T>
- extends AtomicReference<Disposable>
- implements ObservableEmitter<T>, Disposable {
- ...
- }
- ...
- }
这个类比较长, 我们先只看我们关心的部分. 只以看到我们喜爱的 subscribeActual 方法, 在订阅时, 会调用到此方法.
再来逐句分析, 在运行 1 语句时, new CreateEmitter, 看到 CreateEmitter 的源码
- // 实现了 ObservableEmitter,ObservableEmitter 是 Emitter 的子类, 用于发射上游数据
- static final class CreateEmitter<T>
- extends AtomicReference<Disposable>
- implements ObservableEmitter<T>, Disposable {
- private static final long serialVersionUID = -3434801548987643227L;
- final Observer<? super T> observer;
- // 下游的 observer
- CreateEmitter(Observer<? super T> observer) {
- this.observer = observer;
- }
- @Override
- public void onNext(T t) {
- if (t == null) {
- onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
- return;
- }
- if (!isDisposed()) {
- // 把事件传递给下游 observer, 调用观察者的 onNext 方法
- observer.onNext(t);
- }
- }
- ...
- }
再回到 ObservableCreate 的源码, 它是被观察者 Observable 的子类,
先在 1 时 new 了一个发射器 CreateEmitter 对象, 然后我们把自定义的下游观察者 observer 作为参数传了进去, 这里同样也是包装起来, 这个 CreateEmitter 实现了 ObservableEmitter 和 Disposable 接口
在 2 语句时, 触发我们自定义的 observer 的 onSubscribe(Disposable) 方法, 实际就是调用观察者的 onSubscribe 方法, 告诉观察者已经成功订阅到被观察者了;
再执行在语句 3,source.subscribe(parent); 就和我们分析 Map 一样了, 就是订阅, 把事件从上游传到下游.
小结
Observable(被观察者) 和 Observer(观察者) 建立连接, 也就是订阅之后, 会创建出一个发射器 CreateEmitter, 发射器会把被观察者中产生的事件发送到观察者中, 观察者对发射器中发出的事件做出响应事件. 可以看到, 订阅成功之后, Observabel 才会开始发送事件
来源: http://www.bubuko.com/infodetail-3090337.html