书接上文 Reactive 编程 , 我们已经了解了基础的 API, 现在我们开始编写实际的应用 Reactive 对并发编程进行了很好的抽象, 也有很多底层的特性需要我们去关注当使用这些特性时, 我们可以对之前隐藏在容器平台框架中的细节进行控制
Spring MVC 由阻塞转向 Reactive
Reactive 要求我们以不同的思路来看待问题区别于传统的 request->response 模式, 所有的数据都是发布为一个序列 (Publisher) 然后进行订阅 (Subscriber) 区别于同步等待返回结果, 改为注册一个回调只要我们习惯了这种方式, 就不会觉得很复杂但是没办法让整个环境都同时变为 Reactive 模式, 所以避免不了要跟老式的阻塞 API 打交道
假设我们有一个返回 HttpStatus 的阻塞方法:
- private RestTemplate restTemplate = new RestTemplate();
- private HttpStatus block(int value) {
- return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
- .getStatusCode();
- }
我们需要传递不同的参数来重复调用这个方法, 并对返回的结果进行处理这是一个典型的 scatter-gather 应用场景例如从多个页面中提取前 N 条数据
这是一个采用错误方式的例子:
- Flux.range(1, 10) (1)
- .log()
- .map(this::block) (2)
- .collect(Result::new, Result::add) (3)
- .doOnSuccess(Result::stop) (4)
调用10 次接口
产生阻塞
将结果进行汇总后放入一个对象
最后结束处理 (结果是一个 Mono<Result>)
不要采用这种方式来编写代码这是一种错误的实现方式, 这样会阻塞住调用线程, 这跟循环调用 block()没什么区别好的实现应该是把 block()的调用放到工作线程中我们可以采用一个返回 Mono<HttpStatus > 的方法:
- private Mono<HttpStatus> fetch(int value) {
- return Mono.fromCallable(() -> block(value)) (1)
- .subscribeOn(this.scheduler); (2)
- }
将阻塞调用放到一个 ```Callable`` 中
在工作线程中进行订阅
scheduler 单独定义为一个共享变量:
Scheduler scheduler = Schedulers.parallel()
然后用 flatMap() 代替 map()
- Flux.range(1, 10)
- .log()
- .flatMap( (1)
- this::fetch, 4) (2)
- .collect(Result::new, Result::add)
- .doOnSuccess(Result::stop)
在新的 publisher 中并行处理
flatMap 的并行参数
嵌入 Non-Reactive 服务
如果想将上面的代码放入一个 servlet 这种的 Non-Reactive 的服务中, 可以使用 Spring MVC:
- @RequestMapping("/parallel")
- public CompletableFuture<Result> parallel() {
- return Flux.range(1, 10)
- ...
- .doOnSuccess(Result::stop)
- .toFuture();
- }
在阅读了 @RequestMapping 的 javadoc 以后, 我们会发现这个方法会返回一个 CompletableFuture , 应用会选择在单独的线程中返回值在我们的例子中这个单独的线程由 scheduler 提供
没有免费的午餐
利用工作线程进行 scatter-gather 计算是一个好的模式, 但是也不完美 - 没有阻塞调用方, 但还是阻塞了一些东西, 只不过是把问题转移了我们有一个非阻塞 IO 的 HTTP 服务, 将处理放入线程池, 一个请求一个线程 - 这是 servlet 容器的机制 (例如 tomcat) 请求是异步处理的, 因此 tomcat 内部的工作线程没有被阻塞, 我们的 scheduler 会创建 4 个线程在处理 10 个请求时, 理论上处理性能会提高 4 倍简单来说, 如果我们在单个线程中顺序处理 10 个请求要花 1000ms, 我们所采用的方式只需 250ms
我们可以通过增加线程来进一步提升性能(分配 16 个线程):
private Scheduler scheduler = Schedulers.newParallel("sub", 16);
Tomcat 默认会分配 100 个线程来处理请求, 当所有的请求同时处理时, 我们的 scheduler 线程池会成为一个瓶颈我们的 scheduler 线程池数量远小于 Tomcat 的线程池数量这说明性能调优不是一个简单的事情, 需要考虑各个参数和资源的匹配情况
相比固定数量的线程池, 我们可以采用更灵活的线程池, 可以根据需要动态调整线程数量 Reactor 已经提供了这种机制, 使用
Schedulers.elastic()
后可以看到当请求增多时, 线程数量会随之增加
全面采用 Reactive
从阻塞调用到 reactive 的桥接是一种有效的模式, 并且用 Spring MVC 的技术很容易实现接下来我们将完全弃用阻塞模式, 采用新的 API 和新的工具最终我们实现全栈 Reactive
在我们的例子中, 第一步先用
spring-boot-starter-web-reactive
替换
- spring-boot-starter-web
- :
- Maven:
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot.experimental</groupId>
- <artifactId>spring-boot-starter-web-reactive</artifactId>
- </dependency>
- ...
- </dependencies>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot.experimental</groupId>
- <artifactId>spring-boot-dependencies-web-reactive</artifactId>
- <version>0.1.0.M1</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- Gradle:
- dependencies {
- compile('org.springframework.boot.experimental:spring-boot-starter-web-reactive')
- ...
- }
- dependencyManagement {
- imports {
- mavenBom "org.springframework.boot.experimental:spring-boot-dependencies-web-reactive:0.1.0.M1"
- }
- }
在 controller 中, 不再使用 CompletableFuture 代之以返回一个 Mono :
- @RequestMapping("/parallel")
- public Mono<Result> parallel() {
- return Flux.range(1, 10)
- .log()
- .flatMap(this::fetch, 4)
- .collect(Result::new, Result::add)
- .doOnSuccess(Result::stop);
- }
将这段代码放到 SpringBoot 应用中, 可以运行在 Tomcat, Jetty 或者 Netty, 取决于 classpath 引入了哪个包 Tomcat 是默认的容器, 如果想用别的容器, 需要把 Tomcat 从 classpath 中去掉, 然后引入其它的容器这 3 个容器在启动时间内存使用和运行时资源上相差不大
我们仍然调用 block() 阻塞服务接口, 所以我们仍需在工作线程中订阅以免阻塞调用方我们也可以采用一个非阻塞的客户端, 例如用新的 WebClient 替换 RestTemplate :
- private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());
- private Mono<HttpStatus> fetch(int value) {
- return this.client.perform(HttpRequestBuilders.get("http://example.com"))
- .extract(WebResponseExtractors.response(String.class))
- .map(response -> response.getStatusCode());
- }
注意
WebClient.perform()
的返回值是一个转换为 Mono<HttpStatus> 的 Reactive 类型, 但是我们没有订阅它订阅的工作由框架来完成
控制反转
现在我们去掉 fetch() 调用之后的并发参数:
- @RequestMapping("/netty")
- public Mono<Result> netty() {
- return Flux.range(1, 10) (1)
- .log() //
- .flatMap(this::fetch) (2)
- .collect(Result::new, Result::add)
- .doOnSuccess(Result::stop);
- }
进行 10 次调用
在新的 publisher 中并行处理
由于不再使用额外的订阅线程, 相比于阻塞与 Reactive 桥接模式中的代码简洁了很多, 现在是全面 Reactive 模式了 WebClient 返回一个 Mono , 在然而然的我们需要在转换链中使用 flatMap() 编写这种代码是一个很好的体验, 易于理解便于维护同时不再需要线程池和并发参数, 也没有了影响性能的魔法数字 4 性能取决于系统资源而不是应用的线程控制
应用可以运行在 Tomcat, Jetty 或 Netty 上. Tomcat 和 Jetty 的支持基于 Servlet 3.1 的异步处理, 受限于一个请求一个线程而运行在 Netty 上则没有这个限制只要客户端不阻塞, 会尽快的分发客户端请求由于 Netty 服务不是一个请求一个线程, 因此不会使用大量的线程
注意, 很多应用的阻塞调用不只是 HTTP, 还有数据库操作当前很少的数据库支持非阻塞的客户端 (除了 MongoDB 和 Couchbase) 线程池和 blocking-to-reactive 模式会长期存在
仍然没有免费的午餐
首先, 我们的代码是声明式的, 不方便调试, 错误发生时不容易定位使用原生的 API, 例如不通过 Spring 框架而直接使用 Reactor, 会使情况变的更糟, 因为我们自己要做很多的错误处理, 每次进行网络调用都要写很多样板代码通过组合使用 Spring 和 Reactor 我们可以方便的查看堆栈信息和未捕获的异常由于运行的线程不受我们控制, 因此在理解上会有困难
其次, 一旦编写错误导致一个 Reactive 回调被阻塞, 在同一线程上的所有请求都会挂起在 servlet 容器中, 由于是一个请求一个线程, 一个请求阻塞时, 其它的请求不会受影响而在 Reactive 中, 一个请求被阻塞会导致所有请求的延迟都增加
总结
在异步处理中能够控制所有的环节是非常好的: 每一个层级都有线程池和队列我们可以使一些层级具有弹性能力, 可以根据负载动态调整但是这也是一种负担, 我们期望有更加简洁的方式可扩展性的分析结果趋向于减少多余的线程, 不要超出硬件资源的限制条件
Reactive 不能解决所有问题的方案, 事实上它本身不是一个方案, 它只是促进了某一类问题的解决方案的产生学习的成本程序的调整后续的维护成本可能远大于其所带来的益处所以在是否使用 Reactive 这个问题上要非常谨慎
来源: https://juejin.im/entry/5ab9b2376fb9a028e25d90d6