本章节继续介绍: Flux 和 Mono 操作符 (二)
1. 条件操作符
Reactor 中常用的条件操作符有 defaultIfRmpty,skipUntil,skipWhile,takeUntil 和 takeWhile 等.
1,defaultIfRmpty
defaultIfRmpty 操作符返回来自原始数据流的元素, 如果原始数据流中没有元素, 则返回一个默认元素.
defaultIfRmpty 操作符在实际开发过程中应用广泛, 通常用在对方法返回值的处理上. 如下 controller 层对 service 层返回值的处理.
- @GetMapper("/article/{id}")
- public Mono<ResponseEntity<Article>> findById(@PathVariable String id){
- return articleService.findOne(id)
- .map(ResponseEntity::ok)
- .defaultIfRmpty(ResponseEntity.status(404).body(null));
- }
- 2,takeUntil
takeUntil 操作符的基本用法是 takeUntil(Predicate<? super T>> predicate), 其中 Predicate 代表一种断言条件, takeUntil 将提取元素直到断言条件返回 true.
示例代码如下:
- Flux.range(1,100).takeUntil(i -> i == 10).subscribe(System.out::println);
- 3,takeWhile
takeWhile 操作符的基本用法是 takeWhile(Predicate<? super T>> continuePredicate), 其中 continuePredicate 也代表一种断言条件. 与 takeUntil 不同的是, takeWhile 会在 continuePredicate 条件返回 true 时才进行元素的提取.
示例代码如下:
- Flux.range(1,100).takeWhile(i -> i <= 10).subscribe(System.out::println);
- 4,skipUntil
与 takeUntil 相对应, skipUntil 的基本用法是 skipUntil(Predicate<? super T>> predicate).skipUntil 将丢弃原始数据中的元素, 直到 Predicate 返回 true.
5,skipWhile
与 takeWhile 相对应, skipWhile 操作符的基本用法是 skipWhile(Predicate<? super T>> continuePredicate). 当 continuePredicate 返回 true 时才进行元素的丢弃.
2. 数学操作符
Reactor 中常用的数学操作符有 concat,count,reduce 等.
1,concat
concat 用来合并来自不同 Flux 的数据, 这种合并采用的是顺序的方式.
2,count
count 操作符比较简单, 用来统计 Flux 中元素的个数.
3,reduce
reduce 操作符对流中包含的所有元素进行累积操作, 得到一个包含计算结果的 Mono 序列. 具体的累计操作也是通过一个 BiFunction 来实现的.
示例代码如下:
Flux.range(1,10).reduce((x,y) -> x+y).subscribe(System.out::println);
这里 BiFunction 就是一个求和函数, 用来对 1 到 10 的数字进行求和, 运行结果为 55.
与其类似的还有一个 reduceWith.
示例代码如下:
Flux.range(1,10).reduceWith(() ->5,(x,y) -> x+y).subscribe(System.out::println);
这里使用 5 来初始化求和过程, 得到的结果是 60.
3.Observable 工具操作符
Reactor 中常用的 Observable 操作符有 delay,subscribe,timeout 等.
1,delay
delay 将时间的传递向后延迟一段时间.
2,subscribe
在前面的代码演示了 subscribe 操作符的用法, 我们可以通过 subscribe() 方法来添加相应的订阅逻辑.
在前面章节中我们提到了 Reactor 中的消息类型有三种, 即正常消息, 异常消息和完成消息. subscribe 操作符可以只处理其中包含的正常消息, 也可以同时处理异常消息和完成消息. 当我们用 subscribe 处理异常消息时可以采用以下方式.
- Mono.just(100)
- .conacatWith(Mono.error(new IllegalStateException()))
- .subscribe(System.out::println,System.err::println);
以上代码执行结果如下, 我们得到了一个 100, 同时也获取了 IllegalStateExxeption 这个异常.
- 100
- java.lang.IllegalStateExxeption
有时候我们不想直接抛出异常, 而是想采用一个容错策略来返回一个默认值, 就可以采用以下方式.
- Mono.just(100)
- .conacatWith(Mono.error(new IllegalStateException()))
- .onErrorReturn(0)
- .subscribe(System.out::println);
以上代码执行结果如下. 当产生异常时, 使用 onErrorReturn() 方法返回一个默认值 0.
100
0
另外容错策略也是通过 switchOnError() 方法使用另外的流产生元素. 以下代码示例演示了这种策略.
与上面的执行结果相同.
- Mono.just(100)
- .conacatWith(Mono.error(new IllegalStateException()))
- .switchOnError(Mono.just(0))
- .subscribe(System.out::println);
- 3,timeout
timeout 操作符维持原始被观察者的状态, 在特定时间内没有产生任何事件时, 将生成一个异常.
4,block
block 操作符在没有接收到下一个元素之前一直被阻塞. block 操作符通常用来把响应式的数据流转换成传统的数据流.
例如, 使用如下方法时, 我们分别将 Flux 数据流和 Mono 数据流转变成了普通的 List<Order > 对象和单个 Order 对象, 同样也可以设置 block 的等待时间.
- public List<Order> getAllOrder(){
- return orderService.getAllOrders().block(Duration.ofSecond(5));
- }
- public Order getOrderById(Long orderId){
- return orderService.getOrderById(orderId).block(Duration.ofSecond(2));
- }
往期
实战 SpringCloud 响应式微服务系列教程 (第一章)
实战 SpringCloud 响应式微服务系列教程 (第二章)
实战 SpringCloud 响应式微服务系列教程 (第三章)
实战 SpringCloud 响应式微服务系列教程 (第四章)
实战 SpringCloud 响应式微服务系列教程 (第五章)
实战 SpringCloud 响应式微服务系列教程 (第六章)
来源: https://www.cnblogs.com/javazhiyin/p/11660204.html