RxJava 的鼎鼎大名相信 Android 开发的同学都非常熟悉了, 其实不仅仅有 RxJava, 还有 RxJs,RxKotlin 等等一系列. 可以说 Rx 并不是一种局限于 Android 的框架, Rx 是一种思想, 我们深入了解了 RxJava, 同样会加深我们对其他 Rx 系列的认知.
使用方法
我们来看一个常见的例子:
- Observable.create(ObservableOnSubscribe<Int> { e ->
- e.onNext(1)
- e.onComplete()
- }).map {
- Log.d("map-thread :", Thread.currentThread().name)
- Log.d("--", "------------------------------")
- "result : $it"
- }.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(object : Observer<String> {
- override fun onSubscribe(d: Disposable) {
- Log.d("onSubscribe-thread :", Thread.currentThread().name)
- Log.d("--", "------------------------------")
- }
- override fun onNext(s: String) {
- Log.d("onNext-thread :", Thread.currentThread().name)
- Log.d("onNext-result", s)
- Log.d("--", "------------------------------")
- }
- override fun onError(e: Throwable) {
- Log.d("onError-thread :", Thread.currentThread().name)
- Log.d("onError-message :", e.message)
- Log.d("--", "------------------------------")
- Log.d("--", "------------------------------")
- Log.d("--", "------------------------------")
- }
- override fun onComplete() {
- Log.d("onComplete-thread :", Thread.currentThread().name)
- Log.d("--", "------------------------------")
- Log.d("--", "------------------------------")
- Log.d("--", "------------------------------")
- }
- })
这是一个使用 Kotlin 写的例子, 对 Kotlin 不熟悉的同学无需关注代码细节, 大致能看懂什么意思就行. 首先发送一个数字 1, 然后通过 map 操作符把数字 1 变成 result : 1 , 将之前的操作切换到 IO 线程, 将之后的操作切换到主线程.
整个 RxJava 的使用可以分为三个部分:
创建发送数据的原始 Observable
使用 Rxjava 的操作符对发送事件做相应变换
使用 subscribeOn 和 observeOn 做线程切换
接下来我会对这三个部分做详细的说明.
创建发送数据的原始 Observable
创建发送数据的原始 Observable 采用的是 Observable.create() , 当然使用 Observable.just(1) , Observable.from(list) 等等这些方法也可以创建发送数据的 Observable, 其实这些操作符的本质都是创建了一个新的 Observable, 在订阅的时候发送了一些数据. 我们着重看一下 Observable.create() :
- public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
- // 确保 source 非空
- ObjectHelper.requireNonNull(source, "source is null");
- // 返回一个 ObservableCreate 对象
- return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
- }
第一句很好理解, 确保传入的 ObservableOnSubscribe 对象非空, 第二句本质是直接返回了
new ObservableCreate<T>(source)
执行的结果, 我们以后在看到
RxJavaPlugins.onAssembly(obj)
类似的代码时, 直接可以理解为返回了一个 obj . 所以这个方法实际上做了一件事: 创建了一个 ObservableCreate 类型的对象并返回.
这里我们先了解一下在订阅时, 也就是调用 subscribeWith(observer) 时具体是做了什么:
- public final <E extends Observer<? super T>> E subscribeWith(E observer) {
- subscribe(observer);
- return observer;
- }
- public final void subscribe(Observer<? super T> observer) {
- // 确保 observer 非空
- ObjectHelper.requireNonNull(observer, "observer is null");
- try {
- // 这里一般都是直接返回传入的 observer
- observer = RxJavaPlugins.onSubscribe(this, observer);
- ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
- // 不同类型的 Observable 的具体订阅的方法, 所有的数据发送操作都在这个方法中
- subscribeActual(observer);
- } catch (NullPointerException e) { // NOPMD
- throw e;
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
- // can't call onError because no way to know if a Disposable has been set or not
- // can't call onSubscribe because the call might have set a Subscription already
- RxJavaPlugins.onError(e);
- NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
- npe.initCause(e);
- throw npe;
- }
- }
先看第一个方法 subscribeWith(E observer) , 在这个方法内部调用了 subscribe(observer) 这个方法. subscribe(observer) 这个方法的核心是
subscribeActual(observer)
这一句. subscribeActual(observer) 这个方法是 Observable 内的一个抽象方法, Observable 经过 create 操作符后会变成一个 ObservableCreate, 经过 map 操作符会变成一个 ObservableMap 其他的操作符都是一个套路. 经过操作符转化而来的 Observable 都是 Observable 的子类, 在这些子类的内部都会实现 subscribeActual(observer) 这个抽象方法, 并在这个方法内做发送事件的相关操作.
经过上面的分析, 我们知道了 subscribeActual(observer) 这个方法是不同 Observable 的关键, 有了这个前置知识, 我们接着上面的进度看一下 ObservableCreate 这个 Observable 子类中的 subscribeActual(observer) 方法:
- @Override
- protected void subscribeActual(Observer<? super T> observer) {
- // 对 observer 的包装
- CreateEmitter<T> parent = new CreateEmitter<T>(observer);
- observer.onSubscribe(parent);
- try {
- // 这是关键
- source.subscribe(parent);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
关键是
source.subscribe(parent)
这一句. 这个 source 就是 ObservableOnSubscribe 的实例, 是在 Observable.create(source) 中传入的参数. ObservableOnSubscribe 是一个接口, 在这个接口的 subscribe 方法中我们定义了数据的发送方式. 这个 ObservableOnSubscribe 可以理解为一个 数据发送的剧本 , 这个剧本具体的实现细节写在了 subscribe 方法内. 在最上面的例子中, 我们通过
e.onNext(1);e.onComplete()
发送了一个数字 1, 紧接着通知事件已经发送完毕.
使用 RxJava 的操作符对发送事件做相应变换
由于 RxJava 的事件操作符太多, 我们这里只讲 map 操作符的源码分析:
- public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
- // 确保 mapper 非空
- ObjectHelper.requireNonNull(mapper, "mapper is null");
- // 创建并返回一个 ObservableMap 对象
- return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
- }
是不是很熟悉, map 操作符和 create 操作符是一个套路, 事实上所有的操作符都是一个套路: 创建并返回了一个对原有 Observable 的包装类 , 那我们来看一下这个 ObservableMap :
- @Override
- public void subscribeActual(Observer<? super U> t) {
- source.subscribe(new MapObserver<T, U>(t, function));
- }
- // 对原始 observer 的包装类
- static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
- final Function<? super T, ? extends U> mapper;
- MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
- super(actual);
- this.mapper = mapper;
- }
- @Override
- public void onNext(T t) {
- U v;
- try {
- // 在这里将 T 类型的原始数据 t 变换为类型 U 的新数据 v
- v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
- } catch (Throwable ex) {
- fail(ex);
- return;
- }
- // 调用原始 observer 的 onNext
- actual.onNext(v);
- }
- // 忽略无关代码......
- }
ObservableMap 内部有有两个值得我们注意的点: 一个是我们上面提过的 subscribeActual 方法; 另一个是它的内部类 MapObserver.
subscribeActual 中依然是
source.subscribe(new MapObserver<T, U>(t, function))
这句熟悉的代码, 值得注意的是这句代码并没有传入原始的 observer, 而是传入了 MapObserver 对象. 简而言之, MapObserver 是一个包装类, 它的内部包含我们原始的 observer: actual 和我们变换的具体操作: mapper . 新的 MapObserver 包装类在它的 onNext 方法中做了两步操作:
通过 mapper.apply(t) 实现数据的类型变换, 获取新的变换后的数据
将新类型的数据传入, 通过 actual.onNext(v) 实现原始 observer 接收变换后新数据的功能
其实在不了解 RxJava 的源码之前, 我们常常会惊叹于 RxJava 的神奇: 明明我发送的是数字 1, 我要接收的却是字符串类型的数据, 这中间仅仅是通过了一个 map 操作符, 简直太神奇了! 在我们了解了 map 操作符的源码之后, 就会知道: 我们在订阅的时候 map 操作符将原始的 observer 包装成了一个 MapObserver 对象, 在这个 MapObserver 内部的 onNext 方法中首先会将原始数据 1 通过我们传入的变换规则变换为 result : 1, 在变换之后才会调用我们编写的原始 observer 来处理新的 String 类型的数据.
使用 subscribeOn 和 observeOn 做线程切换
在分析线程切换之前, 我们先明确一些前置知识: 从主线程切换到子线程的操作很简单, 新开一条线程, 在新的线程执行即可. 从子线程切换到主线程, 在 Android 开发中, 所有的第三方框架使用的都是 Handler 机制. 所以不要把这些第三方框架想的特别难, 他们的本质都是 Android 中的基础知识.
我们以
subscribeOn(Schedulers.io())
这个例子来分析, 首先传入的是 Schedulers.io() , 直接说结论吧, 它就是一个 IoScheduler 类型的单例对象. 继续看代码:
- public final Observable<T> subscribeOn(Scheduler scheduler) {
- ObjectHelper.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
- }
我们都很熟悉了, 本质是创建并返回了一个 ObservableSubscribeOn 类型的对象. 我们继续看 ObservableSubscribeOn 的内部:
- public void subscribeActual(final Observer<? super T> s) {
- // 创建一个对原始 observer 的包装对象
- final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
- s.onSubscribe(parent);
- // 这里是关键
- parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
- }
- static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
- private static final long serialVersionUID = 8094547886072529208L;
- final Observer<? super T> actual;
- final AtomicReference<Disposable> s;
- SubscribeOnObserver(Observer<? super T> actual) {
- this.actual = actual;
- this.s = new AtomicReference<Disposable>();
- }
- @Override
- public void onNext(T t) {
- actual.onNext(t);
- }
- @Override
- public void onError(Throwable t) {
- actual.onError(t);
- }
- @Override
- public void onComplete() {
- actual.onComplete();
- }
- // 忽略无关代码......
- }
- final class SubscribeTask implements Runnable {
- private final SubscribeOnObserver<T> parent;
- SubscribeTask(SubscribeOnObserver<T> parent) {
- this.parent = parent;
- }
- @Override
- public void run() {
- // 这一句我们非常熟悉了, 在这里实现了事件的订阅
- source.subscribe(parent);
- }
- }
这里跟 ObservableMap 中的 subscribeActual 也是一个套路:
创建一个下一级 observer 的包装类 SubscribeOnObserver
通过 source.subscribe(parent) 实现事件的订阅
通过看 SubscribeOnObserver 这个包装类中的 onNext 方法也可以明白: 它的内部直接调用的就是 actual.onNext(t) , 没有像 map 一样做数据的变换, 这很好理解, 因为 subscribeOn(schedule) 本身就只是为了切换线程, 并不做其他多余的操作, 所以这个包装类中的 onNext 才会直接调用下一级 observer 的 onNext.
可能有的小伙伴要有问题了: 你说通过 source.subscribe(parent) 实现事件的订阅, 但 subscribeActual 中并没有你说的代码啊? 其实在这里
scheduler.scheduleDirect(new SubscribeTask(parent))
这里传入了一个 SubscribeTask 对象, 这个对象其实是个 Runnable , 在它的 run() 方法中调用了
source.subscribe(parent)
. 到目前为止, 我们唯一陌生的就是 scheduleDirect 这个方法了, 这个方法定义在 Scheduler 这个线程调度类中, 它实际调用的是:
- @NonNull
- public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
- //createWorker 是一个抽象方法, 不同的线程调度器有不同的实现
- final Worker w = createWorker();
- final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
- DisposeTask task = new DisposeTask(decoratedRun, w);
- // 事件调度处理
- w.schedule(task, delay, unit);
- return task;
- }
scheduleDirect 这个方法做了两件事: 创建一个 Worker 工作类, 调用工作类的 schedule 来进行事件调度. 不同的线程调度器会有不同的处理, 对于 IoSchedule 这个调度器来说, 它最终是执行这个方法:
- public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
- // 忽略无关代码......
- Future<?> f;
- try {
- if (delayTime <= 0) {
- // 通过线程池来开启一个子线程执行
- f = executor.submit((Callable<Object>)sr);
- } else {
- f = executor.schedule((Callable<Object>)sr, delayTime, unit);
- }
- sr.setFuture(f);
- } catch (RejectedExecutionException ex) {
- if (parent != null) {
- parent.remove(sr);
- }
- RxJavaPlugins.onError(ex);
- }
- return sr;
- }
通过上面的分析我们可以知道, subscribeOn(Schedulers.io()) 本质上是通过线程池开启了一个线程, 在这个新的线程中还是通过调用 source.subscribe(observer) 来订阅事件.
subscribeOn(Schedulers.io()) 分析过了, 我们再来看一下 subscribeOn(AndroidSchedulers.mainThread()) 这个线程切换.
AndroidSchedulers.mainThread() 返回的是一个 HandlerScheduler 类型的线程调度器, 它的 scheduleDirect 方法定义如下:
- public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
- if (run == null) throw new NullPointerException("run == null");
- if (unit == null) throw new NullPointerException("unit == null");
- run = RxJavaPlugins.onSchedule(run);
- ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
- // 本质是通过 handler 机制实现线程切换
- handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
- return scheduled;
- }
从它的方法内部可以看到, 它是通过
handler.postDelayed()
来实现切换到主线程的功能的, 这个 handler 是定义在主线程的 handler. 其他类型的线程调度器就不再分析了, 本质都是大同小异.
接下来我们来看一下 observeOn(AndroidSchedulers.mainThread()) , 额, 其实它的套路也是一样的, 创建并返回了一个 ObservableObserveOn , 我们主要关注它的 subscribeActual 方法:
- protected void subscribeActual(Observer<? super T> observer) {
- // 忽略无关代码......
- Scheduler.Worker w = scheduler.createWorker();
- // 订阅包装类 ObserveOnObserver
- source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
- }
- static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
- implements Observer<T>, Runnable {
- // 下一级观察者
- final Observer<? super T> actual;
- // 对应 Scheduler 里的 Worker
- final Scheduler.Worker worker;
- // 上一级 Observable 发送的数据队列
- SimpleQueue<T> queue;
- Disposable s;
- // 如果 onError 了, 保存对应的异常
- Throwable error;
- // 是否完成
- volatile boolean done;
- // 是否取消
- volatile boolean cancelled;
- // 代表同步发送 异步发送
- int sourceMode;
- ....
- @Override
- public void onSubscribe(Disposable s) {
- if (DisposableHelper.validate(this.s, s)) {
- this.s = s;
- // 忽略无关代码......
- queue = new SpscLinkedArrayQueue<T>(bufferSize);
- actual.onSubscribe(this);
- }
- }
- @Override
- public void onNext(T t) {
- // 执行过 error / complete , 直接返回
- if (done) {
- return;
- }
- // 如果数据源类型不是异步的, 默认不是
- if (sourceMode != QueueDisposable.ASYNC) {
- // 将上一级 Observable 发送的数据加入队列中
- queue.offer(t);
- }
- // 这句代码是线程调度的核心, 进入相应 Worker 线程, 在相应线程发送数据队列中的数据
- schedule();
- }
- @Override
- public void onError(Throwable t) {
- if (done) {
- RxJavaPlugins.onError(t);
- return;
- }
- error = t;
- done = true;
- // 这句代码是线程调度的核心, 进入相应 Worker 线程, 在相应线程发送数据队列中的数据
- schedule();
- }
- @Override
- public void onComplete() {
- if (done) {
- return;
- }
- done = true;
- // 开始调度
- schedule();
- }
- void schedule() {
- if (getAndIncrement() == 0) {
- worker.schedule(this);
- }
- }
- @Override
- public void run() {
- // 默认是 false
- if (outputFused) {
- drainFused();
- } else {
- // 取出队列中的数据并发送
- drainNormal();
- }
- }
- void drainNormal() {
- int missed = 1;
- final SimpleQueue<T> q = queue;
- final Observer<? super T> a = actual;
- for (;;) {
- // 如果已经结束或队列为空, 跳出循环
- if (checkTerminated(done, q.isEmpty(), a)) {
- return;
- }
- for (;;) {
- boolean d = done;
- T v;
- try {
- // 从队列里取出一个值
- v = q.poll();
- } catch (Throwable ex) {
- // 异常处理并跳出函数
- Exceptions.throwIfFatal(ex);
- s.dispose();
- q.clear();
- a.onError(ex);
- return;
- }
- boolean empty = v == null;
- if (checkTerminated(d, empty, a)) {
- return;
- }
- if (empty) {
- break;
- }
- // 调用下一级的 observer 的 onNext 方法发送事件
- a.onNext(v);
- }
- missed = addAndGet(-missed);
- if (missed == 0) {
- break;
- }
- }
- }
- boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
- // 如果已经 disposed
- if (cancelled) {
- queue.clear();
- return true;
- }
- // 如果已经结束
- if (d) {
- Throwable e = error;
- // 如果是延迟发送错误
- if (delayError) {
- // 如果空
- if (empty) {
- if (e != null) {
- a.onError(e);
- } else {
- a.onComplete();
- }
- // 停止 worker(线程)
- worker.dispose();
- return true;
- }
- } else {
- // 发送错误
- if (e != null) {
- queue.clear();
- a.onError(e);
- worker.dispose();
- return true;
- } else
- // 发送 complete
- if (empty) {
- a.onComplete();
- worker.dispose();
- return true;
- }
- }
- }
- return false;
- }
- }
可以看到 subscribeOn 与 observeOn 不同的是二者线程切换的地方. subscribeOn 是将 source.subscribe(observer) 整个部分切换到了相应的线程, 而 observeOn 则是在 ObserveOnObserver 这个包装类中的 onNext , onError 方法中做了线程的切换, 具体是在 onNext 等方法中创建了相关的 Worker 工作类, 并调用了这个类的 schedule 方法, 这个跟我们讲 subscribeOn 是一样的.
到此为止, RxJava2 的源码就基本分析完了. 最后根据最初的例子, 用一幅图来展示 RxJava 的流程:
来源: https://segmentfault.com/a/1190000014892869