这是一系列文章的第一篇, 这个系列没有什么非常明确的主题, 可以看做是日常开发和协作里悟出来的一些技术相关的事情 -- 或者说是技术方面的随笔.
这一篇是关于 RxJava .
曾经看到这么一篇文章, 称赞 Kotlin Coroutine 大好, RxJava 可以直接丢弃了; 也曾经看到一篇文章(事实上正是我刚开始学习 RxJava 的时候看的), 称 RxJava 的本质就是异步.
如果把这两篇文章放在一起看, 那么似乎 "合理":Kotlin Coroutine 解决了 JVM 世界里过去的 Callback 形式的异步回调, 而 RxJava 的本质就是处理异步, 那么 RxJava 自然不需要了, 用 Kotlin Coroutine 就行了.
上面这段话的表述是有问题的.
这引出我下面要说的一部分:
RxJava 不只是异步
先放一段大家熟悉的代码:
- getUser("id").subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(object : SingleObserver<UnsplashUser> {
- override fun onSuccess(user: UnsplashUser) {
- nameTextView.text = user.name
- }
- override fun onSubscribe(d: Disposable) {
- // do on subscribe
- }
- override fun onError(e: Throwable) {
- e.printStackTrace()
- }
- })
getUser() 返回一个 Single<UnsplashUser> , 里面可能是网络请求, 又或者是查询数据库, 但这不重要, 重要的是这两句话:
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
第一句是把订阅的操作放到 IO 线程里执行, 然后在主线程里观察: 因此我们能在 onSuccess 方法里给 TextView 的 text 属性赋值.
相信上述代码大家一定在项目里用过, 甚至把中间的切换线程的操作放到一个 Transformer<T> 里, 实现自己的 Observer 简化一些通用的处理, 理所当然地把 RxJava 等同于异步.
但是 RxJava 真的非要异步吗? 试想一下, 如果在调用的时候, 完全没有 subscribeOn 和 observeOn , 那么这样算什么呢?
你可能会回答:
这情况就不该用 RxJava 了.
这可以说对, 同时也可以说不对.
首先, RxJava 是一个模式, 一个建立在上游发射数据, 然后被下游观察的观察者模式, 在上游发出数据后, 你可以做一些数据转换(Map), 可以接到另一个流(FlatMap), 可以让他经过你自定义的转换器(Transformer), 当然也可以指定订阅和观察的线程(SubscribeOn & ObserveOn), 只使用其中一部分也是没问题.
一个合适的例子是 RxBus, 跟 EventBus 不一样的是, 它是基于 RxJava 实现的, 是站在 RxJava 的肩膀上的. 当然它可以切换线程, 但这个不重要: 这是一个上游发射数据, 下游接收数据然后做对应处理的好例子.
Rx 的世界, 不只有 Observable
自 RxJava2 开始, 就支持了 Flowable , Single , Maybe , Completable 这几个类型, 跟 Observable 相比, 他们表示的意思更明确:
Flowable: 支持背压的 Observable
Single: 成功发射一个 item 或者失败
Maybe: 可能发射一个 item, 或者直接完成, 或者失败
Completable: 完成或者失败
为什么需要知道这个? 我真的需要知道是该用 Observable 还是 Single 吗?
如果所有代码都是你自己写的, 那么你可能觉得区分与否都没所谓: 一个获取用户信息的网络请求 API, 返回 Observable / Flowable / Single 都是一样的, 对于前两者, 你只会关心 onNext (而且只期望发生一次), 对于后者, 你会只关心 onSuccess , 同时错误处理都在 onError 里处理. 所以, 为什么不直接使用 Single 呢?
如果你是写给别人服务的 API, 那么区分这几种类型还是很有必要的: 通过返回的类型能简单地告知调用方这个流应该是什么类型, 不明确的类型 (比如一律是 Observable) 可能让调用方在 Observer 处理事件的时候里做一些无用功.
不要跟 Callback 混用
看一下这样的代码:
- interface Callback {
- fun onSuccess(value: Int)
- fun onError(t: Throwable)
- }
- private fun foo(callback: Callback?) {
- Single.just(0)
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(object : SingleObserver<Int> {
- override fun onSuccess(t: Int) {
- callback?.onSuccess(t)
- }
- override fun onSubscribe(d: Disposable) {
- }
- override fun onError(e: Throwable) {
- callback?.onError(e)
- }
- })
- }
- fun main() {
- foo(object : Callback {
- override fun onSuccess(value: Int) {
- }
- override fun onError(t: Throwable) {
- }
- })
- }
万不得已不要跟 Callback 混用. 每调用一次就多产生一个对象(这个可能是最无关紧要的理由, 毕竟使用 Rx 意味着产生的对象就比较多了), 变成 Callback 风格对调用方来说可能 并没有什么好处 ; 相反, 如果返回的类型是 Rx 的, 那么调用方在调用的时候就可以考虑使用 flatMap 的形式跟上游连接起来.
同时, 正如第一点说的, Rx 的本质并不是异步, 因此同样地 万不得已 不要在方法里面指定调度器(也就是不要使用 subscribeOn). 如果你有观察 Rx 内部的 public 方法, 你可以看到基本上都有这么一段注释:
- * <dl>
- * <dt><b>Scheduler:</b></dt>
- * <dd>{
- @code defer
- } does not operate by default on a particular {
- @link Scheduler
- }.</dd>
- * </dl>
这表示内部没有指定调度器执行.
上面之所以说 万不得已 , 是因为有一些 API 需要在特定的线程上处理, 比如 delay 方法默认在 Computation 调度器执行的:
- * <dt><b>Scheduler:</b></dt>
- * <dd>{
- @code interval
- } operates by default on the {
- @code computation
- } {
- @link Scheduler
- }.</dd>
- * </dl>
所以对于写 SDK 的同学来说, 请尽可能地把调度器的指定权交给调用方, 这样既能减少测试的粒度, 又能让 API 设计得更灵活.
一个例子:
- fun main() {
- val uri = Uri.parse("")
- decodeBitmap(uri)
- .flatMap {
- saveToInternalStorage(it)
- }
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(object : SingleObserver<File> {
- override fun onSuccess(t: File) {
- }
- override fun onSubscribe(d: Disposable) {
- }
- override fun onError(e: Throwable) {
- }
- })
- }
- private fun saveToInternalStorage(bm: Bitmap): Single<File> {
- return Single.create { e ->
- val file = File(getExternalFilesDir(Environment.DIRECTORY_PICTURES), "${System.nanoTime()}.jpg")
- val fos = FileOutputStream(file)
- try {
- fos.use {
- bm.compress(Bitmap.CompressFormat.JPEG, 100, it)
- }
- e.onSuccess(file)
- } catch (t: Throwable) {
- e.onError(t)
- }
- }
- }
- private fun decodeBitmap(uri: Uri): Single<Bitmap> {
- return Single.create<Bitmap> { e ->
- val fd = contentResolver.openFileDescriptor(uri, "r")?.fileDescriptor ?: kotlin.run {
- e.onError(RuntimeException("Failed to open fd"))
- return@create
- }
- val bm = BitmapFactory.decodeFileDescriptor(fd) ?: kotlin.run {
- e.onError(RuntimeException("Failed to decode bitmap"))
- return@create
- }
- e.onSuccess(bm)
- }
- }
善用「起死回生」
好吧这里并不是真的什么「起死回生」.
有几个方法你可能会忽略, 拿 Single 为例子:
这几个方法分别可以在遇到错误的时候(前提是 onError 能执行), 接上一个 Single 流, 或者直接发射一个 item. 如果你的 onError 和 onSuccess 里的处理很类似的话, 那么这些方法将可能帮助你简化 Observer 里处理的代码.
同样地, Observable 和其他也有类似的代码, 对于 Observable 来说, 可以忽略其中一些 item 的错误, 然后转化为正确的或者经过特殊装饰的 item 发射出去.
当然, onErrorxxx 里面也是可以抛异常的, 如果在里面抛异常了, 那么依然能在 onError 里处理这个异常 -- 略有不同的是, 这个异常是 CompositeException , 你需要 getCause() 才能获得真正的异常.
来源: http://www.tuicool.com/articles/I7FJfqu