自从去年 8 月底《浅谈 RxJava 与 2.0 的新特性》,已经过去快一年。笔者也没想到此文竟有读者等笔者填坑快一年了,不禁汗颜。所以笔者打算写关于一个 RxJava2 的系列文章,既为填坑,也为回报读者对我的支持。本文为第一篇。
读本系列文章,你可能有如下收获:
废话不多说,进入正题。
之前在《浅谈 RxJava 与 2.0 的新特性》我们提到过, RxJava2 遵循 Reactive Streams 的编程规范, 或者更精确的说,是 RxJava2 中的 Flowable 相关的类。因此我们只分析 Flowable 相关的实现与使用,剩下的
这些不会再提及,相信读者朋友们可以举一反三。
- Observable、 Completable、Single、 Maybe
Reactive Streams 中明确规范了如下 4 点:
后面笔者会用 RS 代替全称。 请跟随本系列文章慢慢看 Flowable 是出色的完成上述的要求。
Rx2 在源码中加入了一些注解,这些注解对运行没有任何实际作用,仅仅是用作标识备注,有助于开发者了解某个操作符的正确使用姿势,同时也有利于阅读源码时整理思路。这些注解位于
包名下,这里着重介绍一个。
- io.reactivex.annotations
BackpressureSupport 是用作标识这个操作符对背压的支持类型,有以下几种:
操作符,这个操作符支持的背压类型取决于
- defer
产生的
- Callable
。
- Publisher
- MissingBackpressureException
上面这些字面解释看起来还是很绕的,尤其是对于没有阅读过相关源码的读者。我们也不必一次性全部弄明白,后续会慢慢讲清楚所有。
前文中有提到过,Rx2 收回了
方法的权限,使开发者自定义的
- create
也能够正确的支持背压。而实现的方式就是通过额外提供一个
- create
参数。也因此,
- BackpressureStrategy
的方法注解中
- create
是
- BackpressureSupport
。
- SPECIAL
抛开 Rx2 提供的 plugin 不谈,本质上就是用 create 传进来的 2 个参数创建了
这个类。 根据传入的
- FlowableCreate
生成不同的
- BackpressureStrategy
对象。并遵循一致的编程约定,先调用 onSubscribe,随后将
- Emitter
传递给
- Emitter
用来发射数据。
- FlowableOnSubscribe
- @Override
- public void subscribeActual(Subscriber<? super T> t) {
- BaseEmitter<T> emitter;
- switch (backpressure) {
- case MISSING: {
- emitter = new MissingEmitter<T>(t);
- break;
- }
- case ERROR: {
- emitter = new ErrorAsyncEmitter<T>(t);
- break;
- }
- case DROP: {
- emitter = new DropAsyncEmitter<T>(t);
- break;
- }
- case LATEST: {
- emitter = new LatestAsyncEmitter<T>(t);
- break;
- }
- default: {
- emitter = new BufferAsyncEmitter<T>(t, bufferSize());
- break;
- }
- }
- t.onSubscribe(emitter);
- try {
- source.subscribe(emitter);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- emitter.onError(ex);
- }
- }
Rx2 大量的类通过继承
来表示计算个数与发射个数,请求一个 + 1,发射一个 - 1。并且在 Rx2 中 Long.MAX_VALUE 有特殊含义,表示无限的数据。即,如果
- AtomicLong
,即使发射了数据也不会减少自身的数值。
- request(Long.MAX_VALUE)
这里所有的 Emitter 都继承了基类
,并提供一些公共方法如
- BaseEmitter
等。然后根据各自的背压策略,实现相应的逻辑,下面分别介绍。
- setDisposable/setCancellable/requested/serialize
MISSING 即没有背压,我们看 onNext 函数会发现,每调用一次就会传递给下游的
,空指针则
- subscriber.onNext
。
- onError
- @Override public void onNext(T t) {
- if (isCancelled()) {
- return;
- }
- if (t != null) {
- actual.onNext(t);
- } else {
- onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
- return;
- }
- for (;;) {
- long r = get();
- if (r == 0L || compareAndSet(r, r - 1)) {
- return;
- }
- }
- }
上面的代码是 2.1.2 版本的源码,笔者认为这里有一处 BUG 。即在自减的时候,没有检查 Long.MAX VALUE 的情况,导致在
后,发射数据时依然会不断自减,这是与一致的设计思路相悖的。反观下面 DROP 与 BUFFER 相关的 Emitter 处理时,则直接调用了
,在里面会有 Long.MAX VALUE 的判断。
虽然有点小 BUG,但是实际中除了在
函数中会出错外,不会影响正常的执行流。且一般开发者也不会使用
- requested()
函数。
- Emitter.requested()
虽然 MISSING 不支持背压,但是没关系,我们可以通过操作符来弥补。 这些操作符结合使用 MISSING 的 create 方法,使得原本不支持背压的 Flowable 支持背压。
当然我们也大可不必这样麻烦,既然要使用 buffer、drop 或者 latest,使用下面的策略即可。除非我们需要那些操作符提供的额外功能。
ERROR 则和最开始的
表现一致。
- BackpressureSupport.ERROR
下面这块代码是
,会被 ERROR 和 DROP 对应的
- NoOverflowBaseAsyncEmitter
继承。逻辑也很简单,即请求数如果还大于 0,则向下发射并将请求数减 1,否则走
- Emitter
方法。
- onOverflow()
- @Override public final void onNext(T t) {
- if (isCancelled()) {
- return;
- }
- if (t == null) {
- onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
- return;
- }
- if (get() != 0) {
- actual.onNext(t);
- BackpressureHelper.produced(this, 1);
- } else {
- onOverflow();
- }
- }
而 ERROR 对应的实现则很简单了,不在赘述。
- @Override
- void onOverflow() {
- onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
- }
DROP 即直接丢弃超额的数据,体现在代码中就非常简单。
- @Override
- void onOverflow() {
- // nothing to do
- }
这俩之所以放到了一起,是因为 BUFFER 与 LATEST 本质上都是缓存了数据,细节上的区别就是,BUFFER 是缓存了所有数据,而 LATEST 只保留了最近的一个 onNext 数据。
体现在代码中这两者最主要的区别就是一个用了队列来缓存,一个用了 AtomicReference 来维持最后一个未被消费的数据。
就挑 BUFFER 来说, onNext 就是将数据扔进队列,而后尝试消费数据即调用
。onError 与 onComplete 则是将 结束标识置为 true ,并保留异常,然后依然也是在
- drain()
中消费该消息。 在 / onNext/onError/onComplete/onRequested 时,都会调用
- drain()
来消费队列中的数据。
- drain()
- @Override public void onNext(T t) {
- if (done || isCancelled()) {
- return;
- }
- if (t == null) {
- onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
- return;
- }
- queue.offer(t);
- drain();
- }
中做的事就比较复杂了,为了保证线程安全,首先通过一个
- drain
来确保只有一个线程可以进入
- AtomicInteger
循环。 在 for 循环中,不断的消费队列中的数据,如果队列为空则检查结束标识是否为 true,是的话则发射 onComplete 或者 onError 。
- for(;;)
- void drain() {
- if (wip.getAndIncrement() != 0) {
- return;
- }
- int missed = 1;
- final Subscriber < ?super T > a = actual;
- final SpscLinkedArrayQueue < T > q = queue;
- for (;;) {
- long r = get();
- long e = 0L;
- while (e != r) {
- if (isCancelled()) {
- q.clear();
- return;
- }
- boolean d = done;
- T o = q.poll();
- boolean empty = o == null;
- if (d && empty) {
- Throwable ex = error;
- if (ex != null) {
- error(ex);
- } else {
- complete();
- }
- return;
- }
- if (empty) {
- break;
- }
- a.onNext(o);
- e++;
- }
- if (e == r) {
- if (isCancelled()) {
- q.clear();
- return;
- }
- boolean d = done;
- boolean empty = q.isEmpty();
- if (d && empty) {
- Throwable ex = error;
- if (ex != null) {
- error(ex);
- } else {
- complete();
- }
- return;
- }
- }
- if (e != 0) {
- BackpressureHelper.produced(this, e);
- }
- missed = wip.addAndGet( - missed);
- if (missed == 0) {
- break;
- }
- }
- }
这里请大家留意一个编程的套路,在绝大多数队列消费的场景里, Rx2 中都是使用了下面的方式。这也是我们可以积累使用的。通过这种方式可以保证 for 循环里的代码是单线程执行的,且如果执行期间有一次或多次新的调用
,会导致重新走一遍包含注释处的代码,确保数据可以正确的消费发射。
- drain()
- void drain() {
- if (wip.getAndIncrement() != 0) {
- return;
- }
- int missed = 1;
- for (;;) {
- // 消费队列, 发射数据
- missed = wip.addAndGet( - missed);
- if (missed == 0) {
- break;
- }
- }
- }
笔者在介绍过程中已经省略了很多细枝末节,不免显得知识有些分散,结合源码阅读效果更佳。没想到一个小小的 create 也包含这么多的玄机。
我相信通过阅读这篇文章,读者们写 create 的时候应该可以做到结合实际场景选择正确的
。
- BackpressureStrategy
有了 create 便从此开启 Rx2 万里长征第一步。下一篇,我们将会介绍 Rx2 的线程调度相关的操作符及其实现,敬请期待。
最后欢迎关注笔者的微信公众号,每一篇新的博文都将会在第一时间发布在公众号上。
来源: http://www.tuicool.com/articles/AZbyAnz