- Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter emitter) {
- int i = getNumber();
- if (i <0) {
- emitter.onComplete();
- return;
- } else {
- Log.d(TAG, Thread.currentThread().getName());
- emitter.onNext(i);
- emitter.onComplete();
- }
- }
- })
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- Log.d(TAG, Thread.currentThread().getName());
- Log.d(TAG, integer + "");
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- }
- });
RxJava 有四个基本概念: Observable (可观察者, 即被观察者), Observer (观察者), subscribe (订阅), 事件. Observable 和 Observer 通过 subscribe() 方法实现订阅关系, 从而 Observable 可以在需要的时候发出事件来通知 Observer.
onNext(): 方法用来发送事件.
下面看看其他两个方法:
onCompleted(): 事件队列完结. RxJava 不仅把每个事件单独处理, 还会把它们看做一个队列. RxJava 规定, 当不会再有新的 onNext() 发出时, 需要触发 onCompleted() 方法作为标志.
onError(): 事件队列异常. 在事件处理过程中出异常时, onError() 会被触发, 同时队列自动终止, 不允许再有事件发出.
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个, 并且是事件序列中的最后一个. 需要注意的是, onCompleted() 和 onError() 二者也是互斥的, 即在队列中调用了其中一个, 就不应该再调用另一个.
讲一下我们上面的例子, 上面这个例子是采用简洁的链式调用来写的:
首先使用 create() 方法来创建一个 Observable , 并为它定义事件触发规则, 然后通过 emitter.onNext(i) 传递出来,.subscribeOn(Schedulers.io()) 便是指定该事件产生的所在的线程为子线程,.observeOn(AndroidSchedulers.mainThread()) 指定观察者执行的线程为主线程. 这时候为止返回的对象为 Observable 对象.
然后该 Observable 对象 subscribe 绑定观察者 (也就是观察者进行订阅), 里面有接收被观察者发出来的事件, 有一个成功的方法, 和一个失败的方法, 这样就实现了由被观察者向观察传递事件.
二, 对集合里的数据进行变换
- List<Integer> list = new ArrayList<Integer>() {
- {
- add(0);
- add(1);
- add(2);
- }
- };
- Observable.fromIterable(list).map(new Function() {
- @Override
- public Object apply(Object o) throws Exception {
- int i = (int) o + 1;
- return String.valueOf(i);
- }
- })
- .toList()
- .toObservable().subscribeOn(Schedulers.io())
- .subscribeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer() {
- @Override
- public void accept(Object o) throws Exception {
- Log.d(TAG, o.toString());
- }
- });
且看, 我们需要对某个集合里面的数据一一进行变换, 然后发送出来执行其他操作.
上面便是对集合里面的每一项进行加一操作, 然后再转换为 String 类型, 然后 toList(), 组合成集合发送出来, 最后在观察者方法中打印出每一项.
三, 合并执行
定义两个被观察者, 各自产生事件, 然后合并在一起, 发送给一个观察者.
首先定义我们上面第一个例子的被观察者, 用于发送一个数字:
- Observable observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter emitter) {
- int i = getNumber();
- if (i <0) {
- emitter.onComplete();
- return;
- } else {
- Log.d(TAG, Thread.currentThread().getName());
- emitter.onNext(i);
- emitter.onComplete();
- }
- }
- })
- .subscribeOn(Schedulers.io());
其次再定义我们上面第二个例子的被观察者:
- List<Integer> list = new ArrayList<Integer>() {
- {
- add(0);
- add(1);
- add(2);
- }
- };
- Observable observable2 = Observable.fromIterable(list).map(new Function() {
- @Override
- public Object apply(Object o) {
- int i = (int) o + 1;
- return String.valueOf(i);
- }
- })
- .toList()
- .toObservable().subscribeOn(Schedulers.io());
最后将这两个被观察者的事件合并起来发送给一个观察者:
- Disposable disposable = Observable.zip(observable1, observable2, new BiFunction() {
- @Override
- public Object apply(Object o, Object o2) throws Exception {
- int i = (int) o;
- String k = (String) ((List) o2).get(0);
- return k + i;
- }
- })
- .subscribe(new Consumer() {
- @Override
- public void accept(Object o) {
- Log.d(TAG, (String) o);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) {
- Log.d(TAG, throwable.getMessage());
- }
- });
zip 方法, 顾名思义, 有点类似与于打包的意思.
o 为被观察者 1 返回的结果, o2 为被观察 2 返回的结果, 将这两个结果一起处理后发送给观察者. 打印出来.
现在先介绍这几个, 找个时间再整理一些其他的用法以及原理实现.
来源: http://www.bubuko.com/infodetail-3346743.html