目录
整体思路
根据对 RxJava 使用的基本认识, 个人觉得解析 RxJava 关键在于抓住以下几个问题:
事件流源头 (observable) 怎么发出数据
响应者 (subscriber) 怎么收到数据
操作符如何运作(operator/transformer)
整个过程的调度(scheduler)
需要说明的一点是, 本文基于 RxJava1.3.0,RxJava 当前最新版本已经升级到了 2.2.4, 后续会单开文章讲述版本之间的变化.
在具体讲述之前, 先来介绍 RxJava 核心的三个类:
Observable
先来看一下源码中的说明: The Observable class that implements the Reactive Pattern.
它其实是一次观察者模式实现的调度者. 所谓一个观察者模式在 RxJava 中指的是一次 subscribe.
一次 subscribe 的实质可以抽象成下述代码, 这个抽象很重要, 后续的一系列变换都是基于这个抽象来做的:
- public class Observable<T> {
- final OnSubscribe<T> onSubscribe;
- protected Observable(OnSubscribe<T> f) {
- this.onSubscribe = f;
- }
- public final Subscription subscribe(Subscriber<? super T> subscriber) {
- this.onSubscribe.call(subscriber);
- }
- }
- OnSubscribe
同样的, 先来看一下 OnSubscribe 的官方说明:
- /**
- * Invoked when Observable.subscribe is called.
- * @param <T> the output value type
- */
- public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
- // cover for generics insanity
- }
- public interface Action1<T> extends Action {
- void call(T t);
- }
- Subscriber
Subscriber 是接口 Observer 的抽象子类,
- public interface Observer<T> {
- void onCompleted();
- void onError(Throwable e);
- void onNext(T t);
- }
RxJava 应用及一次订阅的流程分析
我们先来看一下 RxJava 的一个基本示例, 然后以此为引子, 进行整个流程的追踪和分析.
这个过程很简单, 通过 Observable.just 发射数据, 经过一次 map 转换, 经过 subscribeOn,observeOn 切换线程, 最后通过 subscribe 实现订阅.
- Observable
- .just("Observable.create! User Observable.just!")
- .map(new Func1<String, String>() {
- @Override
- public String call(String s) {
- return "Observable.create! User Observable.map!";
- }
- })
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Action1<String>() {
- @Override
- public void call(String s) {
- Log.i("RxJava", "print message:" + s);
- }
- });
简单梳理一下整个过程的对象转换关系如下:
just
先来看一下 just 的调用过程
- public static <T> Observable<T> just(final T value) {
- return ScalarSynchronousObservable.create(value);
- }
- public static <T> ScalarSynchronousObservable<T> create(T t) {
- return new ScalarSynchronousObservable<T>(t);
- }
- protected ScalarSynchronousObservable(final T t) {
- super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
- this.t = t;
- }
从代码中可以看出, 其核心过程是:
我们创建的是 ScalarSynchronousObservable, 一个 Observable 的子类;
ScalarSynchronousObservable 的构造函数中传入了一个 JustOnSubscribe 类, 这是一个 OnSubscribe 的实现类.
这里我们可以这么理解, Observable 的构造函数传入了一个 OnSubscribe, 这是一个回调, 它有一个回调方法 void call(T t); 这里我们先记住这个 call 回调, 后面再把整个过程串起来.
看一下 JustOnSubscribe 的具体实现:
- static final class JustOnSubscribe<T> implements OnSubscribe<T> {
- final T value;
- JustOnSubscribe(T value) {
- this.value = value;
- }
- @Override
- public void call(Subscriber<? super T> s) {
- s.setProducer(createProducer(s, value));
- }
- }
- static <T> Producer createProducer(Subscriber<? super T> s, T v) {
- // ...
- return new WeakSingleProducer<T>(s, v);
- }
我们再来看 WeakSingleProducer 的源码, 在 request 方法中, 可以看到调用了 onNext() 和 onComplete(), 这样, just 中的数据就被创造并传递出来了.
- static final class WeakSingleProducer<T> implements Producer {
- // ...
- @Override
- public void request(long n) {
- // 省略状态检查代码
- Subscriber<? super T> a = actual;
- if (a.isUnsubscribed()) {
- return;
- }
- T v = value;
- try {
- a.onNext(v);
- } catch (Throwable e) {
- Exceptions.throwOrReport(e, a, v);
- return;
- }
- if (a.isUnsubscribed()) {
- return;
- }
- a.onCompleted();
- }
- }
- map
map 它是一种转换, 将上游输入的数据转换之后, 传递到下游.
- public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
- return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
- }
- public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
- ...
- @Override
- public void call(final Subscriber<? super R> o) {
- MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
- o.add(parent);
- source.unsafeSubscribe(parent);
- }
- }
OnSubscribeMap 类是 OnSubscribe 的子类, unsafeCreate()方法就是通过传入的 OnSubscribe 构造一个 Observable. 这一点和 just 方法本质上是一样的, 通过 OnSubscribe 构造一个 Observable 实例. 所以 Map 的本质就是将一个 Observable 转换成另外一个 Observable, 期间会回调 call 方法. 那么, map 的 call 方法具体做了什么呢?
创建了一个 MapSubscriber;
将 MapSubscriber 加入到 Subscriber 的父链中;
修正订阅关系, source Observable 订阅的是 MapSubscriber, 意思是在 map 之前订阅的是 subscriberA, 此时订阅的就是新的 MapSubscriber, 而 MapSubscriber 是 subscriberA 的 parent, 它们会有一个嵌套关系
MapSubscriber 的源码如下, 其过程还是比较直接的:
上游每新来一个数据, 就用我们给的 mapper 进行数据转换.
再把转换之后的数据发送给下游.
- static final class MapSubscriber<T, R> extends Subscriber<T> {
- ...
- public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
- this.actual = actual;
- this.mapper = mapper;
- }
- @Override
- public void onNext(T t) {
- R result;
- try {
- result = mapper.call(t);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- unsubscribe();
- onError(OnErrorThrowable.addValueAsLastCause(ex, t));
- return;
- }
- actual.onNext(result);
- }
- }
- subscribe
下面我们再来看 subscribe 的过程, 这是 Subscriber 对 OnSubscribe 的订阅过程.
- public final Subscription subscribe(final Action1<? super T> onNext) {
- // 省略参数检查代码
- Action1<Throwable> onError =
- InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
- Action0 onCompleted = Actions.empty();
- return subscribe(new ActionSubscriber<T>(onNext,
- onError, onCompleted)); // 1
- }
- public final Subscription subscribe(Subscriber<? super T> subscriber) {
- return Observable.subscribe(subscriber, this);
- }
- static <T> Subscription subscribe(Subscriber<? super T> subscriber,
- Observable<T> observable) {
- // 省略参数检查代码
- subscriber.onStart(); // 2
- if (!(subscriber instanceof SafeSubscriber)) {
- subscriber = new SafeSubscriber<T>(subscriber); // 3
- }
- try {
- RxJavaHooks.onObservableStart(observable,
- observable.onSubscribe).call(subscriber); // 4
- return RxJavaHooks.onObservableReturn(subscriber); // 5
- } catch (Throwable e) {
- // 省略错误处理代码
- }
- }
我们首先对传入的 Action 进行包装, 包装为 ActionSubscriber, 一个 Subscriber 的实现类.
调用 subscriber.onStart() 通知 subscriber 它已经和 observable 连接起来了. 这里我们就知道, onStart() 就是在我们调用 subscribe() 的线程执行的.
如果传入的 subscriber 不是 SafeSubscriber, 那就把它包装为一个 SafeSubscriber.
我们跳过 hook, 认为它什么也没做, 那这里我们调用的其实是 observable.onSubscribe.call(subscriber), 这里我们就看到了前面提到的 onSubscribe 的使用代码, 在我们调用 subscribe() 的线程执行这个回调.
跳过 hook, 那么这里就是直接返回了 subscriber, Subscriber 继承了 Subscription, 用于取消订阅.
我们应该还记得 OnSubscribeMap 中的 call 方法吧, 这里的 observable.onSubscribe.call(subscriber)调用的就是 OnSubscribeMap.call()方法. 在 OnSubscribeMap.call()之中, 有一段代码: source.unsafeSubscribe(parent); 它会继续回溯去调用上一个 observable.onSubscribe.call()的 call 方法, 而这个 call 方法就是 JustOnSubscribe 中的 call 方法
- public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
- try {
- // new Subscriber so onStart it
- subscriber.onStart();
- // allow the hook to intercept and/or decorate
- RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
- return RxJavaHooks.onObservableReturn(subscriber);
- } catch (Throwable e) {
- // 省略错误处理代码
- }
- return Subscriptions.unsubscribed();
- }
- }
整个过程如下:
RxJava 顺序流程. PNG
这里我们可以看到 RxJava 中存在这样一种嵌套关系:
RxJava 回溯执行的过程. PNG
线程调度
前面的过程都是通过函数调用来完成的, 都在 subscribe 所在的线程执行, RxJava 进行异步非常简单, 只需要使用 subscribeOn 和 observeOn 这两个操作符即可. 既然它俩都是操作符, 那流程上就是和 map 差不多的, 这里我们主要关注线程调度的实现原理. subscribeOn 和 observeOn 操作符的调用者是 Observable<T>, 方法参数是 Scheduler, 它们的区别是 subscribeOn 决定的是上游 Observable 的执行线程, observeOn 决定的是下游的 Subscriber 回调执行的线程, 下面我们来看具体是怎么实现的.
subscribeOn
追踪 subscribeOn 的调用过程, 其调用过程通过 OperatorSubscribeOn 进行了一次转换. 过程如下:
获取 Scheduler 中的 Worker 对象 inner;
将 Subscriber 包装成 SubscribeOnSubscriber, 这个是 parentSubcriber;
inner.schedule(parent) 执行具体过程
SubscribeOnSubscriber 中的 setProducer 方法中, 做了进一步的线程调度
如果当前是在同一个线程中, 直接 request;
如果不在同一个线程中, 发生一次线程调度
那么, 这两次调度有什么区别呢? 简单的说:
inner.schedule(parent)调度影响的是 Subscriber 的回调, 也就是下游的监听;
setProducer 调度影响的是上游数据的 request;
所以 subscribeOn 影响的是上下游的执行线程, 下游如果要切换线程, 需要通过 observeOn 进行切换
- public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
- final Scheduler scheduler;
- final Observable<T> source;
- final boolean requestOn;
- public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
- this.scheduler = scheduler;
- this.source = source;
- this.requestOn = requestOn;
- }
- @Override
- public void call(final Subscriber<? super T> subscriber) {
- final Worker inner = scheduler.createWorker();
- SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
- subscriber.add(parent);
- subscriber.add(inner);
- inner.schedule(parent);
- }
- ...
- }
- SubscribeOnSubscriber
- static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
- ...
- @Override
- public void setProducer(final Producer p) {
- actual.setProducer(new Producer() {
- @Override
- public void request(final long n) {
- if (t == Thread.currentThread() || !requestOn) {
- p.request(n);
- } else {
- worker.schedule(new Action0() {
- @Override
- public void call() {
- p.request(n);
- }
- });
- }
- }
- });
- }
- }
- observeOn
同样的, 我们追踪 observeOn. 过程如下:
创建 OperatorObserveOn, 继承自 Operator;
通过 lift 操作符进行切换
- public final Observable<T> observeOn(Scheduler scheduler) {
- return observeOn(scheduler, RxRingBuffer.SIZE);
- }
- public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
- if (this instanceof ScalarSynchronousObservable) {
- return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
- }
- return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
- }
- OnSubscribeLift
它的逻辑是先对下游 subscriber 用操作符进行处理, 处理会返回一个新的 subscriber, 然后通知处理后的 subscriber, 它将要和 observable 连接起来了, 最后把它和上游连接起来.
- public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
- @Override
- public void call(Subscriber<? super R> o) {
- ...
- Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
- st.onStart();
- parent.call(st);
- ...
- }
- }
- OperatorObserveOn
作为操作符的逻辑, 也比较简单, 如果 scheduler 是 ImmediateScheduler/TrampolineScheduler, 就什么也不做, 否则就把 subscriber 包装为 ObserveOnSubscriber, 看来脏活累活都是 ObserveOnSubscriber 干的了.
- public final class OperatorObserveOn<T> implements Operator<T, T> {
- // ...
- @Override
- public Subscriber<? super T> call(Subscriber<? super T> child) {
- if (scheduler instanceof ImmediateScheduler) {
- // avoid overhead, execute directly
- return child;
- } else if (scheduler instanceof TrampolineScheduler) {
- // avoid overhead, execute directly
- return child;
- } else {
- ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(
- scheduler, child, delayError, bufferSize);
- parent.init();
- return parent;
- }
- }
- // ...
- }
- ObserveOnSubscriber
继续看 ObserveOnSubscriber, 它是 observeOn 只有生成的新的 subscriber, 下面的源码是简化之后的实现. 我们可以看到它调度了每个单独的 subscriber.onXXX() 方法. 所以这就是 observeOn 调度只影响 subscriber 的原因了!!!!
- Observable.create(subscriber -> {
- Worker worker = scheduler.createWorker();
- subscriber.add(worker);
- source.unsafeSubscribe(new Subscriber<T>(subscriber) {
- @Override
- public void onNext(T t) {
- worker.schedule(() -> subscriber.onNext(t));
- }
- @Override
- public void onError(Throwable e) {
- worker.schedule(() -> subscriber.onError(e));
- }
- @Override
- public void onCompleted() {
- worker.schedule(() -> subscriber.onCompleted());
- }
- });
- });
RxJava 应用举例
使用 RxJava 实现从 DB load 数据
通过 Observable 提供的系列 create 方法创建, create 系列方法有:
- Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure)
- Observable<T> unsafeCreate(OnSubscribe<T> f)
- Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe)
- Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe)
这里采用第二个方法创建:
- Observable.unsafeCreate(new rx.Observable.OnSubscribe<Data>() {
- @Override
- public void call(Subscriber<? super Data> subscriber) {
- Data data = null;
- // sql 操作, loadFromDB
- subscriber.onNext(data);
- subscriber.onCompleted();
- }
- })
- .subscribeOn(Schedulers.io())
- .subscribe(new Action1<Data>() {
- @Override
- public void call(Data data) {
- // handle data
- }
- });
使用 RxJava 实现分页数据加载
开发中, 我们会遇到这样的场景, 某个接口采用分页拉取方式, 初始化时我们可能需要循环去拉, 一次性把数据全部拉取到, 假定你不能通过 limit 设置成无限大的方法拉取一次. 这种场景, 一般处理可能是循环迭代拉, 如果采用 RxJava 则会非常方便.
- protected void fetchPatients() {
- Observable observable = Observable.range(0, Integer.MAX_VALUE)
- .concatMap(new Func1<Integer, Observable<List<Data>>>() {
- @Override
- public Observable<List<Data>> call(Integer page) {
- return getPageObservable(page);
- }
- })
- .takeWhile(new Func1<List<Data>, Boolean>() {
- @Override
- public Boolean call(List<Data> data) {
- return data.size() <FETCH_LIMIT;
- }
- }).reduce(new ArrayList<Data>(), new Func2<ArrayList<Data>, List<Data>, ArrayList<Data>>() {
- @Override
- public ArrayList<Data> call(ArrayList<Data> datas, List<Data> datas2) {
- datas.addAll(datas2);
- return datas;
- }
- })
- .map(new Func1<List<Data>, List<Data>>() {
- @Override
- public List<Data> call(List<Data> datas) {
- // do some last handle
- return datas;
- }
- }).subscribeOn(Schedulers.io())
- .observeOn(Schedulers.trampoline())
- .subscribe(new Action1() {
- @Override
- public void call(List<Data> datas) {
- }
- }, new Action1<Throwable>() {
- @Override
- public void call(Throwable throwable) {
- }
- });
- }
- protected Observable getPageObservable(int page) {
- Observable observable = apiService.getPager(page, FETCH_LIMIT)
- .map(new Func1<List<Data>, List<Data>>() {
- @Override
- public List<Data> call(List<Data> datas) {
- // do some pre handle
- return datas;
- }
- });
- return observable;
- }
总结
本文从最简单的用例出发, 追踪了 RxJava 的完整过程, 也响应了文章开头所提的四个步骤:
事件流源头 (observable) 怎么发出数据
响应者 (subscriber) 怎么收到数据
操作符如何运作(operator/transformer)
整个过程的调度(scheduler)
关于 RxJava, 还有两个核心的问题:
RxJava 调度器 Scheduler
RxJava 中的背压概念
这两个问题我会在后续的文章中继续论述
来源: https://juejin.im/entry/5c06587bf265da617006e5b6