我十分推荐在项目中使用 RxJava 2,它通过使用可观察序列来编写异步程序和切换线程,提供了响应式的变成风格,相比 android 推出的 handler 和 asynctask,RxJava 2 可以让代码更加简洁明晰.
官方网站上对 RxJava 的定义是:
A library for composing asynchronous and event-based programs using observable sequences for the Java VM.
强调了 "异步",貌似跟多线程没有关系哦!的确是这样的,RxJava 2 默认并不支持多线程.好,用代码说服你:
Observable.just(1, 6, 9)
.doOnNext(object : Consumer<Int> {
@Throws(Exception::class)
override fun accept(integer: Int?) {
Log.i(TAG, "Emitting item on: " + currentThread().name + ", value: $integer")
}
})
.map(object : Function<Int, Int> {
override fun apply(p0: Int): Int {
Log.i(TAG,"Processing item on: " + currentThread().name + ", value: $p0")
return p0!! * 2
}
})
.subscribeWith(object : DisposableObserver<Int>() {
override fun onNext(@NonNull integer: Int) {
Log.i(TAG,"Consuming item on: " + currentThread().name + ", value: $integer")
}
override fun onError(@NonNull e: Throwable) {}
override fun onComplete() {}
})
/*
Emitting item on: main, value: 1
Processing item on: main, value: 1
Consuming item on: main, value: 2
Emitting item on: main, value: 6
Processing item on: main, value: 6
Consuming item on: main, value: 12
Emitting item on: main, value: 9
Processing item on: main, value: 9
Consuming item on: main, value: 18
*/
根据注释里的输出结果可以推断,RxJava 2 的操作是运行在当前的主线程的,是会被阻塞的.
你可能会问 doOnNext() 是作什么用的.它只是一个提供副作用的操作符,可以脱离 observable 链而执行一些不纯的操作.
小试牛刀:多线程
为了理解 RxJava 2 如何切换线程,你必须对 RxJava 2 的三个重要的操作符熟悉:Schedulers,observeOn 和 subscribeOn.
下面举一个多线程的例子:加入从网络上获取一个书籍 Book 的列表信息,并在 UI 线程先是出来,进了 RxJava 2 坑的同学很快都会写出下面的代码:
getBooks().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableObserver<Int>() {
override fun onNext(@NonNull integer: Int) {
// You can access your Book objects here
}
override fun onError(@NonNull e: Throwable) {
// Handler errors here
}
override fun onComplete() {
// All your book objects have been fetched. Done!
}
})
有没有很简洁?getBooks() 方法执行网络操作并返回一个 Book 的列表.网络操作是一个很耗时的操作,我们使用 subscribeOn() 方法使网络操作在 Schedulers.io() 线程中进行,然后使用 observeOn() 方法指定消费者在 Schedulers.mainThread() 主线程执行操作.
拥抱调度器:Schedulers
你可以把 Schedulers 认为是一个执行不同任务的线程池.如果你想在一个线程中执行一个任务,那么需要挑选一个合适的调度器 Schedulers.RxJava 2 提供了几种不同类型的调度器 Schedulers,如果你挑选了不合适的 Schedulers,那么你的代码就不是最优的,下面看下这几个调度器 Schedulers:
Schedulers.io.通常用来执行一些非即 CPU 密集型的操作,比如读写文件,网络操作,读写数据库等等.这个调度器没有上限,为了满足需要,它的线程池的数量可以增加.
Schedulers.computation().这个调度器常用来执行 CPU 密集型的操作,比如大量数据集的计算,图片处理等等.它的线程池的数量是有线的.由于此调度器只适合于 CPU 密集型任务,所以我们希望限制线程的数量,这样它们就不会在 CPU 时间之间相互争斗,从而使自己饿死.
Schedulers.newThread().每次使用这个调度器的时候,都会完全新建一个线程来执行分配的任务.它不会使用线程池,当然也不会享受线程池带来的好处.线程的创建和销毁都很昂贵,因此您应该非常小心,不要滥用过多的线程生成导致严重的系统减速和内存错误.理想情况下,您将很少使用此调度器,主要用于启动一个完全独立的线程来执行长时间运行,隔离的任务.
Schedulers.single().这个是 RxJava 2 新引进的调度器,在 RxJava 1 中不存在的.它是一个单独的线程,使用顺序的方式执行任务.如果你在后台有很多任务要执行,但是一次只能执行一个,这种情况下使用这个调度器是最合适的.
Schedulers.from(Executor executor).这个方法可以让你用自己的 Executor 来创建自定义的 Scheduler. 假设,你想限制并行网络操作在你的应用程序被调用的数量,你可以创建一个定制的调度器并固定线程池大小,Scheduler.from(Executors.newFixedThreadPool(n)),并在代码中网络相关的 Observables 使用它.
AndroidSchedulers.mainThread().这是一个特殊的调度器,在标准的 Rxjava 库中找不到它,它存在于 RxAndroid 库.它专门为 android 程序设计,在 UI 主线程执行 UI 相关的操作.默认情况下,它会在与应用程序主线程关联的 looper 中执行队列任务,但是还有其他的特殊情况,允许我们使用像 AndroidSchedulers.from(Looper looper) 这样的 api 来使用任何 Looper.
注意: 在使用由无边界限制的线程池 (如 Schedulers.io()) 支持的调度程序时要小心,因为总是存在无限增长线程池和大量线程泛滥的风险.
理解 observeOn 和 subscribeOn
相信你对 Rxjava 提供的几种不同的调度器已经有所理解,那么就要理解下 observeOn 和 subscribeOn 这两个重要的操作符了.
subscribeOn
这个操作符指定了上游的源观察者释放元素操作所在的线程.如果有一串观察者,那么源观察者总是位于顶部,在这里元素被生成.在前面章节中的第一个代码段中我们没有使用 observeOn,你清楚地看到释放操作是在 UI 主线程中进行的.如果 observeOn 指定了 Schedulers.computation() 调度器,那么上游的操作都会在 comoutation 线程执行,如下面的代码:
Observable.just(2, 3).doOnNext {
Log.i(TAG, "Emitting item " + it + " on: " + currentThread().getName())
}.subscribeOn(Schedulers.computation()).map {
Log.i(TAG, "Mapping item " + it + " on: " + currentThread().getName()) it * it
}.filter {
Log.i(TAG, "Filtering item " + it + " on: " + currentThread().getName()) it % 2 == 0
}.subscribe {
Log.i(TAG, "Consuming item " + it + " on: " + currentThread().getName())
}
/*
Emitting item 2 on: RxComputationThreadPool-1
Mapping item 2 on: RxComputationThreadPool-1
Filtering item 4 on: RxComputationThreadPool-1
Consuming item 4 on: RxComputationThreadPool-1
Emitting item 3 on: RxComputationThreadPool-1
Mapping item 3 on: RxComputationThreadPool-1
Filtering item 9 on: RxComputationThreadPool-1
* */
这段代码中,我们没有使用完整的 DisposableSubscriber,因为我们不需要 onError() 和 onComplete(),只处理 onNext() 一个单独的消费者就足够了.
在观察链中,observeOn 的位置没有任何影响,如果你很好奇,你可以自己试下把上面代码中的 observeOn 方法放到末尾(但是必须要在消费者即 subscribe 的前面).
另一个重要的点是你不能在观察链中多次使用 observeOn,如果你那样作的话,其实只有第一个 observeOn 即最接近源 Observable 的才会生效.
Observable.just(1, 2, 3, 4, 5, 6).subscribeOn(Schedulers.io()).subscribeOn(Schedulers.computation()).subscribeOn(Schedulers.newThread()).doOnNext {
it - >Log.i(TAG, "Emitting item " + it + " on: " + currentThread().getName())
}.subscribe {
it - >Log.i(TAG, "Consuming item " + it + " on: " + currentThread().getName())
}
/*
Emitting item 1 on: RxCachedThreadScheduler-1
Consuming item 1 on: RxCachedThreadScheduler-1
Emitting item 2 on: RxCachedThreadScheduler-1
Consuming item 2 on: RxCachedThreadScheduler-1
Emitting item 3 on: RxCachedThreadScheduler-1
Consuming item 3 on: RxCachedThreadScheduler-1
Emitting item 4 on: RxCachedThreadScheduler-1
Consuming item 4 on: RxCachedThreadScheduler-1
Emitting item 5 on: RxCachedThreadScheduler-1
Consuming item 5 on: RxCachedThreadScheduler-1
Emitting item 6 on: RxCachedThreadScheduler-1
Consuming item 6 on: RxCachedThreadScheduler-1
* */
通过看注释,你已经知道上面代码中只有 subscribeOn(Schedulers.io()) 是生效的,其余无效.为什么呢?我简单暴力地说:Rxjava 是链式操作,自上而下,下游的调度器是是上游订阅者指定的,那么要找到这个调度器就要自下而上回溯,自然就找到了距离源 Observable 最近的 subscribeOn 指定的调度器才真正起作用.不知道你有没有搞懂.
observeOn()
observeOn 可以很方便地切换线程,指定了消费者所在的线程.
Observable.just(2, 3)
.doOnNext { Log.i(TAG,"Emitting item " + it + " on: " + currentThread().getName()) }
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map {
Log.i(TAG,"Mapping item " + it + " on: " + currentThread().getName())
it * it
}
.observeOn(Schedulers.newThread())
.filter {
Log.i(TAG,"Filtering item " + it + " on: " + currentThread().getName())
it % 2 == 0
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG,"Consuming item " + it + " on: " + currentThread().getName()) }
/*
Emitting item 2 on: RxCachedThreadScheduler-1
Emitting item 3 on: RxCachedThreadScheduler-1
Mapping item 2 on: RxComputationThreadPool-1
Mapping item 3 on: RxComputationThreadPool-1
Filtering item 4 on: RxNewThreadScheduler-1
Filtering item 9 on: RxNewThreadScheduler-1
Consuming item 4 on: main
* */
通过上面的代码,你应该知道可以多次调用 observeOn 操作符,指定后面的操作所在的线程.这段代码还有个不同的地方是 Emitting,Mapping 和 Filtering 使用不同的调度器,那么总是先执行完 Emitting,再执行完 Mapping,然后执行完 Filtering,最后执行 Consuming,而前面的代码中 Emitting,Mapping 和 Filtering 使用相同的调度器,那么总是执行完一个完整的事件(即 Emitting,Mapping,Filtering 和 Consuming),再执行下一个完整的事件.
Rxjava 有很多的东西可讲的,我也在不断学习中.希望多多交流,有错误的地方也希望留言指正,我的联系方式: owl@violetpersimmon.com
来源: http://www.jianshu.com/p/7e3a839a7d30