Rxjava 2.x 源码系列 - 基础框架分析 https://blog.csdn.net/gdutxiaoxu/article/details/80501816
Rxjava 2.x 源码系列 - 线程切换 (上) https://blog.csdn.net/gdutxiaoxu/article/details/80577389
Rxjava 2.x 源码系列 - 线程切换 (下) https://blog.csdn.net/gdutxiaoxu/article/details/80599799
前言
在上一篇博客 Rxjava 源码系列 - 基础框架分析 https://blog.csdn.net/gdutxiaoxu/article/details/80501816 , 我们分析了 Rxjava 的基础框架.
Observable 和 Observer 通过 subscribe() 方法实现订阅关系, 从而 Observable 可以在需要的时候发出事件来通知 Observer, 并且回调 Observer 的相应的方法.
用一张简单的流程图描述如下:
image
Observable#subscribeOn(Scheduler)
在 Android 中, 我们知道默认都是执行在主线程的, 那么 Rxjava 是如何实现线程切换的.
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> emitter) throws Exception {
- emitter.onNext("1");
- emitter.onNext("2");
- emitter.onNext("3");
- emitter.onComplete();
- }
- })
- .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e("TAG", "onSubscribe():");
- }
- @Override
- public void onNext(String s) {
- Log.e("TAG", "onNext():" + s);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- Log.e("TAG", "onComplete():");
- }
- });
我们先来看一下 subscribeOn 方法, 可以看到
- @CheckReturnValue
- @SchedulerSupport(SchedulerSupport.CUSTOM)
- public final Observable<T> subscribeOn(Scheduler scheduler) {
- // scheduler 判空
- ObjectHelper.requireNonNull(scheduler, "scheduler is null");
- // 用 ObservableSubscribeOn 将 scheduler 包装 起来
- return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
- }
而我们从上一篇博客中知道, 当我们调用 observable.subscibe(observable) 的时候, 最终会调用到具体的 observable 的实例的 subscribActual 方法. 而这里具体的 observable 的实例为 ObservableSubscribeOn.
接下来, 我们来看一下 ObservableSubscribeOn 这个类, 可以看到继承 AbstractObservableWithUpstream , 而 AbstractObservableWithUpstream 继承 Observable, 实现 HasUpstreamObservableSource 这个接口.
- public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
- final Scheduler scheduler;
- public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
- super(source);
- this.scheduler = scheduler;
- }
- @Override
- public void subscribeActual(final Observer<? super T> s) {
- final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
- s.onSubscribe(parent);
- parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
- }
- ---
- }
- abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
- /** The source consumable Observable. */
- protected final ObservableSource<T> source;
- /**
- * Constructs the ObservableSource with the given consumable.
- * @param source the consumable Observable
- */
- AbstractObservableWithUpstream(ObservableSource<T> source) {
- this.source = source;
- }
- @Override
- public final ObservableSource<T> source() {
- return source;
- }
- }
- public interface HasUpstreamObservableSource<T> {
- /**
- * Returns the upstream source of this Observable.
- * <p>Allows discovering the chain of observables.
- * @return the source ObservableSource
- */
- ObservableSource<T> source();
- }
observableSubscribeOn 的 subscribeActual 方法, 跟 ObservableCreate 的 subscribeActual 的套路差不多, 它也是 Observable 的一个子类. 只不过比 ObservableCreate 多实现了一个接口 HasUpstreamObservableSource, 这个接口很有意思, 他的 source() 方法返回类型是 ObservableSource(还记得这个类的角色吗?). 也就是说 ObservableSubscribeOn 这个 Observable 是一个拥有上游的 Observable . 他有一个非常关键的属性 source, 这个 source 就代表了他的上游.
接下来我们一起来看一下 ObservableSubscribeOn 的具体实现
- public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
- final Scheduler scheduler;
- public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
- super(source);
- this.scheduler = scheduler;
- }
- @Override
- public void subscribeActual(final Observer<? super T> s) {
- final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
- s.onSubscribe(parent);
- parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
- }
- }
首先先来看他的构造函数 , 有两个参数 source ,scheduler.
source 代表上游的引用, 是 Observable 的一个实例
scheduler 可以通过 Schedulers.newThread() 或者 Schedulers.io() 创建相应的实例
这里我们先大概了解一下 Scheduler 是个什么东东, Scheduler 里面封装了 Worker 和 DisposeTask, 下面会详细讲到.
- Schedulers.newThread()
- @NonNull
- public static Scheduler newThread() {
- return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
- }
- NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
- static final class NewThreadTask implements Callable<Scheduler> {
- @Override
- public Scheduler call() throws Exception {
- return NewThreadHolder.DEFAULT;
- }
- }
- static final class NewThreadHolder {
- static final Scheduler DEFAULT = new NewThreadScheduler();
- }
- public static Scheduler io() {
- return RxJavaPlugins.onIoScheduler(IO);
- }
- IO = RxJavaPlugins.initIoScheduler(new IOTask());
- static final class IOTask implements Callable<Scheduler> {
- @Override
- public Scheduler call() throws Exception {
- return IoHolder.DEFAULT;
- }
- }
- static final class IoHolder {
- static final Scheduler DEFAULT = new IoScheduler();
- }
- static final class IoHolder {
- static final Scheduler DEFAULT = new IoScheduler();
- }
我们再回到 ObservableSubscribeOn 的 subscribeActual 方法, 在上一篇博客的时候已经讲解 Observable 和 Observer 之间是怎样实现订阅关系的, 这里就不再具体展开了.
接下来, 我们重点关注这一行代码
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
我们先来看一下 SubscribeTask 这个类, 他是 ObservableSubscribeOn 的一个非静态内部类, 可以看到 其实也比较简单, 他实现了 Runnable 接口, 并且持有 parent 引用.
- final class SubscribeTask implements Runnable {
- private final SubscribeOnObserver<T> parent;
- SubscribeTask(SubscribeOnObserver<T> parent) {
- this.parent = parent;
- }
- @Override
- public void run() {
- source.subscribe(parent);
- }
- }
然后在 run 方法中, 通过 source.subscribe(parent) 建立联系. 因而, 当我们的 SubscribeTask 的 run 方法运行在哪个线程, 相应的 observer 的 subscribe 方法就运行在哪个线程.
这里可能会有人有疑问, SubscribeTask 没有 source 属性, 它是怎么访问到 ObservableSubscribeOn 的属性的.
我们知道 java 中, 非静态内部类默认持有外部类的引用, 因而他可以正常访问外部类 ObservableSubscribeOn 的 source 属性.
接着, 我们再来看一下 scheduler.scheduleDirect 这个方法
- @NonNull
- public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
- final Worker w = createWorker();
- // 判断 run 是否为 null
- final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
- DisposeTask task = new DisposeTask(decoratedRun, w);
- w.schedule(task, delay, unit);
- return task;
- }
首先, 创建一个 Worker w
第二步, DisposeTask 将 decoratedRun 包装起来
第三步: w 去调度 task
这里我们以 NewThreadScheduler 为例, 来看看这个 Worker 到底是什么?
- public Worker createWorker() {
- return new NewThreadWorker(threadFactory);
- }
- public class NewThreadWorker extends Scheduler.Worker implements Disposable {
- private final ScheduledExecutorService executor;
- volatile boolean disposed;
- public NewThreadWorker(ThreadFactory threadFactory) {
- executor = SchedulerPoolFactory.create(threadFactory);
- }
- ---
- }
- public static ScheduledExecutorService create(ThreadFactory factory) {
- final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
- if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
- ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
- POOLS.put(e, exec);
- }
- return exec;
- }
从上面可以看到, 其实 worker 里面封装了 executor(线程池), 看到这里, 相信你也基本明白 Rxjava 线程切换的原理了, 其实很简单.
在 ObservableSubscribeOn subscribeActual 方法中, SubscribeTask 包装 parent(SubscribeOnObserver , 包装了 Observer),SubscribeTask 实现了 Runnable 接口, 在 run 方法里面调用了 source.subscribe(parent), 因而 run 方法所执行的线程将由 worker 决定. 这就是 下游决定上游 observable 执行线程的原理.
接下来我们再来看一下: DisposeTask
- static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
- final Runnable decoratedRun;
- final Worker w;
- Thread runner;
- DisposeTask(Runnable decoratedRun, Worker w) {
- this.decoratedRun = decoratedRun;
- this.w = w;
- }
- @Override
- public void run() {
- runner = Thread.currentThread();
- try {
- decoratedRun.run();
- } finally {
- dispose();
- runner = null;
- }
- }
- @Override
- public void dispose() {
- if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
- ((NewThreadWorker)w).shutdown();
- } else {
- w.dispose();
- }
- }
- @Override
- public boolean isDisposed() {
- return w.isDisposed();
- }
- @Override
- public Runnable getWrappedRunnable() {
- return this.decoratedRun;
- }
- }
- }
- // 将 新的 Disposable 设置给 parent , 方便取消订阅关系,
- //(因为我们对 Observer 进行相应的包装, 原来的 parent 的 Disposable 已经不能代表最新的 Disposable)
- parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
DisposeTask 实现了 Disposable,Runnable ,SchedulerRunnableIntrospection 接口, Disposable 接口主要是用来取消订阅关系的 Disposable.
- Observable#subscribeOn(Scheduler) 第一次有限原理
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> emitter) throws Exception {
- Log.i(TAG, "subscribe: getName=" +Thread.currentThread().getName());
- emitter.onNext("1");
- emitter.onNext("2");
- emitter.onNext("3");
- emitter.onComplete();
- }
- }) // 进行两次 subscribeOn
- .subscribeOn(Schedulers.io()).subscribeOn(Schedulers.computation()).subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e("TAG", "onSubscribe():");
- }
- @Override
- public void onNext(String s) {
- Log.e("TAG", "onNext():" + s);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- Log.e("TAG", "onComplete():");
- }
- });
- subscribe: getName=RxCachedThreadScheduler-1
如果将上述的 subscribeOn 的顺序置换
subscribeOn(Schedulers.computation()).subscribeOn(Schedulers.io())
那么将打印出
subscribe: getName=RxComputationThreadPool-1
为什么是第一次 Observable#subscribeOn(Scheduler) 才有效呢?
前面我们分析到, Observable#subscribeOn(Scheduler) 实际上是将 Observable#subscribe(Observer) 的操作放在了指定线程, 当我们调用 subcribe 的时候, 它的过程是从下往上的, 即下面的 Observable 调用上面的 Observanle.
所以对于我们上面的第一个例子, 他的调用流程是这样的: 第三个 Observable 调用 Observable#subscribe(Observer) 启动订阅, 在其内部会激活第二个 Observable 的 Observable#subscribe(Observer) 方法, 但是此时该方法外部被套入了一个 Schedulers.computation() 线程
于是这个订阅的过程就被运行在了该线程中. 用伪代码演示如下
- public class Observable {
- // 第二个 Observable
- Observable source;
- Observer observer;
- public Observable(Observable source, Observer observer) {
- this.source = source;
- this.observer = observer;
- }
- public void subscribe(Observer Observer) {
- new Thread("computation") {
- @Override
- public void run() {
- // 第二个 Observable 订阅
- source.subscribe(observer);
- }
- }
- }
- }
再往上走, 第二个 Observable 订阅内部会激活第一个 Observable 的 Observable#subscribe(Observer) 方法, 同样的, 该方法被套在了 Schedulers.io() 线程中, 用伪代码演示
- public class Observable {
- // 第一个 Observable
- Observable source;
- Observer observer;
- public Observable(Observable source, Observer observer) {
- this.source = source;
- this.observer = observer;
- }
- public void subscribe(Observer Observer) {
- new Thread("io") {
- @Override
- public void run() {
- // 第一个 Observable 订阅
- source.subscribe(observer);
- }
- }
- }
- }
此时到达第一个 Observable 了之后就要开始发射事件了, 此时的执行线程很明显是 io 线程. 还可以换成 Thread 伪代码来表示.
- new Thread("computation") {
- @Override
- public void run() {
- // 第二个 Observable.subscribe(Observer) 的实质
- // 就是切换线程, 效果类似如下
- new Thread("io") {
- @Override
- public void run() {
- // 第一个 Observable.subscribe(Observer) 的实质
- // 就是发射事件
- System.out.println("onNext(T)/onError(Throwable)/onComplete() 的执行线程是:" + Thread
- .currentThread().getName());
- }
- } .start();
- }
- } .start();
总结
用流程图描述如下:
image
参考博客:
友好 RxJava2.x 源码解析 (二) 线程切换 https://juejin.im/post/5a248206f265da432153ddbc#heading-9
下一篇我们将讲解到 observeOn(AndroidSchedulers.mainThread()) 的原理.
来源: http://www.jianshu.com/p/ddf9fc1f8142