上一篇介绍了线程的一些基础知识和工作这么久以后对于多线程部分的使用经验之路,这篇主要对 RxJava 线程控制部分进行分析。
说实话,近一年多一直在用 rxjava 进行项目架构的编写及封装及一些异步请求的处理等等。真的很好用,但本文只对其线程部分进行分析。如果你想学习 rxjava 的话,推荐您看一下如下几篇文档,也是一点一点学过来的,希望可以帮到您。
http://gank.io/post/560e15be2dca930e00da1083 扔物线大神,匠心之作
http://www.jianshu.com/u/c50b715ccaeb
http://www.jianshu.com/u/f690947ed5a6
网上找的对于 Scheduler 的 2.0 介绍都是写的扔物线大神对于 Scheduler 在 RxJava1.0 中的介绍,所以对于新操作符 single()是自己的理解什么的,不对的还请指出)。
在 RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景,他们分别为:
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
- //例如
- DisposableSubscriber sub = RetorfitUtil.getInstance().create().getData()
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- //类型转换 变成你想要的类型
- .onBackpressureBuffer()
- ...
下文会对这些 Scheduler 进行单独的介绍。但是在这之前,先看一下 Schedulers 这个类。
话不多说,请撸这段源码。在 subscribeOn 这个方法中,无论我们使用哪一个 Scheduler,都会首先走进这个类中。我们发现,在一开始的时候,他用一个静态代码块,初始化了五个 Scheduler 供我们使用。
- static final class SingleHolder {
- static final Scheduler DEFAULT = new SingleScheduler();
- }
- ....
- static final class NewThreadHolder {
- static final Scheduler DEFAULT = new NewThreadScheduler();
- }
- static {
- SINGLE = RxJavaPlugins.initSingleScheduler(new Callable<Scheduler>() {
- @Override
- public Scheduler call() throws Exception {
- return SingleHolder.DEFAULT;
- }
- });
- ....
- IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
- @Override
- public Scheduler call() throws Exception {
- return IoHolder.DEFAULT;
- }
- });
- TRAMPOLINE = TrampolineScheduler.instance();//默认Scheduler
- ....
- }
一旦我们调用这些方法,他就会把我们的任务放进对应的 Scheduler (线程池)中进行执行。这也就是线程切换的概念。
- ...
- public static Scheduler io() {
- return RxJavaPlugins.onIoScheduler(IO);
- }
- ...
- public static Scheduler single() {
- return RxJavaPlugins.onSingleScheduler(SINGLE);
- }
之前说这个 Scheduler(线程池)的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,那么他是如何做的呢?
首先,他们所有的 Thread 都是由 RxThreadFactory 创建,通过初始化独特的属于自己的字符串常量来确保不同的 Scheduler 在创建不同的线程的标志性。如下(其余同,就不介绍此部分了):
再者发现一个有意思的事情,rxjava 所有的线程都被设置为守护线程。如下图:也就是它的线程不会影响 JVM 的退出,关于守护线程和用户线程不明白的,请自行百度。
之前在使用的过程中,一直以为无数量上限的线程池的底层是 newCachedThreadPool 实现的,后来发现不然,在 IoScheduler 的 CachedWorkerPool 类中你会发现,它创建了一个创建一个定长线程池,并且大小是 1.
那么无数量限制呢?懵逼了!!后来发现,Schedulers.io() 所用的线程池(到这里不对了,准确的来说,应该叫的线程组,因为他是由一个一个的定长线程池组成),是一个由 ConcurrentLinkedQueue(顶层父类其实是 Collection)组成的多个 CachedWorkerPool,通过 CachedWorkerPool 的 get()方法每次从 Queue 中获取可用 worker 线程,来进行任务的操作。如下图:
当然,如果 expiringWorkerQueue 没有 worker 线程,则会单独为此线程创建一个 worker 线程
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。那么,在 NewThreadScheduler 它是怎么做的呢?如下图:图中第一个和第二个箭头说明不同的 Scheduler 在创建不同的线程的标志性。第三个箭头说明他每次创建一个线程就把这个线程初始化为 worker 线程,并执行操作。
计算所使用的 Scheduler。这个线程池由 newFixedThreadPool 直接实现。这个计算指的是 CPU 密集型计算。那为什么这个线程池适合执行 Cpu 密集型计算呢,因为它的个数等于 CPU 核数,能够最大效率的使用 CPU,提高效率。如图:
获取 CPU 核数:
创建固定大小的线程池:
Schedulers.computation() 模式下是用 RoundRobin(是一种算法,请自行百度)方式轮训获取 worker 线程,这就是为什么起名叫叫 EventLoop 吧,如下图。
在 EventLoopWorker 中,会调用 schedule 方法执行线程中的任务。
Schedulers.trampoline() 的意思是直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler,代替 RxJava1.0 的 immediate 调度器。Schedulers.trampoline() 所用的线程组,组合了一个 PriorityBlockingQueue,以提交事件的时间进行排序,依次执行任务,如下图。
这个单词你懂了就懂了。enqueue,不信你去查查!
Schedulers.single(): 拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。他只有一个 worker 线程,如图:
在 ScheduledWorker 中,会执行我们所有的任务。
在 CompositeDisposable 中,有一个 OpenHashSet 回来保存我们的任务,在 add 方法中,通过同步代码块,保证同一时间只有一个任务在执行。
接收一个 Executor,允许我们自定义 Scheduler。关于这方面,先不准备讲解,使用较少,如有需要请告知,我可以一起和你百度,共同实践哈哈!
还有一个任务,就是在 RxJavaPlugins 这个类中,初始化了很多的 handler,是为了 Funtion 中回调数据吗?还是什么?没找到相关的地方,求帮助!
还是那句话,希望得到大家的中肯的意见,让我认识自己的不足,一起学习,共同进步。这一篇就先到这里,分析的过程中,发现写的真是无法企及的高度。自残中……
来源: http://blog.csdn.net/say_from_wen/article/details/78774086