前言
以前写相关的 RxJava 的文章比较少, 也就下面二篇:
目需求讨论 - 手把手带你写 RxPermission
RxJava 小考题 -- Rxjava 源码分析 (一)
用了这么久的 RxJava, 所以想要做个总结, 所以打算也写一个 RxJava 源码分析 的系列 (所以不是简单的说明如何使用 RxJava 步骤, 看之前也最好会使用 RxJava):
Android 技能树 - Rxjava 源码系列 (1) 之 初步结构
Android 技能树 - Rxjava 源码系列 (2) 之 Observable 和 Observer(待写)
Android 技能树 - Rxjava 源码系列 (3) 之 线程切换 (待写)
Android 技能树 - Rxjava 源码系列 (4) 之 常用操作符 (待写)
正文
我们还是通过基础的例子来进行讲解, 比如你有个快递, 你很想知道快递是否已经到小区, 你可能想要在快递刚到你的小区的时候就马上知道, 然后马上就能享受拆快递所带来的乐趣, 毕竟等待快递的时候实在太煎熬了.
那我们有什么方式来确定快递是否到了小区呢?
1. 轮询 和 更新发送
1.1 轮询
我们可以每隔 1 分钟, 就打个电话给快递小哥, 问他我的快递是不是已经送到了, 这样当快递小哥刚送货到你的小区的时候, 你都最多能在不超过一分钟内知道快递已经到小区了.(缺点也很明显, 电话费很贵, 同时你不怕小哥打死你的话, 你也可以用这种方式)
1.2 更新发送
当快递小哥把快递送到你的小区的时候, 一般都是二种方式: 1. 他可能直接打你的电话通知你下来拿快递; 2. 统一放在快递货柜中, 然后你会收到货柜的短信通知, 根据开柜号码去拿快递.
明显这种数据更新后主动通知的方式, 我们更容易接受, 大家都开心, 小哥也不会打死你. 但是上面我们也说了快递小哥通知你的方式有二种: 直接通知你 / 货柜接收快递, 货柜再通知你
快递员到了小区后, 通知你的二种方式又分别对应了什么模式?
2. 观察者模式和发布订阅模式
我们上面已经提到了快递小哥到了你小区, 有二种方式通知你, 其实这里对应了 观察者模式和发布订阅模式这二种模式. 其实我以前也总是把这二个模式弄混, 感觉都差不多, 所以在和别人讨论的时候都是混着说着这个名词, 后来看了网上相关的文章, 发现这二者还是有点不同的, 正对应了上面你收到快递的信息一样的区别.
2.1 观察者模式
对应的就是我们的快递员把快递送到了小区后, 直接打电话给你, 说你快下来拿下快递; 这时候快递员是直接跟你本人直接沟通的
2.2 发布订阅模式
对应的就是我们的快递员把快递送到小区后, 把快递放在快递柜中, 然后快递员只要把你的手机号码输入到快递柜, 然后把快递放入到具体的某个快递柜的格子中, 就可以直接走人了, 等会会由快递柜这边通知你快递到了, 然后过来取快递.
二者最大的区别是什么?
我直接引用网上的其他文章的内容: 两种模式都存在订阅者和发布者 (具体观察者可认为是订阅者, 具体目标可认为是发布者), 但是观察者模式是由具体目标调度的, 而发布 / 订阅模式是统一由调度中心调的, 所以观察者模式的订阅者与发布者之间是存在依赖的, 而发布 / 订阅模式则不会.
那 RxJava 是属于上面的二种的哪一种呢?
我们直接看 GitHub 上的 RxJava 介绍:
看不懂的我们直接谷歌翻译走起:
我们可以看到介绍, 说是扩展了观察者模式, 所以说明我们的 Rxjava 是直接把观察者注册到了发布者. 而没有中间的调度中心. 所以也就是我们上面的快递员直接打电话通知你下来拿快递的方式.
3.Rxjava 的具体分析
我们使用 RxJava 最基础的方式是什么?
我们刚开始学如何使用的时候, 估计百分之 99 的都是先学会使用 create 操作符:
- Observable.create(new ObservableOnSubscribe<Object>() {
- @Override
- public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
- }
- }).subscribe(new Observer<Object>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onNext(Object o) {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
其实这个最基础的了解了后, 看其他的什么 just,fromArray 等就都理解了; 当然我们既然是 RxJava 系列的第一篇初步结构, 所以我也不会马上就讲解各种源码, 我们可以先模仿一个简单的, 比如:
- Observable.just(1)
- .subscribe(new Observer<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onNext(Integer integer) {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
我们可以这样先模仿这个简单的, 写个效果一样的伪代码, 了解一下基本的东西即可.
所以如果我们写了一个简单的版本的 RxJava 应该是这样的:
- Observer.java:
- public class Observer<T> {
- @Override
- public void onSubscribe(Disposable d) {
- }
- public void onNext(T obj) {
- }
- public void onError(Throwable e) {
- }
- public void onComplete() {
- }
- }
- Observable.java:
- public class Observable<T> {
- T value;
- Observer<? super T> observer;
- //just 传进来的数值, 等会要发送给 Observer
- public void just(T value) {
- this.value = value;
- }
- public final void subscribe(Observer<? super T> observer) {
- // 把 Observer 注册到 Observable 中, 其实就等于 Observable 拿了 Observer 的对象, 等会好调用它相应的方法
- this.observer = observer;
- // 生成 Disposable 对象, 调用 observer 的 onSubscribe 方法
- Disposable a = xxxxxxx;
- observer.onSubscribe(a);
- // 我们已经拿到了 Observer 对象, 可以做相关逻辑了
- start();
- }
- private void start() {
- try {
- // 我们把 just 传进的数值, 通过已经拿到的 Observer 对象, 调用 onNext 把数值给它
- observer.onNext(value);
- // 发送完了之后执行 onComplete() 方法说明结束了.
- observer.onComplete();
- } catch (Exception e) {
- // 如果中间出现异常, 就调用 Observer 的 onError 方法
- observer.onError(e);
- }
- }
- }
是不是感觉, 卧槽, 原来这么简单吗? 没错, 这样看来我们也的确是 Observable 直接持有了 Observer 的对象, 也的确符合上面我们说的 Rxjava 使用的是观察者模式, 而不是发布订阅模式.
所以现在初步来看 RxJava 目前来看三步走:
结语
因为只是第一篇初步结构, 所以内容也十分简单, 后续会具体分析相关的源码内容.
来源: https://juejin.im/post/5c4136f4518825260b46ef9c