本系列文章索引响应式 Spring 的道法术器 http://blog.51cto.com/liukang/2090163
前情提要: Reactor 3 快速上手 http://blog.51cto.com/liukang/2090191 | 响应式流规范 http://blog.51cto.com/liukang/2090922
本文测试源码 https://github.com/get-set/get-reactive/blob/master/snacks/src/test/java/com/getset/Test_2_6.java
2.6 测试
在非常重视 DevOps 的今天, 以及一些奉行 TDD 的团队中, 自动化测试是保证代码质量的重要手段. 要进行 Reactor 的测试, 首先要确保添加 reactor-test 依赖.
reactor-test 用 Maven 配置 <dependencies>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-test</artifactId>
- <version>3.1.4.RELEASE</version>
- <scope>test</scope>
- </dependency>
reactor-test 用 Gradle 配置
- dependencies {
- testCompile 'io.projectreactor:reactor-test:3.1.4.RELEASE'
- }
2.6.1 使用 StepVerifier 来测试
1.3.2.3 节 http://blog.51cto.com/liukang/2090191 初步介绍了关于 StepVerifier 的用法. 举个例子回忆一下:
- @Test
- public void testAppendBoomError() {
- Flux<String> source = Flux.just("foo", "bar");
- StepVerifier.create(
- appendBoomError(source))
- .expectNext("foo")
- .expectNext("bar")
- .expectErrorMessage("boom")
- .verify();
- }
我们通常使用 create 方法创建基于 Flux 或 Mono 的 StepVerifier, 然后就可以进行以下测试:
测试期望发出的下一个信号. 如果收到其他信号 (或者信号与期望不匹配), 整个测试就会 失败(AssertionError), 如 expectNext(T...) 或
- expectNextCount(long)
- .`
处理 (consume) 下一个信号. 当你想要跳过部分序列或者当你想对信号内容进行自定义的校验的时候会用到它, 可以使用
- consumeNextWith(Consumer<T>)
- .
其他操作, 比如暂停或运行一段代码. 比如, 你想对测试状态或内容进行调整或处理, 你可能会用到 thenAwait(Duration)和 then(Runnable).
对于终止事件, 相应的期望方法 (如 expectComplete(),expectError(), 及其所有的变体方法) 使用之后就不能再继续增加别的期望方法了. 最后你只能对 StepVerifier 进行一些额外的配置并 触发校验(通常调用 verify() 及其变体方法).
从 StepVerifier 内部实现来看, 它订阅了待测试的 Flux 或 Mono, 然后将序列中的每个信号与测试 场景的期望进行比对. 如果匹配的话, 测试成功. 如果有不匹配的情况, 则抛出 AssertionError 异常.
响应式流是一种基于时间的数据流. 许多时候, 待测试的数据流存在延迟, 从而持续一段时间. 如果这种场景比较多的话, 那么会导致自动化测试运行时间较长. 因此 StepVerifier 提供了可以操作 "虚拟时间" 的测试方式, 这时候需要使用
StepVerifier.withVirtualTime
来构造.
为了提高 StepVerifier 正常起作用的概率, 它一般不接收一个简单的 Flux 作为输入, 而是接收 一个 Supplier, 从而可以在配置好订阅者之后 "懒创建" 待测试的 flux, 如:
- StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
- //... 继续追加期望方法
有两种处理时间的期望方法, 无论是否配置虚拟时间都是可用的:
thenAwait(Duration)会暂停校验步骤(允许信号延迟发出).
expectNoEvent(Duration)
同样让序列持续一定的时间, 期间如果有任何信号发出则测试失败.
在普通的测试中, 两个方法都会基于给定的持续时间暂停线程的执行. 而如果是在虚拟时间模式下就相应地使用虚拟时间.
- StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
- .expectSubscription() // 1
- .expectNoEvent(Duration.ofDays(1)) // 2
- .expectNext(0L)
- .verifyComplete(); // 3
expectNoEvent 将订阅 (subscription) 也认作一个事件. 假设你用它作为第一步, 如果检测 到有订阅信号, 也会失败. 这时候可以使用
expectSubscription().expectNoEvent(duration)
来代替;
期待 "一天" 内没有信号发生;
verify 或变体方法最终会返回一个 Duration, 这是实际的测试时长.
可见, withVirtualTime 使我们不用实际等 1 天来完成测试了.
虚拟时间的功能是如何实现的呢?
StepVerifier.withVirtualTime
会在 Reactor 的调度器工厂方法中插入一个自定义的调度器
VirtualTimeScheduler
来代替默认调度器(那些基于时间的操作符通常默认使用
Schedulers.parallel()
调度器).
2.6.2 用 PublisherProbe 检查执行路径
通常情况下, 使用 StepVerifier 的 expect * 就可以搞定多数的测试场景了. 但是, 它也有无计可施的时候, 比如下边这个特殊的例子:
- private Mono<String> executeCommand(String command) {
- // 基于 command 执行一些操作, 执行完成后返回 Mono<String>
- }
- public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
- return commandSource
- .flatMap(command -> executeCommand(command).then()) // 1
- .switchIfEmpty(doWhenEmpty); // 2
- }
then()会忽略所有的元素, 只保留完成信号, 所以返回值为 Mono<Void>;
也是一个 Mono<Void>.
1 和 2 都是 Mono<Void>, 这时候就比较难判断 processOfFallback 中具体执行了哪条路径. 这时候可以用 log()或 doOn*()等方法来观察, 但这 "在非绿即红" 的单测中不起作用. 或者在某个路径加入标识状态的值, 并通过判断状态值是否正确来确定, 但这就需要修改被测试的 processOfFallback 的代码了.
Reactor 版本 3.1.0 之后我们可以使用 PublisherProbe 来做类似场景的验证. 如下:
- @Test
- public void testWithPublisherProbe() {
- PublisherProbe<Void> probe = PublisherProbe.empty(); // 1
- StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) // 2
- .verifyComplete();
- probe.assertWasSubscribed(); // 3
- probe.assertWasRequested(); // 4
- probe.assertWasNotCancelled(); // 5
- }
创建一个探针(probe), 它会转化为一个空序列.
在需要使用 Mono<Void> 的位置调用 probe.mono() 来替换为探针.
序列结束之后, 你可以用这个探针来判断序列是如何使用的, 你可以检查是它从哪 (条路径) 被订阅的...?
对于请求也是一样的...?
以及是否被取消了.
2.6.3 使用 TestPublisher 手动发出元素
TestPublisher 本质上是一个 Publisher, 不过使用它能更加 "自由奔放" 地发出各种元素, 以便进行各种场景的测试.
1)"自由" 地发出元素
我们可以用它提供的方法发出各种信号:
next(T) 以及 next(T, T...) 发出 1-n 个 onNext 信号.
emit(T...) 起同样作用, 并且会执行 complete().
complete() 会发出终止信号 onComplete.
error(Throwable) 会发出终止信号 onError.
比如:
- @Test
- public void testWithTestPublisher() {
- TestPublisher<Integer> testPublisher = TestPublisher.<Integer>create().emit(1, 2, 3);
- StepVerifier.create(testPublisher.flux().map(i -> i * i))
- .expectNext(1, 4, 9)
- .expectComplete();
- }
2)"奔放" 地发出元素
使用 create 工厂方法就可以得到一个正常的 TestPublisher. 而使用 createNonCompliant 工厂方法可以创建一个 "不正常" 的 TestPublisher. 后者需要传入由
TestPublisher.Violation
枚举指定的一组选项, 这些选项可用于告诉 publisher 忽略哪些问题. 枚举值有:
REQUEST_OVERFLOW: 允许 next 在请求不足的时候也可以调用, 而不会触发 IllegalStateException.
ALLOW_NULL: 允许 next 能够发出一个 null 值而不会触发 NullPointerException.
CLEANUP_ON_TERMINATE
: 可以重复多次发出终止信号, 包括 complete(),error() 和 emit().
不过这个功能可能更多地是给 Reactor 项目开发者本身使用的, 比如当他们开发了一个新的操作符, 可以用这种方式来测试这个操作符是否满足响应式流的规范.
3)TestPublisher 也是个 PublisherProbe
更赞的是, TestPublisher 实现了 PublisherProbe 接口, 意味着我们还可以使用它提供的 assert * 方法来跟踪其内部的订阅和执行状态.
来源: http://www.bubuko.com/infodetail-2550558.html