读了大神的博客,为了学习,跪着看完再整理了出来,以便加深印象。
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
一个运行在 JVM 上的库,通过可观测的序列来组成异步的,基于事件的程序。
Observable 和 Observer 通过
方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
- subscribe()
与传统观察者模式不同,RxJAva 的事件回调方法除了普通事件
(相当于
- onNext()
) 之外,还定义了两个特殊的事件:
- onClick()/onEvent()
和
- onCompleted()
。
- onError()
: 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的
- onCompleted()
发出时,需要触发
- onNext()
方法作为标志。
- onCompleted()
: 事件队列异常。在事件处理过程中出异常时,
- onError()
会被触发,同时队列自动终止,不允许再有事件发出。
- onError()
和
- onCompleted()
有且只有一个,并且是事件序列中的最后一个。需要注意的是,
- onError()
和
- onCompleted()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
- onError()
基于以上的概念,RxJava 的基本实现主要有三点:
- Observer observer = newObserver() {
- @Override
- public void onCompleted() {
- Log.d(TAG,"Completed!");
- }@Override
- public void onError(Throwable e) {
- Log.d(TAG,"Error!");
- }@Override
- public void onNext(String s) {
- Log.d(TAG,"Item:"+ s);
- }
- };
除了 Observer 接口外,RxJava 还内置了一个实现了 Observer 的抽象类: Subscriber。Subscriber 堆 Observer 接口进行了一些扩展,但是它们的基本使用方式是完全一样的。不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。它们区别对于使用者来说主要有两点:
: 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。需要注意的是:如果准备工作的线程有要求,
- onStart()
就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要再指定的线程来你做准备工作,可以适用
- onStart()
方法。
- doOnSubscribe()
: 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。避免内存泄漏。
- unsubscribe()
RxJava 适用
方法来创建一个 Observable,并为它定义事件触发规则:
- create()
- Observable observable = Observable.create(newObservable.OnSubscribe() {
- @Override
- public void call(SubscribersuperString> subscriber) {
- subscriber.onNext("I'm a");
- subscriber.onNext("I'm b");
- subscriber.onNext("I'm c");
- subscriber.onCompleted();
- }
- });
可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的
方法会自动被调用。这样,由被观察者调用了观察者的回掉方法,就能实现由被观察者向观察者的事件传递,即观察者模式。
- call()
方法是 RxJava 最基本的创造事件序列的方法。基于这个方法,RxJava 还提供了以下方法用来快捷创建事件队列,例如:
- create()
: 将传入的参数一次发送出来。
- just(T...)
- Observable observable = Observable.just("I'm a", "I'm b", "I'm c");
- //会依次调用
- // onNext("I'm a");
- // onNext("I'm ");
- // onNext("I'm c");
- // onCompleted();
/
- from(T[])
: 将传入的数组或者 Iterable 拆分成具体对象后,依次发送出来。
- from(Iterable<? extends T>)
- String[] words = {
- "I'm a",
- "I'm b",
- "I'm c"
- };
- Observable observable = Observable.from(words);
创建了观察者和被观察者之后,再用
方法将它们联合起来,整条链子就可以工作了。
- subscribe()
- observable.subscribe(observer);
的内部是显示这样的 (仅核心代码):
- Observable.subscribe(Subscriber)
- // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
- // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
- publicSubscriptionsubscribe(Subscriber subscriber) {
- subscriber.onStart();
- onSubscribe.call(subscriber);//在RxJava中,Observable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当subscribe()方法执行的时候。
- returnsubscriber;//作为Subscription返回。这是为了方便unsubscribe();}
除了
和
- subscribe(Observer)
,
- subscribe(Subscriber)
还支持不完整定义的回掉,RxJava 会自动根据定义创建出 Subscriber。形式如下:
- subscribe()
- //Action1也是一个接口,,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。Action1 onNextAction = newAction1() {
- @Override
- public void call(String s) {
- Log.d(TAG,s);
- }
- };
- Action1 onErrorAction = newAction1() {
- @Override
- public void call(Throwable throwable) {
- }
- };//Action0使RxJava的一个接口,它只有一个方法call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。Action0 onCompletedAction =newAction0() {@Override
- public void call() {
- Log.d(TAG,"completed");
- }
- };
- //自动创建 Subscriber,并使用 onNextAction 来定义 onNext() observable.subscribe(onNextAction);//自动创建 Subscriber,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()observable.subscribe(onNextAction,onErrorAction);//自动创建 Subscriber,并使用 onNextAction 、 onErrorAction 和 onCompletedAction 来定义
- // onNext() 、 onError() 和 onCompleted()observable.subscribe(onNextAction,onErrorAction,onCompletedAction);
然而,上面都是同步的,要实现异步,则需要用到 RxJava 的另一个概念:Scheduler。
在不指定线程的情况下,RxJava 遵循的是线程不变的原则,即在那个线程调用
, 就在那个线程生产时间;在那个线程生产事件,就在那个线程消费事件。如果需要切换线程,就需要用到 Scheduler(调度器)。
- subscribe()
在 RxJava 中,Scheduler———调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么线程。RxJava 已经内置了几个 Scheduler,它们已经适合大多数的使用场景:
: 直接在当前线程运行,相当于不指定线程。这也是默认的。
- Scheduler.immediate()
: 总是启用新线程,并在新线程执行操作。
- Scheduler.newThread()
:IO 操作 (读写文件、读写数据库、网络信息交互等) 所使用的 Scheduler。行为模式和
- Scheduler.io()
差不多,区别在于
- newThread()
的内部实现是使用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下
- io()
比
- io()
更有效率。不要把计算工作放在
- newThread()
中,可以避免创建不必要的线程。
- io()
: 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 IO 等操作限制性能的操作,例如图形的计算。这个 Scheduler 所使用的固定的线程池,大小为 CPU 核数。不要把 IO 操作放在
- Scheduler.computation()
中,否则 IO 操作的等待事件会浪费 CPU。
- computation()
, 它指定的操作将在 Android 主线程运行。
- AndroidSchedulers.mainThread()
有了这几个 Scheduler,就可以使用
和
- subscribeOn()
两个方法来对线程进行控制了。
- observeOn()
: 指定
- subscribeOn()
所发生的线程,即
- subscribe()
被激活时所处的线程,或者叫做事件产生的线程。
- Observable.OnSubscribe()
: 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
- observeOn()
上代码:
- Observable.just(1,2,3,4)
- .subscribeOn(Schedulers.io())// 指定 subscribe() 发生在 IO 线程,意思是被创建的事件内容1,2,3,4会在io线程发出。.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程,意思是subscriber数字的打印将发生在主线程。.subscribe(newAction1() {
- @Override
- public void call(Integer number) {
- Log.d(tag,"number:"+ number);
- }
- });
上面这段代码中,适用于多数的后台线程读取数据,主线程显示的策略。
放在后面,它的原理是以变换的原理作为基础的。
RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
首先看一个
的例子:
- map()
- observable.just("images/logo.png")
- .map(newFunc1() {
- @Override
- publicBitmapcall(String s) {returngetBitmapFromPath(s);
- }
- })
- .subscribe(newAction1() {@Override
- public void call(Bitmap bitmap) {
- showBitmap(bitmap);
- }
- });
这里出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个借口,用于包装含有一个参数的方法。Func1 和 Action1 的区别在于,Func1 包装的是有返回值的方法。另外,和 ActionX 一样,FuncX 也有多个,用于不同参数个数的方法。
可以看到,
方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过
- map()
方法后,事件的参数类型也有 String 转换为了 Bitmap。这种直接变换对象并返回的,是最常见的也是最容易理解的变换。不过 RxJava 的比那还远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。列举几个常用的变换:
- map()
: 如上,事件对象的直接变换。
- map()
: 这是一个很有用但是非常难理解的变换。首先假设由这么一种需求:假设有一个数据结构【学生】,现在需要打印出一组学生的名字。实现方式非常简单:
- flatMap()
很简单,再假设: 如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:
- Student[] students = ...;
- Subscriber subscriber = newSubscriber() {
- @Override
- public void onNext(String s) {
- Log.d(tag,s);
- }
- ...
- };
- Observable.from(students)
- .map(newFunc1() {
- @Override
- publicStringcall(Student student) {returnstudent.getName();
- }
- })
- .subscribe(subscriber);
依然很简单,那么如果我不想在 Subscriber 中使用 for 循环, 而是希望 Subscriber 中直接传入单个的 Course 对象呢 (这对于代码复用很重要)。用
- Student[] students = ...;
- Subscriber subscriber = newSubscriber() {
- @Override
- public void onNext(Student student) {
- List courses = student.getCourses();
- for(inti =0; i < courses.size(); i++) {
- Course course = courses.get(i);
- Log.d(tag, course.getName());
- }
- }
- ...
- };
- Observable.from(students)
- .subscribe(subscriber);
- }
显然是不行的,因为
- map()
是一对一的转换。那怎么才能把一个 Student 转化成多个 Course 呢? 这时候,就要用到
- map()
了:
- flatMap()
从上面的代码可以看出,
- Student[] students = ...;
- Subscriber subscriber = newSubscriber() {
- @Override
- public void onCompleted() {
- }@Override
- public void onError(Throwable e) {
- }@Override
- public void onNext(Course course) {
- Log.d(TAG,course.getName());
- }
- };
- Observable.from(students)
- .flatMap(newFunc1>() {
- @Override
- publicObservable call(Student student) {returnObservable.from(student.getCourses());
- }
- })
- .subscribe(subscriber);
和
- flatMap()
有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意的,和
- map()
不同的是,
- map()
中返回的是个 Observable 对象,并且这个 Observable 对象并不是北直接发送到 Subscriber 的回调方法中。
- flatMap()
的原理是这样的:
- flatMap()
所谓的 flat。
- flatMap()
示意图:
- flatMap()
扩展:由于可以在嵌套的 Observable 中添加异步代码,
也常用语嵌套的异步操作,例如嵌套的网络请求。示例代码 (Retrofit+RxJava):
- flatMap()
- java
- networkClient.token()
- .flatMap(new Func1
- @Override
- public Observable call(String token) {
- // 返回 Observable,在订阅时请求消息列表,并在响应后发送请求到的消息列表
- return networkClient.messages();
- }
- })
- .subscribe(new Action1(){
- @Override
- public void call(Messages messages) {
- // 处理显示消息列表
- showMessages(messages);
- }
- });
传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过
, 可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。
- flatMap()
: 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器:
- throttleFirst()
。不必再但因用户手都点来两个重复的界面了。 RxJava 还提供很多便捷的方法来实现事件序列的变换。就不一一列举了。
- RxJava.clickEvents(button) //RxBinding代码。 .throttleFirst(500,Timeunit.MILLISECONDS)//设置防抖间隔为500ms .subscribe(subscriber)
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 内部,它们是基于同一个基础的变换方法
。首先看一下
- lift(Operator)
的内部实现 (仅核心代码):
- lift()
- //注意:这里不是lift()的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
- public Observable lift(OperatorsuperT> operator){returnObservable.create(newonSubscribe(){
- @Override
- public void call(Subscriber subscriber){
- Subscriber newSubscriber = operator.call(subscriber);
- newSubscriber.onStart();
- onSubscribe.call(newSubscriber);
- }
- });
- }
这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新的 Observable 所用的参数 OnSubscribe 的回调方法
的实现竟然和前面说过的
- call()
一样!然而它们并不一样哦,不一样的地方关键就在于第二行
- Observable.subscribe()
中的 OnSubscribe** 所指代的对象不同 **
- onSubscribe.call(subscriber)
中这句话的 OnSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是
- subscribe()
之后的情况就复杂了点。
- lift()
时:
- lift()
创建了一个 Observable 后,加上之前原始的 Observable,已经有两个 Observable 了;
- lift()
后的 Observable 的
- lift()
的时候,使用的是
- subscribe()
所返回的新的 Observable,于是它触发的
- lift()
, 也是用的新 OnSubscribe,即在
- onSubscribe.call(subscriber)
中生成的那个 OnSubscribe;
- lift()
方法中的 onSubscribe,就是指的原始 Observable 中的原始 OnSubscribe,在这个
- call()
方法里,新 OnSubscribe 利用
- call()
生成了一个新的 Subscriber( Operator 就是在这里,通过自己的
- operator.call(subscriber)
方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的【变换】代码以实现变换),然后利用这个新的 Subscriber 向原始 Observable 进行订阅。
- call()
过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。
- lift()
精简掉细节的话,也可以这么说: 在 Observable 执行了
方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理之后发送给 Subscriber。
- lift(Operator)
如图:
两次和多次的
同理,如下图:
- lift()
举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考:
- Observable.lift(newObservable.Operator(){
- @Override
- publicSubscribersuperInteger>call(finalSubscribersuperString> subscriber){//将事件序列中的Integer对象转换为String对象
- return newSubscriber{
- @Override
- public void onNext(Integer integer){
- subscriber.onNext(""+ integer);
- }@Override
- public void onCompleted(){
- subscriber.onCompleted();
- }@Override
- public void onError(Throwable e){
- subscriber.onError(e);
- }
- };
- }
- });
讲述
的原理只是为了更好的了解 RxJAva,从而可以更好的使用它。然而不管是否了解
- lift()
的原理,RxJava 都不建议开发者自定义 Operator 来直接使用
- lift()
, 而是建议使用已有的
- lift()
包装方法 (如
- lift()
- map()
等) 进行组合来实现需求,因为直接使用
- flatMap()
非常容易发生一些难以发现的错误。
- lift()
除了
之外,Observable 还有一个变换方法叫做
- lift()
。它和
- compose(Transformer)
的区别在于,
- lift()
是针对事件项和事件序列的,而
- lift()
是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable,并且他们都需要应用一组相同的
- compose()
变换。可以这么写:
- lift()
- observable1
- .lift1()
- .lift2()
- .lift3()
- .lift4()
- .subscribe(subscriber1);
- observable2
- .lift1()
- .lift2()
- .lift3()
- .lift4()
- .subscribe(subscriber2);
- observable3
- .lift1()
- .lift2()
- .lift3()
- .lift4()
- .subscribe(subscriber3);
- observable4
- .lift1()
- .lift2()
- .lift3()
- .lift4()
- .subscribe(subscriber4);
这样太不软件工程了,于是你改成了这样:
- privateObservableliftAll(Observable observable){returnobservable
- .lift1()
- .lift2()
- .lift3()
- .lift4();
- }
- ...
- liftAll(observable1).subscribe(subscriber1);
- liftAll(observable2).subscribe(subscriber2);
- liftAll(observable3).subscribe(subscriber3);
- liftAll(observable4).subscribe(subscriber4);
可读性、可维护性都提高了。可是 Observable 被一个方法包起来,这种方式对于 Observable 的灵活性似乎还是增添了那么点限制。怎么办?这个时候,就应该用
来解决了:
- compose()
- public class LiftAllTransfomer implements Observable.Transformer<Integer,String>{
- @Override
- publicObservable call(Observable observable){
- returnobservable
- .lift1()
- .lift2()
- .lift3()
- .lift4();
- }
- }
- ...
- Transformer liftAll =newLiftAllTransformer();
- observable1.compose(liftAll).subscribe(subscriber1);
- observable2.compose(liftAll).subscribe(subscriber2);
- observable3.compose(liftAll).subscribe(subscriber3);
- observable4.compose(liftAll).subscribe(subscriber4);
像上面这样,使用
方法,Observable 可以利用传入的 Transformer 对象的
- compose()
方法直接对自身进行处理,也就不必被包在方法里面了。
- call()
除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。
前面说过了,可以利用
结合
- subscribeOn()
来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解
- observeOn()
- map()
等变换方法后,有些人就问了: 能不能多切换几次线程
- flatMap()
答案是:能。因为
指定的是 Subscriber 的线程,而这个 Subscriber 并不是 (严格来说应该是【不一定是】,但这里不妨理解为【不是】)
- observeOn()
参数中的 Subscriber,而是
- subscribe()
执行时的当前 Observable 所对应的 Subscriber,即它的直接下级 Subscriber。换句话说,
- observeOn()
指定的是它之后的操作所在的线程。因此如果有多次线程切换的需求,只要在每个想要切换线程的位置调用依次
- observeOn()
即可。上代码:
- observeOn()
- Observable.just(1,2,3,4)//IO线程,由subscribeOn()指定.subscribeOn(Schedulers.io())
- .observeOn(Schedulers.newThread())
- .map(mapOperator)//新线程,由observeOn()指定.observeOn(Schedulers.io())
- .map(mapOperator2)//IO线程,由observeOn()指定.observeOn(AndroidSchedulers.mainThread)
- .subscribe(subscriber);//Android组合县城,由observeOn()指定
如上,通过
的多次调用,程序实现了线程的多次切换。
- observeOn()
不过,不同于
,
- observeOn()
的位置放在那里都可以,但它是只能调用一次的。
- subscribeOn()
其实,
和
- subscribeOn()
的内部实现,也是用的
- observeOn()
。具体看图 (不同颜色的箭头表示不同的线程):
- lift()
原理图:
- subscribeOn()
原理图:
- observeOn()
从图中可以看出,
和
- subscribeOn()
都做了线程切换的工作 (图中的"schedule…"部位)。不同的是,
- observeOn()
的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此
- subscribeOn()
的线程控制可以从事件发出的开端就造成影响; 而
- subscribeOn()
的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此
- observeOn()
控制的是它后面的线程。
- observeOn()
最后,用一张图来解释当多个
和
- subscribeOn()
混合使用时,线程调度时怎么发生的 (由于图中对象较多,相对于上面的图对结构做了一些简化调整):
- observeOn()
图中共有 5 处含有对事件的操作。由图中可以看出,1 和 2 两处受第一个
影响,运行在红色线程; 3 和 4 受第一个
- subscribeOn()
的影响,运行在绿色线程; 5 处受第二个
- observeOn()
的影响,运行在紫色线程; 而第二个
- onserveOn()
由于在通知过程就被第一个
- subscribeOn()
截断,因此对整个流程并没有任何影响。所以:当使用了多个
- subscribeOn()
的时候,只有第一个起作用。
- subscribeOn()
然而,虽然超过一个的
对事件处理的流程没有影响,但在流程之前却是可以利用的。
- subscribeOn()
在前面说到 Subscriber 的时候,提到过 Subscriber 的
可以用作流程开始前的初始化。然而
- onStart()
由于在
- onStart()
发生时就被调用了,因此不能指定线程,而是只能执行在
- subscribe()
被调用的线程。这就导致如果
- subscribe()
中含有对线程有要求的代码 (例如在界面上显示一个 ProgressBar,这必须在主线程中执行),将会有线程非法的风险,因为有时你无法预测
- onStart()
将会在什么线程执行。
- subscribe()
而与
相对应的,有一个方法
- subscriber.onStart()
。它和
- Observable.doOnSubscribe()
同样是在
- Subscriber.onStart()
调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下,
- subscribe()
执行在
- doOnSubscribe()
发生的线程; 而如果在
- subscribe()
之后有
- doOnSubscribe()
的话,它将执行在离它最近的
- subscribeOn()
所指定的线程。
- subscribeOn()
示例代码:
- Observable.create(onSubscribe)
- .subscribeOn(Schedulers.io())
- .doOnSubscribe(newAction0() {@Override
- public void call() {
- progressBar.setVisibility(View.VISIBLE);// 需要在主线程执行}
- })
- .subscribeOn(AndroidSchedulers.mainThread())// 指定主线程.observeOn(AndroidSchedulers.mainThread())
- .subscribe(subscriber);
如上,在
的后面跟一个
- doOnSubscribe()
, 就能指定准备工作的线程了。
- subscribeOn()
Retrofit 是 Square 的一个著名的网络请求库。
Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式的 API。下面用对比的方式来介绍 Retrofit 的 RxJava 版 API 和传统版本的区别。
以获取一个 User 对象的接口作为例子。使用 Retrofit 的传统 API,可以用这样的方式定义请求:
- @GET("/user")public void getUser(@Query("userId") String userId,Callback callback);
在程序的构建过程中,Retrofit 会自动把方法实现并生成代码,然后开发者可以利用下面的方法来获取特定用户并处理响应:
- getUser(userId,newCallback(){
- @Override
- public void success(User user){
- userView.setUser(user);
- }@Override
- public void failure(RetrofitError error){//error handing...
- }
- });
而使用 RxJava 形式的 API,定义同样的请求是这样的:
- @GET("/user")publicObservable getUser(@Query("userId") String userId);
使用的时候是这样的:
- getUser(userId)
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(newObserver(){
- @Override
- public void onNext(User user) {
- userView.setUser(user);
- }@Override
- public void onCompleted() {
- }@Override
- public void onError(Throwable error) {// Error handling...
- }
- });
区别就在于 Retrofit 把请求封装进 Observable,在请求结束后调用
或在请求失败后调用
- onNext()
。但本质都差不多,而且在细节上 Observable 的形式似乎还比 Cakkback 形式要差一些,那 Retrofit 为什么还要提供 RxJava 的支持呢?
- onError()
因为它好用啊,从这个例子看不出来因为这只是最简单的情况。而情况一旦复杂起来,Callback 形式马上就会开始让人头疼。
比如,这样一种情况:你的程序渠道的 User 并不应该直接显示,而是需要先于数据库中的数据进行比对和修正后再显示。使用 Callback 方式大概可以这么写:
- getUser(userId,newCallback(){
- @Override
- public void success(User user) {
- processUser(user);// 尝试修正 User 数据userView.setUser(user);
- }@Override
- public void failure(RetrofitError error) {// Error handling...
- }
- });
很简便,但是不要这样做,为什么?因为这样做会影响性能。数据库的操作很重,一次读写操作花费 10ms-20ms 是很常见的,这样的耗时操作容易造成界面卡顿。所以通常情况下,如果可以的话一定要避免在主线程中处理数据库。所以为了提升性能,这段代码可以优化以下:
- getUser(userId,newCallback() {
- @Override
- public void success(User user) {newThread() {@Override
- public void run() {
- processUser(user);// 尝试修正 User 数据runOnUiThread(newRunnable() {// 切回 UI 线程
- @Override
- public void run() {
- userView.setUser(user);
- }
- });
- }).start();
- }@Override
- public void failure(RetrofitError error) {// Error handling...
- }
- });
性能问题解决,但是单吗就变得比较乱了,蜜汁缩进!
看 RxJava 的形式:
- getUser(userId)
- .doOnNext(newAction1(){
- @Override
- public void call(User user) {
- processUser(user);
- }
- })
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(newObserver(){
- @Override
- public void onNext(User user) {
- userView.setUser(user);
- }@Override
- public void onCompleted() {
- }@Override
- public void onError(Throwable error) {// Error handling...
- }
- });
RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓 Binding,就是类似设置 OnClickListener、设置 TextWatcher 这样的注册绑定对象 API。
举个设置点击监听的例子。使用 RxBinding,可以把事件监听用这样的方法来设置:
- Button button = ...;
- RxView.clickEvents(button)// 以 Observable 形式来反馈点击事件.subscribe(newAction1() {
- @Override
- public void call(ViewClickEvent event) {// Click handling}
- });
看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的
来实现的。然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。通过 RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。扩展的方式有很多,根据需求而定。一个例子是前面提到过的
- setOnClickListener()
,用于去抖动,也就是消除手抖导致的快速连环点击:
- throttleFirst()
- RxView.clickEvents(button).throttleFirst(500,TimeUnit.MILLISECONDS).subscribe(clickAvction);
前面举的
和
- Retrofit
的例子,是两个可以提供现成的
- RxBinding
的库。而如果你有某些异步操作无法用这些库来自动生成
- Observable
,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩 / 解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。
- Observable
RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用
或者 GreenRobot 的
- Otto
。至于什么是 RxBus,可以看这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了
- EventBus
,目前为止没有不良反应。
- Otto
来源: http://blog.csdn.net/chandelierr/article/details/73505804