这篇文章是根据 Jake Wharton 在 GOTO CopenHagen 2016 上的讲话整理的。
下一个版本(2.0)的 RxJava 还在开发中。虽然 observable、订阅管理和背压(backpressure)都完全重写了,但是 operator 基本没有任何的变化。在本文中你将学到如何让你的库和应用迁移到 RxJava 2 上,以及如何处理 RxJava 的两个版本。
为什么响应式编程突然间流行起来。除非你可以一开始就把 app 定义为同步的模式,否则的话只要有一个异步的处理就会把你习惯的传统的编程方式打破。"打破" 并不是说你的 app 运行不了,而是说你会面对异步带来的复杂度增加。
下面的一个例子可以很好的说明这个问题:
- interfaceUserManager {
- UsergetUser();void setName(String name);void setAge(intage);
- }
- UserManager um =new UserManager();
- System.out.println(um.getUser());
- um.setName("Jane Doe");
- System.out.println(um.getUser());
这个简单的例子里包含一个 user 对象,且可以调用 setter 来改变它。如果我们只用同步、单线程的方式来处理我们可以信赖我们要得到的结果:创建一个实例,打印 user,修改它的属性,再次打印 user。
当需要用异步的方式来处理的时候。比如,我们要显示的是 server 端对 user 的修改。上例中后面的两个方法就需要异步处理。我们该如何修改代码来适应这个改变呢?
你可以做的就是什么都不做。你可以假设异步调用 server 端的修改是成功的,并且你可以在本地修改 user。这样打印出来的就是 server 端的修改。很明显这样的修改不是好主意。网络是不稳定的,server 端也可能返回一个错误,这个时候你就需要在本地处理这些问题了。
我们可以很简单的处理这个问题。比如,在 server 端的异步请求成功后调用一个 runnable。现在就是响应式编程了。
- interfaceUserManager {
- UsergetUser();void setName(String name, Runnable callback);Avoid setAge(intage, Runnable callback);B
- }
- UserManager um =new UserManager();
- System.out.println(um.getUser());
- um.setName("Jane Doe",newRunnable() {@Override public void run() {
- System.out.println(um.getUser());
- }
- });
然而我们并没有处理可能发生的问题,比如网络请求失败的时候。也许我们创建自己的监听器,这样当一个错误发生的时候我们就可以处理这个错误。
- UserManager um =new UserManager();
- System.out.println(um.getUser());
- um.setName("Jane Doe",newUserManager.Listener() {@Override public void success() {
- System.out.println(um.getUser());
- }@Override public void failure(IOException e) {// TODO show the error...}
- });
我们可以把错误通知给 user。我们可以自动重试。这些办法都可以用,并且这是大多数人处理异步问题的方式。
问题出现的地方是,如果你要处理更多的时候。你需要支持多个调用:比如在 app 里填写一个 form 的时候,需要改变 user 的多个属性。或者有多个异步的调用,如一个异步调用成功之后需要调用另外的一个的时候。
- public final classUserActivityextendsActivity {private finalUserManager um =new UserManager();@Override protected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.user);
- TextView tv = (TextView)findViewById(R.id.user_name);
- tv.setText(um.getUser().toString());
- um.setName("Jane Doe",newUserManager.Listener() {@Override public void success() {
- tv.setText(um.getUser().toString());
- }@Override public void failure(IOException e) {// TODO show the error...}
- });
- }
- }
有一点需要注意,我们是在 Android 的范围内讨论问题。所以还有很多其他的问题需要考虑。比如,在我们
的回调里我们还要把数据传递到 UI 里。但是 Android 的 Activity 是有生命周期的,随时可能被回收。如果这个异步调用返回的时候 UI 已经销毁了,你就会遇到问题。
- success
有一些其他的方式来处理上面的问题。我们可以在修改视图之前检查一下视图的状态。我们也可以创建一个匿名类型,虽然这回短时的造成内存泄漏,因为它要持有 Activity 的引用。如果 Activity 已经消失,那么这个回调还是会在 background 发生。
最后一件事是我们还没有定义这些回调运行在哪个线程上。也许我们的回调运行在子线程上,那么我们就有责任在主线程上修改 UI。
- public final classUserActivityextendsActivity {private finalUserManager um =new UserManager();@Override protected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.user);
- TextView tv = (TextView)findViewById(R.id.user_name);
- tv.setText(um.getUser().toString());
- um.setName("Jane Doe",newUserManager.Listener() {@Override public void success() {runOnUiThread(newRunnable() {@Override public void run() {if(isDestroyed()) {
- tv.setText(um.getUser().toString());
- }
- }
- });
- }@Override public void failure(IOException e) {// TODO show the error...}
- });
- }
- }
我们在 activity 中添加了很多和这段代码的初衷不相关的其他代码,比如开始一个异步的任务,并处理这个回调的结果。我们还没有处理用户的输入,处理按钮点击和多字段输入等情况。开始的代码只是一个情况的说明,当你面对一个真正的 app 的时候,所有的这些问题是叠加在一起的。
从某种方式来说,app 里的任何事都是异步的。我们有网络请求,发送一个请求,过很久才会返回。我们不能因此而阻塞了 UI 线程。因此只能在后台的线程来处理它。我们有文件系统,还包括数据库的读写,甚至于
也会阻塞主线程。
- SharedPreferences
user 也是一个异步的数据。我们在 UI 里显示数据,并且 UI 会用按钮点击或者修改输入框的方式做出响应。这些都是异步方式发生的。我们不能同步的拉去 user 数据,只能等待数据返回。
很多人认为可以写一个单线程的 App,任何的 task 都运行在主线程上。但是,UI 本身就是异步的。处理时不时接受的异步数据,同时还要处理 app 的各种不同状态。做这些的时候还不能阻塞主线程。这就是 app 更加复杂的根源。
很难找到一个 app 没有网络请求的。你还要处理文件和数据库,这些也是异步的。最后 UI 也成为了一个异步的源。所以,Android 的每一个方面都是异步的,还是坚持以往的单线程同步的编程方式上最后伤害的就是你自己。
我们应该考虑一种模型,我们的代码处在中间层,作为各种状态的仲裁人。而不是理顺全部的异步处理。我们可以让 UI 直接订阅数据库,并在数据发生更改的时候做出相应。我们可以在按钮点击的时候来修改网络请求或者数据库,而不是收到一个按钮点击之后分发点击事件。
类似的,当网络请求返回的时候,要是能更新数据就更好了。我们知道当数据更新的时候,UI 也跟着自动更新,这样就可以删除被动更新界面的代码。如果 Android 做某些异步 task 的时候,比如旋转,UI 可以自动更新就好了,最好是可以自动可以开始一些 background 任务。
最后,我们就可以删除很多维护各种状态的代码。
这就是为什么我们要开发 RxJava。而且它也是 Android 上实际的响应式库。它也是 Android 上第一个可以用于响应式编程的库。RxJava 2 支持 Android 所支持的我全部版本的 java。
它主要做三个方面的事:
这些数据源只有在开始监听的时候才开始执行,如果你取消监听则会同时取消正在执行的任务。网络请求可以抽象为一个单个的任务,但是一个按钮点击流去可以是无限的,只要你的 UI 在那,即使你只订阅了按钮的点击事件。而且,按钮的点击流也可以是空的,也可以是永不停止的。这些都对传统的观察者模式造成了冲击。我们有生成数据的,有一个数据是如何定义的约定,我们想做的只是监听它。我们想要加一个监听器,在数据发生改变的时候得到通知。
这两个主要的类型都会出现在 RxJava 2 中。这两个类型都会用于给同样的类型(可以包含 0 到 n 个元素)建模。为什么同一个数据类型需要两种类型呢?
这是因为背压(backpressure)这个问题。"我"(作者)并不想阐述太多背压的内容,虽然它可以让数据变慢。我们所处的系统只含有有限的资源,但是如果你不能快速的处理背压的话,它会导致就给你发送数据的数据源变慢。
在 RxJava 1 中,系统的每一个类型都有背压,但是在 RxJava 2,我们有两种类型。由于每一个类型都有也要处理的背压,但是不是所有的数据源都实现了他,所以你最终会遇到 crash。这是因为背压必须处理。在 RxJava 2 中,你可以知道你所采用的类型系统是否支持背压。
比如,如果我们有一个 touch event 作为数据源,这个是我们无法变慢的,我们不能告诉用户说慢些点,知道我们把上次事件获得的变化在界面上绘制完成之后再点击第二次。我们只能用其他的方式来处理,比如 disable 按钮或者显示其他的界面来让它变慢,但是点击之后的事件发送并不能变慢。
你可以和数据库做对比。比如我们要获得一个大是结果集,我们也许只想在某个时候获取这个结果集的某些 row。一个数据库可以很好的体现一个问题:它拥有游标的概念。但是 touch event 流和数据库的概念不同。因为,并没有什么方法可以减慢用户的手指。在 RxJava 1 中,你可以看到这两种类型都做为
使用,所以在运行时你要处理背压的问题,最终会得到一个异常或者你的 app 崩溃。
- Observable
- Observable events
- = RxView.touches(paintView);
- Observable rows
- = db.createQuery("SELECT * …");
- MissingBackpressureException
因为两个不同的类型,所以他们暴露了背压。因为他们对同样的数据类型建模,所以在把数据传入回调的问题上使用的是同样的方式。两个数据源的监听接口看起来也很接近。第一个方法叫做
,这个方法也是数据发送的方法。只要
- onNext
后者
- Observable
生成的数据不止一个,那么生成一个数据就会调用一次这个方法。这样你就可以处理每个元素。
- Flowable
- Observable Flowable
- interfaceSubscriber {
- void onNext(T t);void onComplete();void onError(Throwable t);void onSubscribe(Subscription s);
- }interfaceDisposable {void dispose();
- }interfaceObserver {
- void onNext(T t);void onComplete();void onError(Throwable t);void onSubscribe(Disposable d);
- }interfaceSubscription {void cancel();void request(longr);
- }
这可以使无穷的。乳沟你监听了一个按钮的点击,那么
方法在每次用户的点击下都会被调用。对于有限的数据源来说,我们会有两个终结事件,一个是
- onNext
,一个是
- complete
, complete 表示成功,error 表示错误。
- error
和
- onComplete
被叫做终结事件是因为这两个方法被调用后
- onError
方法将不会再被调用。有一点不同的地方放就在于方法
- onNext
。
- onSubscribe
如果你知道 RxJava 1,那么下面将的就是你需要注意的了:当你订阅了一个 Observable 或者一个 Flowable,那么你就创建了一个资源。而资源都是需要在使用之后回收的。这个
回调会在开始监听一个 Observable 或 Flowable 的时候立刻被调用,并且会根据订阅的类型返回给你两个类型中的一个类型。
- onSubscribe
对于 Observable 来说,你可以调用
方法,也就是说我已经用完了这个资源,我不需要任何回调了。如果我们有一个网络请求呢?这会取消网络请求。如果你监听了一个按钮点击的无穷流,那么调用
- dispose
方法就是说你不想再接受点击事件。并且会
- dispose
视图的监听器。这些对于 Flowable 类型也是一样的。虽然它有一个不同的名字,使用上也是一样的。它有一个
- onSet
方法和
- cancel
方法的作用是一样的。不同之处它有另外一个叫做
- dispose
的方法,而且这也是背压在 API 中现身的地方。这个 request 方法告诉 Flowable 类型你需要更多的元素。
- request
... 是一个提供了一个标准的,没有阻塞背压的异步流处理措施。
我准备讲一下为什么 disposable 和 subscription 类型的命名如此不同,为什么他们的方法,一个叫做 dispose;一个叫做 cancel,而不是一个继承另一个再加上 reqeust 方法。原因是有这么一个叫做响应流(reactive stream)的概念。它是其他附属出现的源头。
我们会使用 subscriber 类型和 subscription 类型作为中间人。这些其实是响应流的一部分,并且这也是为什么他们有不同的名字。
- interfacePublisher {
- void subscribe(SubscribersuperT> s);
- }interfaceSubscriber {
- void onNext(T t);void onComplete();void onError(Throwable t);void onSubscribe(Subscription s);
- }interfaceSubscription {void request(longn);void cancel();
- }interfaceProcessor extendsSubscriber, Publisher {
- }
下面有一个例子:
- interfaceUserManager {
- UsergetUser();void setName(String name);void setAge(intage);
- }interfaceUserManager {
- Observable getUser();void setName(String name);void setAge(intage);
- }
现在来看一个例子,里面有 user manager 的两个定义。之前我们从类里获取用户并显示在界面上是再不能正确了。现在我们把这个模型考虑成用户的一个观察者,一个 user 对象的数据源。只要 user 发生改变就会发出通知,接着就可以响应变化并把变化的数据显示出来。这个时候也不用考虑系统里发生的各种事件里什么时候才是显示 user 改变的最佳时机。
RxJava 里的 observable 有三种子集。一个是 Single,这个类型包含一个 item,或者包含一个 error。这个不像是一个流,而更像是单个的异步源。而且它不包含背压。比如你调用一个方法,返回一个类型的实例或者抛出一个异常。Single 也是同样的概念。你订阅了一个 Single,要不返回一个数据,要不接受到一个 error。不同之处在于它是响应式的。
第二个是 Completable。他就像是一个返回为 void 的方法。他会成功完成,或者抛出一个异常。这就好比是一个响应式的 Runnable。Completable 包含了一组可以运行的代码,要么成功要么失败。
最后一个是 Maybe,这个类型在 RxJava 2 里才有。他要么有一个 item,errors 或者也可能一个 item 都没有。它可以被认为是一个 optional。一个返回 optional 值的方法,如果不抛出异常的话它总会返回什么。但是返回的 optional 值可能有值也可能没有值。
- interfaceUserManager {
- Observable getUser();void setName(String name);void setAge(intage);
- }interfaceUserManager {
- Observable getUser();
- CompletablesetName(String name);
- CompletablesetAge(intage);
- }
如果
方法和
- setName
方法的调用是异步的,他们要不成功要么失败,他们并不真的返回数据。所以 completable 类型最适合他们。
- setAge
这个例子用来显示如何创建源,如何把已经存在的响应式源整合到一起。
- Flowable.just("Hello");
- Flowable.just("Hello","World");
- Observable.just("Hello");
- Observable.just("Hello","World");
- Maybe.just("Hello");
- Single.just("Hello");
这些类型都有静态方法可以用来创建。你也可以从单值上创建,也可以从一个数组或者任何可遍历的类型上创建。但是有两个方法估计会是最常用的,不管是同步的还是异步的。
- OkHttpClient client = // …
- Request request = // …
- Observable.fromCallable(new Callable() {@Override public String call() throws Exception {
- Y
- return client.newCall(request).execute();
- Z
- }
- });
第一个是
. 这个方法用来处理只返回一个值的同步行为。这个方法的一大好处是允许你从 callable 上抛出一个异常。如果有个网络请求,并且它会抛出一个 I/O 异常。如果它抛出一个异常的话我们就可以捕获一个 error。如果这个请求成功的话我们就可以从
- fromCallable
方法上获得想要的结果。
- onNext
- Flowable.fromCallable(() ->"Hello");
- Observable.fromCallable(() ->"Hello");
- Maybe.fromCallable(() ->"Hello");
- Maybe.fromAction(() -> System.out.println("Hello"));
- Maybe.fromRunnable(() -> System.out.println("Hello"))
- Single.fromCallable(() ->"Hello");
- Completable.fromCallable(() ->"Ignored!");
- Completable.fromAction(() -> System.out.println("Hello"));
- Completable.fromRunnable(() -> System.out.println("Hello"));
在全部的五个类型上都可以调用
方法。
- fromCallable
在
类型和
- Maybe
类型上有两个另外的方法会经常用到。这两个类型都不返回值。只是一个 runnable,只是这个 runnable 是响应式的。
- Completable
- Observable.create(newObservableOnSubscribe() {
- @Override
- public void subscribe(ObservableEmitter e) throwsException {
- e.onNext("Hello");
- e.onComplete();
- }
- });
创建 observable 最好用的方法是
。我们会传入一个回调,它会在有一个新的 subscriber 的时候调用。
- create
- Observable.create(e -> {
- e.onNext("Hello");
- e.onNext("World");
- e.onComplete();
- });
不同于
,
- fromCallable
方法可以多次调用
- create
。另外一个好处是我们现在可以对异步数据建模。如果我发出一个 http 请求,可以调用
- onNext
方法来完成异步调用。另外的一个好处是你可以处理 unscubscribe 的情况。
- onNext
- OkHttpClient client =// …Request request =// …Observable.create(e -> {
- Call call = client.newCall(request);
- e.setCancelation(() -> call.cancel());
- call.enqueue(newCallback() {@Override public void onResponse(Response r)throwsIOException {
- e.onNext(r.body().string());
- e.onComplete();
- }A@Override public void onFailure(IOException e) {
- e.onError(e);
- }
- });
- });
如果停止监听 http 请求的话,那就没有什么理由继续执行了。我们就可以添加一个取消的操作来取消 http 请求并且回收资源。
- Flowable.create(e - >{…
- });
- Observable.create(e - >{…
- });
- Maybe.create(e - >{…
- });
- Single.create(e - >{…
- });
- Completable.create(e - >{…
- });
- Flowable
- interfaceSubscriber {
- void onNext(T t);void onComplete();void onError(Throwable t);void onSubscribe(Subscription s);
- }interfaceSubscription {void cancel();void request(longr);
- }
- Observable
- interfaceObserver {
- void onNext(T t);void onComplete();void onError(Throwable t);void onSubscribe(Disposable d);
- }interfaceDisposable {void dispose();
- }
当你订阅一个 Observable 的时候你不会直接使用这些接口,
方法调用后开始监听。那么如何 unsubscribe 呢?
- subscribe
- Observable o = Observable.just("Hello");
- o.subscribe(newDisposableObserver() {
- @Override public void onNext(String s) { … }@Override public void onComplete() { … }@Override public void onError(Throwable t) { … }
- });
我们有一个类型叫做
,这个类型自动处理了很多事,你只需要关心 Observable 本身。那么应该如何 dispose 呢?
- DisposableObserver
- Observable o = Observable.just("Hello");
- o.subscribe(newDisposableObserver() {
- @Override public void onNext(String s) { … }@Override public void onComplete() { … }@Override public void onError(Throwable t) { … }
- });
它实现了
,所以你可以调用 dispose 方法,并且它会将这个方法沿着调用链一直上传。在 RxJava 2 有一个新的方法叫做
- disposable
。它返回一个对象,你可以在它上面调用 dispose 方法。
- subscribeWith
- Observable o = Observable.just("Hello");
- o.subscribeWith(new DisposableObserver() {…
- });
- Maybe m = Maybe.just("Hello");
- m.subscribeWith(new DisposableMaybeObserver() {…
- });
- Single s = Single.just("Hello");
- s.subscribeWith(new DisposableSingleObserver() {…
- });
- Completable c = Completable.completed();
- c.subscribeWith(new DisposableCompletableObserver() {…
- });
上面的四个类型都是非背压的。那么对于背压类型如何处理呢:
- Flowable f = Flowable.just("Hello");
- Disposable d1 = f.subscribeWith(new DisposableSubscriber() {…
- });
- Observable o = Observable.just("Hello");
- Disposable d2 = o.subscribeWith(new DisposableObserver() {…
- });
- Maybe m = Maybe.just("Hello");
- Disposable d3 = m.subscribeWith(new DisposableMaybeObserver() {…
- });
- Single s = Single.just("Hello");
- Disposable d4 = s.subscribeWith(new DisposableSingleObserver() {…
- });
- Completable c = Completable.completed();
- Disposable d5 = c.subscribeWith(new DisposableCompletableObserver() {…
- });
背压类型有些不同。你可以这么类比,比如你不会打开一个没有关闭方法的文件,获取一个没有关闭方法的游标。
操作符做以下三件事:
比如:
- Observable greeting = Observable.just("Hello");
- Observable yelling = greeting.map(s -> s.toUppercase());
发出一个 string,得到一个新的 string。
在响应式的世界里,我们有一个 Observable 并且通过 operator 来实现一个操作。在这个例子里,
是一个操作符,我们可以获取到发射的数据并在上面做某些处理来创建一个新的数据。
- map
- Observable user = um.getUser();
- Observable mainThreadUser =
- user.observeOn(AndroidSchedulers.mainThread());
我想要在另外的一个线程上监听数据的发射。这样 user 数据会在后台线程里,但是要在主线程上处理 user 数据。那么
就是我们需要的操作符了。因为我们改变了线程,那么你应用这些操作符的顺序就很关键了。
- observeOn
- OkHttpClient client =// …Request request =// …Observable response = Observable.fromCallable(() -> {returnclient.newCall(request).execute();
- });
- Observable backgroundResponse =
- response.subscribeOn(Schedulers.io());
如果我们发出了一个网络请求,而且这个网络请求会同步的完成。那么,我们肯定不希望它发生在主线程上。我们可以使用操作符来修改我们在哪里订阅 Observable,也就是请求最终在哪里执行。当我们订阅了后台的返回,他就会改在后台线程里执行。I/O 只是一个你可以使用的线程池,它会在线程池里执行并最终通知监听者发生的改变。
- OkHttpClient client =// …Request request =// …Observable response = Observable.fromCallable(() -> {returnclient.newCall(request).execute();
- })
- .subscribeOn(Schedulers.io())
- .map(response -> response.body().string())// Ok!.observeOn(AndroidSchedulers.mainThread());
由于我们在
后面使用了
- observeOn
操作符,它就会在 Android 的主线程上执行。我们不希望在主线程上处理 http 返回,我们希望处理玩 http response 之后再返回到主线程上。
- map
还有其他的操作符可以处理 Observable 并返回一个其他的类型。一个操作符是
,它会返回一个 list 的第一个元素,一个 Single 类型的对象。在 RxJava 1 中这个方法会返回一个只发射一个元素的 Observable。如果这个 Observable 为空的话,会返回一个错误。因为我们知道一个 single 要不有一个元素要么就是 error。
- first()
其他的操作符比如
会返回一个 Maybe 类型的对象。当 observable 为空的时候,maybe 类型的对象可以完成而不会抛出异常。如果你只关心成功或者失败的话,那么 completable 类型的对象绝对是你想要的。这些 flowable 也都存在。
- firstElement()
如果我们想要让我们最开始的例子变成响应式的,我们可以订阅 user 并且说:"我们想要在主线程上得到通知,然后我想要在 UI 里显示出来"。任何时候 user 发生了改变,这个代买就会自动执行,我们可以自动在界面上看到更新。我们不用在担心管理我们自己了。
- // onCreatedisposables.add(um.getUser()
- .observeOn(AndroidSchedulers.mainThread())
- .subscribeWith(newDisposableObserver() {
- @Override public void onNext(User user) {
- tv.setText(user.toString());
- }@Override public void onComplete() {/* ignored */}@Override public void onError(Throwable t) {/* crash or show */}
- }));// onDestroydisposables.dispose();
然而,我们一定要记住管理返回的 disposable,因为我们在 Android 的世界里,当 Activity 消失的时候我们想要这些代码也停止运行。在
方法里,我们 dispose 这些 disposable。
- onDestroy
- disposables.add(um.setName("Jane Doe")
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribeWith(new DisposableCompletableObserver() {@Override public void onComplete() {// success! re-enable editing}@Override public void onError(Throwable t) {// retry or show}
- }));
RxJava 2 要处理的是 Android 里的异步问题。无论是网络请求还是 Android 本身,一个数据库或者甚至于是一个事件。并且编写响应这些源的改变的代码,而不是编写应对改变,管理状态的代码。
- class RxJavaInterop {
- static Flowable toV2Flowable(rx.Observable o) {…
- }
- static Observable toV2Observable(rx.Observable o) {…
- }
- static Maybe toV2Maybe(rx.Single s) {…
- }
- static Maybe toV2Maybe(rx.Completable c) {…
- }
- static Single toV2Single(rx.Single s) {…
- }
- static Completable toV2Completable(rx.Completable c) {…
- }
- static rx.Observable toV1Observable(Publisher p) {…
- }
- static rx.Observable toV1Observable(Observable o, …) {…
- }
- static rx.Single toV1Single(Single o) {…
- }
- static rx.Single toV1Single(Maybe m) {…
- }
- static rx.Completable toV1Completable(Completable c) {…
- }
- static rx.Completable toV1Completable(Maybe m) {…
- }
- }
如果你使用的是 RxJava 1,有一个 interop 对象可以使用。你可以在两个版本的类型之间转换。
RxJava 2 并不是什么新的知识。响应式编程在哪个维度上来说并不算新。Android 本身就是一个高度响应式的世界,知识我们被教育成使用一些顺序执行的,更加容易描述的方式来编写代码。
响应式编程允许我们来用一个更加合理的、异步的方式来对 Android 开发建模。拥抱源的异步,而不是我们自己去维护各种状态。让 Android app 的开发真正的响应式起来吧。
来源: http://www.cnblogs.com/sunshine-anycall/p/6885915.html