目录
重试的使用场景
如何优雅地设计重试实现
guava-retrying 基础用法
guava-retrying 实现原理
guava-retrying 高级用法
使用中遇到的问题
Guava 版本冲突
动态调节重试策略
重试的使用场景
在很多业务场景中, 为了排除系统中的各种不稳定因素, 以及逻辑上的错误, 并最大概率保证获得预期的结果, 重试机制都是必不可少的.
尤其是调用远程服务, 在高并发场景下, 很可能因为服务器响应延迟或者网络原因, 造成我们得不到想要的结果, 或者根本得不到响应. 这个时候, 一个优雅的重试调用机制, 可以让我们更大概率保证得到预期的响应.
通常情况下, 我们会通过定时任务进行重试. 例如某次操作失败, 则记录下来, 当定时任务再次启动, 则将数据放到定时任务的方法中, 重新跑一遍. 最终直至得到想要的结果为止.
无论是基于定时任务的重试机制, 还是我们自己写的简单的重试器, 缺点都是重试的机制太单一, 而且实现起来不优雅.
如何优雅地设计重试实现
一个完备的重试实现, 要很好地解决如下问题:
什么条件下重试
什么条件下停止
如何停止重试
停止重试等待多久
如何等待
请求时间限制
如何结束
如何监听整个重试过程
并且, 为了更好地封装性, 重试的实现一般分为两步:
使用工厂模式构造重试器
执行重试方法并得到结果
一个完整的重试流程可以简单示意为:
guava-retrying 基础用法
guava-retrying 是基于谷歌的核心类库 guava 的重试机制实现, 可以说是一个重试利器.
下面就快速看一下它的用法.
1.Maven 配置
- <!-- https://mvnrepository.com/artifact/com.github.rholder/guava-retrying -->
- <dependency>
- <groupId>com.GitHub.rholder</groupId>
- <artifactId>guava-retrying</artifactId>
- <version>2.0.0</version>
- </dependency>
需要注意的是, 此版本依赖的是 27.0.1 版本的 guava. 如果你项目中的 guava 低几个版本没问题, 但是低太多就不兼容了. 这个时候你需要升级你项目的 guava 版本, 或者直接去掉你自己的 guava 依赖, 使用 guava-retrying 传递过来的 guava 依赖.
2. 实现 Callable
- Callable<Boolean> callable = new Callable<Boolean>() {
- public Boolean call() throws Exception {
- return true; // do something useful here
- }
- };
Callable 的 call 方法中是你自己实际的业务调用.
通过 RetryerBuilder 构造 Retryer
- Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
- .retryIfResult(Predicates.<Boolean>isNull())
- .retryIfExceptionOfType(IOException.class)
- .retryIfRuntimeException()
- .withStopStrategy(StopStrategies.stopAfterAttempt(3))
- .build();
使用重试器执行你的业务
retryer.call(callable);
下面是完整的参考实现.
- public Boolean test() throws Exception {
- // 定义重试机制
- Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
- //retryIf 重试条件
- .retryIfException()
- .retryIfRuntimeException()
- .retryIfExceptionOfType(Exception.class)
- .retryIfException(Predicates.equalTo(new Exception()))
- .retryIfResult(Predicates.equalTo(false))
- // 等待策略: 每次请求间隔 1s
- .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
- // 停止策略 : 尝试请求 6 次
- .withStopStrategy(StopStrategies.stopAfterAttempt(6))
- // 时间限制 : 某次请求不得超过 2s , 类似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
- .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
- .build();
- // 定义请求实现
- Callable<Boolean> callable = new Callable<Boolean>() {
- int times = 1;
- @Override
- public Boolean call() throws Exception {
- log.info("call times={}", times);
- times++;
- if (times == 2) {
- throw new NullPointerException();
- } else if (times == 3) {
- throw new Exception();
- } else if (times == 4) {
- throw new RuntimeException();
- } else if (times == 5) {
- return false;
- } else {
- return true;
- }
- }
- };
- // 利用重试器调用请求
- return retryer.call(callable);
- }
guava-retrying 实现原理
guava-retrying 的核心是 Attempt 类, Retryer 类以及一些 Strategy(策略) 相关的类.
Attempt
Attempt 既是一次重试请求 (call), 也是请求的结果, 并记录了当前请求的次数, 是否包含异常和请求的返回值.
- /**
- * An attempt of a call, which resulted either in a result returned by the call,
- * or in a Throwable thrown by the call.
- *
- * @param <V> The type returned by the wrapped callable.
- * @author JB
- */
- public interface Attempt<V>
Retryer
Retryer 通过 RetryerBuilder 这个工厂类进行构造. RetryerBuilder 负责将定义的重试策略赋值到 Retryer 对象中.
在 Retryer 执行 call 方法的时候, 会将这些重试策略一一使用.
下面就看一下 Retryer 的 call 方法的具体实现.
- /**
- * Executes the given callable. If the rejection predicate
- * accepts the attempt, the stop strategy is used to decide if a new attempt
- * must be made. Then the wait strategy is used to decide how much time to sleep
- * and a new attempt is made.
- *
- * @param callable the callable task to be executed
- * @return the computed result of the given callable
- * @throws ExecutionException if the given callable throws an exception, and the
- * rejection predicate considers the attempt as successful. The original exception
- * is wrapped into an ExecutionException.
- * @throws RetryException if all the attempts failed before the stop strategy decided
- * to abort, or the thread was interrupted. Note that if the thread is interrupted,
- * this exception is thrown and the thread's interrupt status is set.
- */
- public V call(Callable<V> callable) throws ExecutionException, RetryException {
- long startTime = System.nanoTime();
- // 说明: 根据 attemptNumber 进行循环 -- 也就是重试多少次
- for (int attemptNumber = 1; ; attemptNumber++) {
- // 说明: 进入方法不等待, 立即执行一次
- Attempt<V> attempt;
- try {
- // 说明: 执行 callable 中的具体业务
- //attemptTimeLimiter 限制了每次尝试等待的时常
- V result = attemptTimeLimiter.call(callable);
- // 利用调用结果构造新的 attempt
- attempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
- } catch (Throwable t) {
- attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
- }
- // 说明: 遍历自定义的监听器
- for (RetryListener listener : listeners) {
- listener.onRetry(attempt);
- }
- // 说明: 判断是否满足重试条件, 来决定是否继续等待并进行重试
- if (!rejectionPredicate.apply(attempt)) {
- return attempt.get();
- }
- // 说明: 此时满足停止策略, 因为还没有得到想要的结果, 因此抛出异常
- if (stopStrategy.shouldStop(attempt)) {
- throw new RetryException(attemptNumber, attempt);
- } else {
- // 说明: 执行默认的停止策略 -- 线程休眠
- long sleepTime = waitStrategy.computeSleepTime(attempt);
- try {
- // 说明: 也可以执行定义的停止策略
- blockStrategy.block(sleepTime);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RetryException(attemptNumber, attempt);
- }
- }
- }
- }
Retryer 执行过程如下.
guava-retrying 高级用法
基于 guava-retrying 的实现原理, 我们可以根据实际业务来确定自己的重试策略.
下面以数据同步这种常规系统业务为例, 自定义重试策略.
如下实现基于 Spring Boot 2.1.2.RELEASE 版本.
并使用 Lombok 简化 Bean.
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
业务描述
当商品创建以后, 需要另外设置商品的价格. 由于两个操作是有两个人进行的, 因此会出现如下问题, 即商品没有创建, 但是价格数据却已经建好了. 遇到这种情况, 价格数据需要等待商品正常创建以后, 继续完成同步.
我们通过一个 http 请求进行商品的创建, 同时通过一个定时器来修改商品的价格.
当商品不存在, 或者商品的数量小于 1 的时候, 商品的价格不能设置. 需要等商品成功创建且数量大于 0 的时候, 才能将商品的价格设置成功.
实现过程
自定义重试阻塞策略
默认的阻塞策略是线程休眠, 这里使用自旋锁实现, 不阻塞线程.
- package.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy;
- import com.GitHub.rholder.retry.BlockStrategy;
- import lombok.NoArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import java.time.Duration;
- import java.time.LocalDateTime;
- /**
- * 自旋锁的实现, 不响应线程中断
- */
- @Slf4j
- @NoArgsConstructor
- public class SpinBlockStrategy implements BlockStrategy {
- @Override
- public void block(long sleepTime) throws InterruptedException {
- LocalDateTime startTime = LocalDateTime.now();
- long start = System.currentTimeMillis();
- long end = start;
- log.info("[SpinBlockStrategy]...begin wait.");
- while (end - start <= sleepTime) {
- end = System.currentTimeMillis();
- }
- // 使用 Java8 新增的 Duration 计算时间间隔
- Duration duration = Duration.between(startTime, LocalDateTime.now());
- log.info("[SpinBlockStrategy]...end wait.duration={}", duration.toMillis());
- }
- }
自定义重试监听器
RetryListener 可以监控多次重试过程, 并可以使用 attempt 做一些额外的事情.
- package.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener;
- import com.GitHub.rholder.retry.Attempt;
- import com.GitHub.rholder.retry.RetryListener;
- import lombok.extern.slf4j.Slf4j;
- @Slf4j
- public class RetryLogListener implements RetryListener {
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- // 第几次重试,(注意: 第一次重试其实是第一次调用)
- log.info("retry time : [{}]", attempt.getAttemptNumber());
- // 距离第一次重试的延迟
- log.info("retry delay : [{}]", attempt.getDelaySinceFirstAttempt());
- // 重试结果: 是异常终止, 还是正常返回
- log.info("hasException={}", attempt.hasException());
- log.info("hasResult={}", attempt.hasResult());
- // 是什么原因导致异常
- if (attempt.hasException()) {
- log.info("causeBy={}" , attempt.getExceptionCause().toString());
- } else {
- // 正常返回时的结果
- log.info("result={}" , attempt.getResult());
- }
- log.info("log listen over.");
- }
- }
自定义 Exception
有些异常需要重试, 有些不需要.
- package.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception;
- /**
- * 当抛出这个异常的时候, 表示需要重试
- */
- public class NeedRetryException extends Exception {
- public NeedRetryException(String message) {
- super("NeedRetryException can retry."+message);
- }
- }
实现具体重试业务与 Callable 接口
使用 call 方法调用自己的业务.
- package.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import java.math.BigDecimal;
- /**
- * 商品 model
- */
- @Data
- @AllArgsConstructor
- public class Product {
- private Long id;
- private String name;
- private Integer count;
- private BigDecimal price;
- }
- package.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository;
- import.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
- import org.springframework.stereotype.Repository;
- import java.math.BigDecimal;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.atomic.AtomicLong;
- /**
- * 商品 DAO
- */
- @Repository
- public class ProductRepository {
- private static ConcurrentHashMap<Long,Product> products=new ConcurrentHashMap();
- private static AtomicLong ids=new AtomicLong(0);
- public List<Product> findAll(){
- return new ArrayList<>(products.values());
- }
- public Product findById(Long id){
- return products.get(id);
- }
- public Product updatePrice(Long id, BigDecimal price){
- Product p=products.get(id);
- if (null==p){
- return p;
- }
- p.setPrice(price);
- return p;
- }
- public Product addProduct(Product product){
- Long id=ids.addAndGet(1);
- product.setId(id);
- products.put(id,product);
- return product;
- }
- }
- package.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
- import lombok.extern.slf4j.Slf4j;
- import.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
- import.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
- import.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository.ProductRepository;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import java.math.BigDecimal;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.Callable;
- /**
- * 业务方法实现
- */
- @Component
- @Slf4j
- public class ProductInformationHander implements Callable<Boolean> {
- @Autowired
- private ProductRepository pRepo;
- private static Map<Long, BigDecimal> prices = new HashMap<>();
- static {
- prices.put(1L, new BigDecimal(100));
- prices.put(2L, new BigDecimal(200));
- prices.put(3L, new BigDecimal(300));
- prices.put(4L, new BigDecimal(400));
- prices.put(8L, new BigDecimal(800));
- prices.put(9L, new BigDecimal(900));
- }
- @Override
- public Boolean call() throws Exception {
- log.info("sync price begin,prices size={}", prices.size());
- for (Long id : prices.keySet()) {
- Product product = pRepo.findById(id);
- if (null == product) {
- throw new NeedRetryException("can not find product by id=" + id);
- }
- if (null == product.getCount() || product.getCount() <1) {
- throw new NeedRetryException("product count is less than 1, id=" + id);
- }
- Product updatedP = pRepo.updatePrice(id, prices.get(id));
- if (null == updatedP) {
- return false;
- }
- prices.remove(id);
- }
- log.info("sync price over,prices size={}", prices.size());
- return true;
- }
- }
构造重试器 Retryer
将上面的实现作为参数, 构造 Retryer.
- package.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
- import com.GitHub.rholder.retry.*;
- import.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
- import.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener.RetryLogListener;
- import.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy.SpinBlockStrategy;
- import org.springframework.stereotype.Component;
- import java.util.concurrent.TimeUnit;
- /**
- * 构造重试器
- */
- @Component
- public class ProductRetryerBuilder {
- public Retryer build() {
- // 定义重试机制
- Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
- //retryIf 重试条件
- //.retryIfException()
- //.retryIfRuntimeException()
- //.retryIfExceptionOfType(Exception.class)
- //.retryIfException(Predicates.equalTo(new Exception()))
- //.retryIfResult(Predicates.equalTo(false))
- .retryIfExceptionOfType(NeedRetryException.class)
- // 等待策略: 每次请求间隔 1s
- .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
- // 停止策略 : 尝试请求 3 次
- .withStopStrategy(StopStrategies.stopAfterAttempt(3))
- // 时间限制 : 某次请求不得超过 2s , 类似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
- .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
- // 默认的阻塞策略: 线程睡眠
- //.withBlockStrategy(BlockStrategies.threadSleepStrategy())
- // 自定义阻塞策略: 自旋锁
- .withBlockStrategy(new SpinBlockStrategy())
- // 自定义重试监听器
- .withRetryListener(new RetryLogListener())
- .build();
- return retryer;
- }
- }
与定时任务结合执行 Retryer
定时任务只需要跑一次, 但是实际上实现了所有的重试策略. 这样大大简化了定时器的设计.
首先使用 @EnableScheduling 声明项目支持定时器注解.
- @SpringBootApplication
- @EnableScheduling
- public class DemoRetryerApplication {
- public static void main(String[] args) {
- SpringApplication.run(DemoRetryerApplication.class, args);
- }
- }
- package.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.task;
- import com.GitHub.rholder.retry.Retryer;
- import.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductInformationHander;
- import.NET.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductRetryerBuilder;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- /**
- * 商品信息定时器
- */
- @Component
- public class ProductScheduledTasks {
- @Autowired
- private ProductRetryerBuilder builder;
- @Autowired
- private ProductInformationHander hander;
- /**
- * 同步商品价格定时任务
- * @Scheduled(fixedDelay = 30000) : 上一次执行完毕时间点之后 30 秒再执行
- */
- @Scheduled(fixedDelay = 30*1000)
- public void syncPrice() throws Exception{
- Retryer retryer=builder.build();
- retryer.call(hander);
- }
- }
执行结果: 由于并没有商品, 因此重试以后, 抛出异常.
2019 - 二月 - 28 14:37:52.667 INFO [scheduling-1] n.i.t.f.s.i.d.r.g.l.RetryLogListener - log listen over.
2019 - 二月 - 28 14:37:52.672 ERROR [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler - Unexpected error occurred in scheduled task.
- com.GitHub.rholder.retry.RetryException: Retrying failed to complete successfully after 3 attempts.
- at com.GitHub.rholder.retry.Retryer.call(Retryer.java:174)
你也可以增加一些商品数据, 看一下重试成功的效果.
完整示例代码在这里.
使用中遇到的问题
Guava 版本冲突
由于项目中依赖的 guava 版本过低, 启动项目时出现了如下异常.
- java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService;
- at org.apache.curator.framework.listen.ListenerContainer.addListener(ListenerContainer.java:41)
- at com.bzn.curator.ZkOperator.getZkClient(ZkOperator.java:207)
- at com.bzn.curator.ZkOperator.checkExists(ZkOperator.java:346)
- at com.bzn.curator.watcher.AbstractWatcher.initListen(AbstractWatcher.java:87)
- at com.bzn.Web.listener.NebulaSystemInitListener.initZkWatcher(NebulaSystemInitListener.java:84)
- at com.bzn.Web.listener.NebulaSystemInitListener.contextInitialized(NebulaSystemInitListener.java:33)
- at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4939)
- at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5434)
- at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
- at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559)
- at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549)
- at java.util.concurrent.FutureTask.run(FutureTask.java:266)
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
- at java.lang.Thread.run(Thread.java:748)
因此, 要排除项目中低版本的 guava 依赖.
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
同时, 由于 Guava 在新版本中移除了 sameThreadExecutor 方法, 但目前项目中的 ZK 需要此方法, 因此需要手动设置合适的 guava 版本.
果然, 在 19.0 版本中 MoreExecutors 的此方法依然存在, 只是标注为过期了.
- @Deprecated
- @GwtIncompatible("TODO")
- public static ListeningExecutorService sameThreadExecutor() {
- return new DirectExecutorService();
- }
声明依赖的 guava 版本改为 19.0 即可.
- <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>19.0</version>
- </dependency>
动态调节重试策略
在实际使用过程中, 有时经常需要调整重试的次数, 等待的时间等重试策略, 因此, 将重试策略的配置参数化保存, 可以动态调节.
例如在秒杀, 双十一购物节等时期增加等待的时间与重试次数, 以保证错峰请求. 在平时, 可以适当减少等待时间和重试次数.
对于系统关键性业务, 如果多次重试步成功, 可以通过 RetryListener 进行监控与报警.
- links:
- https://github.com/rholder/guava-retrying
- https://github.com/spring-projects/spring-retry
- https://zhuanlan.zhihu.com/p/57970758
- https://blog.csdn.net/sinat_26342009/article/details/88045701
- https://github.com/ispringboot/demos/tree/release-retryer-v1/demo-retryer
- author: ijiangtao
来源: https://juejin.im/post/5c77e3bcf265da2d914da410