Resilience4j 是一个轻量级, 易于使用的容错库, 其灵感来自 Netflix Hystrix, 但专为 Java 8 和函数式编程设计. 轻量级, 因为库只使用 Vavr, 它没有任何其他外部库依赖项. 相比之下, Netflix Hystrix 对 Archaius 有一个编译依赖关系, Archaius 有更多的外部库依赖关系, 如 Guava 和 Apache Commons.
Resilience4j 提供高阶函数 (decorators) 来增强任何功能接口, lambda 表达式或方法引用, 包括断路器, 速率限制器, 重试或舱壁. 可以在任何函数接口, lambda 表达式或方法引用上使用多个装饰器. 优点是您可以选择所需的装饰器, 而无需其他任何东西.
有了 Resilience4j, 你不必全力以赴, 你可以选择你需要的.
概览
Resilience4j 提供了两种舱壁模式(Bulkhead), 可用于限制并发执行的次数:
SemaphoreBulkhead(信号量舱壁, 默认), 基于 Java 并发库中的 Semaphore 实现.
FixedThreadPoolBulkhead(固定线程池舱壁), 它使用一个有界队列和一个固定线程池.
本文将演示在 Spring Boot2 中集成 Resilience4j 库, 以及在多并发情况下实现如上两种舱壁模式.
引入依赖
在 Spring Boot2 项目中引入 Resilience4j 相关依赖
- <dependency>
- <groupId>io.GitHub.resilience4j</groupId>
- <artifactId>resilience4j-spring-boot2</artifactId>
- <version>1.4.0</version>
- </dependency>
- <dependency>
- <groupId>io.GitHub.resilience4j</groupId>
- <artifactId>resilience4j-bulkhead</artifactId>
- <version>1.4.0</version>
- </dependency>
由于 Resilience4j 的 Bulkhead 依赖于 Spring AOP, 所以我们需要引入 Spring Boot AOP 相关依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-aop</artifactId>
- </dependency>
我们可能还希望了解 Resilience4j 在程序中的运行时状态, 所以需要通过 Spring Boot Actuator 将其暴露出来
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
实现 SemaphoreBulkhead(信号量舱壁)
resilience4j-spring-boot2 实现了对 resilience4j 的自动配置, 因此我们仅需在项目中的 YAML/properties 文件中编写配置即可.
SemaphoreBulkhead 的配置项如下:
属性配置 | 默认值 | 含义 |
---|---|---|
maxConcurrentCalls | 25 | 舱壁允许的最大并行执行量 |
maxWaitDuration | 0 | 尝试进入饱和舱壁时,应阻塞线程的最长时间。 |
添加配置
示例(使用 YAML):
- resilience4j.bulkhead:
- configs:
- default:
- maxConcurrentCalls: 5
- maxWaitDuration: 20ms
- instances:
- backendA:
- baseConfig: default
- backendB:
- maxWaitDuration: 10ms
- maxConcurrentCalls: 20
如上, 我们配置了 SemaphoreBulkhead 的默认配置为 maxConcurrentCalls: 5,maxWaitDuration: 20ms. 并在 backendA 实例上应用了默认配置, 而在 backendB 实例上使用自定义的配置. 这里的实例可以理解为一个方法 / lambda 表达式等等的可执行单元.
编写 Bulkhead 逻辑
定义一个受 SemaphoreBulkhead 管理的 Service 类:
- @Service
- public class BulkheadService {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- @Autowired
- private BulkheadRegistry bulkheadRegistry;
- @Bulkhead(name = "backendA")
- public JsonNode getJsonObject() throws InterruptedException {
- io.GitHub.resilience4j.bulkhead.Bulkhead.Metrics metrics = bulkheadRegistry.bulkhead("backendA").getMetrics();
- logger.info("now i enter the method!!!,{}<<<<<<{}", metrics.getAvailableConcurrentCalls(), metrics.getMaxAllowedConcurrentCalls());
- Thread.sleep(1000L);
- logger.info("now i exist the method!!!");
- return new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis());
- }
- }
如上, 我们将 @Bulkhead 注解放到需要管理的方法上面. 并且通过 name 属性指定该方法对应的 Bulkhead 实例名字(这里我们指定的实例名字为 backendA, 所以该方法将会利用默认的配置).
定义接口类:
- @RestController
- public class BulkheadResource {
- @Autowired
- private BulkheadService bulkheadService;
- @GetMapping("/json-object")
- public ResponseEntity<JsonNode> getJsonObject() throws InterruptedException {
- return ResponseEntity.ok(bulkheadService.getJsonObject());
- }
- }
编写测试:
首先添加测试相关依赖
- <dependency>
- <groupId>io.REST-assured</groupId>
- <artifactId>REST-assured</artifactId>
- <version>3.0.5</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.awaitility</groupId>
- <artifactId>awaitility</artifactId>
- <version>4.0.2</version>
- <scope>test</scope>
- </dependency>
这里我们使用 https://github.com/rest-assured/rest-assured 和 https://github.com/awaitility/awaitility 编写多并发情况下的 API 测试
- public class SemaphoreBulkheadTests extends Resilience4jDemoApplicationTests {
- @LocalServerPort
- private int port;
- @BeforeEach
- public void init() {
- RestAssured.baseURI = "http://localhost";
- RestAssured.port = port;
- }
- @Test
public void 多并发访问情况下的 SemaphoreBulkhead 测试() {
- CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
- IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
- statusList.add(given().get("/json-object").statusCode());
- }
- ));
- await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
- System.out.println(statusList);
- assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(5);
- assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(3);
- }
- }
可以看到所有请求中只有前五个顺利通过了, 其余三个都因为超时而导致接口报 500 异常. 我们可能并不希望这种不友好的提示, 因此 Resilience4j 提供了自定义的失败回退方法. 当请求并发量过大时, 无法正常执行的请求将进入回退方法.
首先我们定义一个回退方法
- private JsonNode fallback(BulkheadFullException exception) {
- return new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis());
- }
注意: 回退方法应该和调用方法放置在同一类中, 并且必须具有相同的方法签名, 并且仅带有一个额外的目标异常参数.
然后在 @Bulkhead 注解中指定回退方法:@Bulkhead(name = "backendA", fallbackMethod = "fallback")
最后修改 API 测试代码:
@Test
public void 多并发访问情况下的 SemaphoreBulkhead 测试使用回退方法() {
- CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
- IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
- statusList.add(given().get("/json-object").statusCode());
- }
- ));
- await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
- System.out.println(statusList);
- assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
- }
运行单元测试, 成功! 可以看到, 我们定义的回退方法, 在请求过量时起作用了.
实现 FixedThreadPoolBulkhead(固定线程池舱壁)
FixedThreadPoolBulkhead 的配置项如下:
配置名称 | 默认值 | 含义 |
---|---|---|
maxThreadPoolSize | Runtime.getRuntime().availableProcessors() | 配置最大线程池大小 |
coreThreadPoolSize | Runtime.getRuntime().availableProcessors() - 1 | 配置核心线程池大小 |
queueCapacity | 100 | 配置队列的容量 |
keepAliveDuration | 20ms | 当线程数大于核心时,这是多余空闲线程在终止前等待新任务的最长时间 |
添加配置
示例(使用 YAML):
- resilience4j.thread-pool-bulkhead:
- configs:
- default:
- maxThreadPoolSize: 4
- coreThreadPoolSize: 2
- queueCapacity: 2
- instances:
- backendA:
- baseConfig: default
- backendB:
- maxThreadPoolSize: 1
- coreThreadPoolSize: 1
- queueCapacity: 1
如上, 我们定义了一段简单的 FixedThreadPoolBulkhead 配置, 我们指定的默认配置为: maxThreadPoolSize: 4,coreThreadPoolSize: 2,queueCapacity: 2, 并且指定了两个实例, 其中 backendA 使用了默认配置而 backendB 使用了自定义的配置.
编写 Bulkhead 逻辑
定义一个受 FixedThreadPoolBulkhead 管理的方法:
- @Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL)
- public CompletableFuture<JsonNode> getJsonObjectByThreadPool() throws InterruptedException {
- io.GitHub.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
- logger.info("now i enter the method!!!,{}", metrics);
- Thread.sleep(1000L);
- logger.info("now i exist the method!!!");
- return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
- }
如上定义和 SemaphoreBulkhead 的方法大同小异, 其中 @Bulkhead 显示指定了 type 的属性为 Bulkhead.Type.THREADPOOL, 表明其方法受 FixedThreadPoolBulkhead 管理. 由于 @Bulkhead 默认的 Bulkhead 是 SemaphoreBulkhead, 所以在未指定 type 的情况下为 SemaphoreBulkhead. 另外, FixedThreadPoolBulkhead 只对 CompletableFuture 方法有效, 所以我们必创建返回 CompletableFuture 类型的方法.
定义接口类方法
- @GetMapping("/json-object-with-threadpool")
- public ResponseEntity<JsonNode> getJsonObjectWithThreadPool() throws InterruptedException, ExecutionException {
- return ResponseEntity.ok(bulkheadService.getJsonObjectByThreadPool().get());
- }
编写测试代码
@Test
public void 多并发访问情况下的 ThreadPoolBulkhead 测试() {
- CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
- IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
- statusList.add(given().get("/json-object-with-threadpool").statusCode());
- }
- ));
- await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
- System.out.println(statusList);
- assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(6);
- assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(2);
- }
测试中我们并行请求了 8 次, 其中 6 次请求成功, 2 次失败. 根据 FixedThreadPoolBulkhead 的默认配置, 最多能容纳 maxThreadPoolSize+queueCapacity 次请求(根据我们上面的配置为 6 次).
同样, 我们可能并不希望这种不友好的提示, 那么我们可以指定回退方法, 在请求无法正常执行时使用回退方法.
- private CompletableFuture<JsonNode> fallbackByThreadPool(BulkheadFullException exception) {
- return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis()));
- }
- @Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL, fallbackMethod = "fallbackByThreadPool")
- public CompletableFuture<JsonNode> getJsonObjectByThreadPoolWithFallback() throws InterruptedException {
- io.GitHub.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
- logger.info("now i enter the method!!!,{}", metrics);
- Thread.sleep(1000L);
- logger.info("now i exist the method!!!");
- return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
- }
编写测试代码
@Test
public void 多并发访问情况下的 ThreadPoolBulkhead 测试使用回退方法() {
- CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
- IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
- statusList.add(given().get("/json-object-by-threadpool-with-fallback").statusCode());
- }
- ));
- await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
- System.out.println(statusList);
- assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
- }
由于指定了回退方法, 所有请求的响应状态都为正常了.
总结
本文首先简单介绍了 Resilience4j 的功能及使用场景, 然后具体介绍了 Resilience4j 中的 Bulkhead. 演示了如何在 Spring Boot2 项目中引入 Resilience4j 库, 使用代码示例演示了如何在 Spring Boot2 项目中实现 Resilience4j 中的两种 Bulkhead(SemaphoreBulkhead 和 FixedThreadPoolBulkhead), 并编写 API 测试验证我们的示例.
本文示例代码地址: https://github.com/cg837718548/resilience4j-demo
欢迎访问笔者博客: blog.dongxishaonian.tech
关注笔者公众号, 推送各类原创 / 优质技术文章
来源: https://www.cnblogs.com/dongxishaonian/p/13191230.html