本文主要基于 Hystrix 1.5.X 版本
1. 概述
- handleFallback
- #handleShortCircuitViaFallback()
- #handleSemaphoreRejectionViaFallback()
- #handleThreadPoolRejectionViaFallback()
- #handleTimeoutViaFallback()
- #handleFailureViaFallback()
- #getFallbackOrThrowException()
666. 彩蛋
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
本文主要分享 Hystrix 命令执行 (四) 之失败回退逻辑
建议 : 对 RxJava 已经有一定的了解的基础上阅读本文
Hystrix 执行命令整体流程如下图:
FROM 翻译 Hystrix 文档 - 实现原理流程图
红圈 :Hystrix 命令执行失败, 执行回退逻辑也就是大家经常在文章中看到的服务降级
绿圈 : 四种情况会触发失败回退逻辑( fallback )
第一种 :short-circuit , 处理链路处于熔断的回退逻辑, 在 3. #handleShortCircuitViaFallback() 详细解析
第二种 :semaphore-rejection , 处理信号量获得失败的回退逻辑, 在 4. #handleShortCircuitViaFallback() 详细解析
第三种 :
thread-pool-rejection
, 处理线程池提交任务拒绝的回退逻辑, 在 5. #handleThreadPoolRejectionViaFallback() 详细解析
第四种 :execution-timeout , 处理命令执行超时的回退逻辑, 在 6. #handleTimeoutViaFallback() 详细解析
第五种 :execution-failure , 处理命令执行异常的回退逻辑, 在 7. #handleFailureViaFallback() 详细解析
第六种 :bad-request ,TODO 2014HystrixBadRequestException, 和 hystrix-javanica 子项目相关
另外,#handleXXXX() 方法, 整体代码比较类似, 最终都是调用
#getFallbackOrThrowException()
方法, 获得回退逻辑 Observable 或者异常 Observable, 在 8. #getFallbackOrThrowException() 详细解析
推荐 Spring Cloud 书籍:
请支持正版下载盗版, 等于主动编写低级 BUG
程序猿 DD Spring Cloud 微服务实战
周立 Spring Cloud 与 Docker 微服务架构实战
两书齐买, 京东包邮
推荐 Spring Cloud 视频:
Java 微服务实践 - Spring Boot
Java 微服务实践 - Spring Cloud
Java 微服务实践 - Spring Boot / Spring Cloud
2. handleFallback
在 Hystrix 源码解析 命令执行 (一) 之正常执行逻辑 4. #executeCommandAndObserve() 中,
#executeCommandAndObserve(...)
的第 82 行
onErrorResumeNext(handleFallback)
代码, 通过调用
Observable#onErrorResumeNext(...)
方法, 实现执行命令 Observable 执行异常时, 返回回退逻辑 Observable, 执行失败回退逻辑
FROM ReactiveX 文档中文翻译 onErrorResumeNext
onErrorResumeNext 方法返回一个镜像原有 Observable 行为的新 Observable , 后者会忽略前者的 onError 调用, 不会将错误传递给观察者, 作为替代, 它会开始镜像另一个, 备用的 Observable
- Javadoc: onErrorResumeNext(Func1))
- Javadoc: onErrorResumeNext(Observable))
handleFallback 变量, 代码如下 :
- final Func1> handleFallback = new Func1>() {
- @Override
- public Observable call(Throwable t) {
- // 标记尝试成功
- circuitBreaker.markNonSuccess();
- // 标记 executionResult 执行异常
- Exception e = getExceptionFromThrowable(t);
- executionResult = executionResult.setExecutionException(e);
- // 返回 回退逻辑 Observable
- if (e instanceof RejectedExecutionException) { // 线程池提交任务拒绝异常
- return handleThreadPoolRejectionViaFallback(e);
- } else if (t instanceof HystrixTimeoutException) { // 执行命令超时异常
- return handleTimeoutViaFallback();
- } else if (t instanceof HystrixBadRequestException) { // TODO 2014HystrixBadRequestException
- return handleBadRequestByEmittingError(e);
- } else {
- /*
- * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
- */
- if (e instanceof HystrixBadRequestException) { // TODO 2014HystrixBadRequestException
- eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
- return Observable.error(e);
- }
- return handleFailureViaFallback(e);
- }
- }
- };
第 5 行 : 标记断路器尝试成功在 Hystrix 源码解析 断路器 HystrixCircuitBreaker 有详细解析
第 7 至 8 行 : 标记 executionResult 执行异常
第 10 至 11 行 :
thread-pool-rejection
, 处理线程池提交任务拒绝的回退逻辑, 在 5. #handleThreadPoolRejectionViaFallback() 详细解析
第 12 至 13 行 :execution-timeout , 处理命令执行超时的回退逻辑, 在 6. #handleTimeoutViaFallback() 详细解析
第 14 至 23 行 :,bad-request ,TODO 2014HystrixBadRequestException, 和 hystrix-javanica 子项目相关
第 25 行 :execution-failure 处理命令执行异常的回退逻辑, 在 7. #handleFailureViaFallback() 详细解析
- 3. #handleShortCircuitViaFallback()
- #handleShortCircuitViaFallback()
方法, short-circuit , 处理链路处于熔断的回退逻辑, 在 此处 被调用, 代码如下 :
- private Observable handleShortCircuitViaFallback() {
- // TODO 2011Hystrix 事件机制
- // record that we are returning a short-circuited fallback
- eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
- // 标记 executionResult 执行异常
- // short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
- Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
- executionResult = executionResult.setExecutionException(shortCircuitException);
- try {
- // 获得 回退逻辑 Observable 或者 异常 Observable
- return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
- "short-circuited", shortCircuitException);
- } catch (Exception e) {
- return Observable.error(e);
- }
- }
第 4 行 :TODO 2011Hystrix 事件机制
第 7 至 8 行 : 标记 executionResult 执行异常
第 11 至 12 行 : 调用
#getFallbackOrThrowException()
方法, 获得回退逻辑 Observable 或者异常 Observable, 在 8. #getFallbackOrThrowException() 详细解析
第 14 行 : 返回异常 Observable
- 4. #handleSemaphoreRejectionViaFallback()
- #handleSemaphoreRejectionViaFallback()
方法, semaphore-rejection , 处理信号量获得失败的回退逻辑, 在 此处 被调用, 代码如下 :
- private Observable handleSemaphoreRejectionViaFallback() {
- // 标记 executionResult 执行异常
- Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
- executionResult = executionResult.setExecutionException(semaphoreRejectionException);
- // TODO 2011Hystrix 事件机制
- eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
- logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
- // retrieve a fallback or throw an exception if no fallback available
- // 获得 回退逻辑 Observable 或者 异常 Observable
- return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
- "could not acquire a semaphore for execution", semaphoreRejectionException);
- }
第 3 至 4 行 : 标记 executionResult 执行异常
第 6 至 7 行 :TODO 2011Hystrix 事件机制
第 10 至 11 行 : 调用
#getFallbackOrThrowException()
方法, 获得回退逻辑 Observable 或者异常 Observable, 在 8. #getFallbackOrThrowException() 详细解析
- 5. #handleThreadPoolRejectionViaFallback()
- #handleThreadPoolRejectionViaFallback()
方法,
thread-pool-rejection
, 处理线程池提交任务拒绝的回退逻辑, 在 此处 被调用, 代码如下:
- private Observable handleThreadPoolRejectionViaFallback(Exception underlying) {
- // TODO 2011Hystrix 事件机制
- eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
- // TODO 2002metrics
- threadPool.markThreadRejection();
- // 获得 回退逻辑 Observable 或者 异常 Observable
- // use a fallback instead (or throw exception if not implemented)
- return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION,
- "could not be queued for execution", underlying);
- }
第 3 行 :TODO 2011Hystrix 事件机制
第 5 行 :TODO 2002metrics
第 8 至 9 行 : 调用
#getFallbackOrThrowException()
方法, 获得回退逻辑 Observable 或者异常 Observable, 在 8. #getFallbackOrThrowException() 详细解析
- 6. #handleTimeoutViaFallback()
- #handleTimeoutViaFallback()
方法, execution-timeout , 处理命令执行超时的回退逻辑, 在 此处 被调用, 代码如下:
- private Observable handleTimeoutViaFallback() {
- // 获得 回退逻辑 Observable 或者 异常 Observable
- return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,
- "timed-out", new TimeoutException());
- }
第 3 至 4 行 : 调用
#getFallbackOrThrowException()
方法, 获得回退逻辑 Observable 或者异常 Observable, 在 8. #getFallbackOrThrowException() 详细解析
- 7. #handleFailureViaFallback()
- #handleFailureViaFallback()
方法, execution-failure , 处理命令执行异常的回退逻辑, 在 此处 被调用, 代码如下:
- private Observable handleFailureViaFallback(Exception underlying) {
- // TODO 2011Hystrix 事件机制
- /**
- * All other error handling
- */
- logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);
- // report failure
- eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
- // 标记 executionResult 异常 TODO 2007executionResult 用途 为啥不是执行异常
- // record the exception
- executionResult = executionResult.setException(underlying);
- // 获得 回退逻辑 Observable 或者 异常 Observable
- return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
- }
第 2 至 9 行 :TODO 2011Hystrix 事件机制
第 13 行 : 标记 executionResult 异常
第 15 行 : 调用
#getFallbackOrThrowException()
方法, 获得回退逻辑 Observable 或者异常 Observable, 在 8. #getFallbackOrThrowException() 详细解析
- 8. #getFallbackOrThrowException()
- #getFallbackOrThrowException()
方法, 获得回退逻辑 Observable 或者异常 Observable, 代码如下 :
- private Observable getFallbackOrThrowException(final AbstractCommand _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
- // 记录 HystrixRequestContext
- final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
- // 标记 executionResult 添加 ( 记录 ) 事件
- long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
- // record the executionResult
- // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
- executionResult = executionResult.addEvent((int) latency, eventType);
- if (isUnrecoverable(originalException)) { // 无法恢复的异常
- logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback.", originalException);
- // TODO 2003HOOK
- /* executionHook for all errors */
- Exception e = wrapWithOnErrorHook(failureType, originalException);
- // 返回 异常 Observable
- return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + "" + message +" and encountered unrecoverable error.", e, null));
- } else {
- if (isRecoverableError(originalException)) { // 可恢复的异常
- logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
- }
- if (properties.fallbackEnabled().get()) {
- /* fallback behavior is permitted so attempt *$/ 设置 HystrixRequestContext 的 Action
- final Action1> setRequestContext = new Action1>() {
- @Override
- public void call(Notification rNotification) {
- setRequestContextIfNeeded(requestContext);
- }
- };
- // TODO 2007executionResult 用途
- final Action1 markFallbackEmit = new Action1() {
- @Override
- public void call(R r) {
- if (shouldOutputOnNextEvents()) {
- executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
- eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
- }
- }
- };
- // TODO 2007executionResult 用途
- final Action0 markFallbackCompleted = new Action0() {
- @Override
- public void call() {
- long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
- eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
- executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
- }
- };
- // 处理异常 的 Func
- final Func1> handleFallbackError = new Func1>() {
- @Override
- public Observable call(Throwable t) {
- // TODO 2003HOOK
- /* executionHook for all errors */
- Exception e = wrapWithOnErrorHook(failureType, originalException);
- // 获得 Exception
- Exception fe = getExceptionFromThrowable(t);
- long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
- Exception toEmit;
- if (fe instanceof UnsupportedOperationException) {
- // TODO 2011Hystrix 事件机制
- logger.debug("No fallback for HystrixCommand.", fe); // debug only since we're throwing the exception and someone higher will do something with it
- eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
- // 标记 executionResult 添加 ( 记录 ) 事件 HystrixEventType.FALLBACK_MISSING
- executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
- // 创建 HystrixRuntimeException
- toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + "" + message +" and no fallback available.", e, fe);
- } else {
- // TODO 2011Hystrix 事件机制
- logger.debug("HystrixCommand execution" + failureType.name() + "and fallback failed.", fe);
- eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
- // 标记 executionResult 添加 ( 记录 ) 事件 HystrixEventType.FALLBACK_FAILURE
- executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
- // 创建 HystrixRuntimeException
- toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + "" + message +" and fallback failed.", e, fe);
- }
- // NOTE: we're suppressing fallback exception here
- if (shouldNotBeWrapped(originalException)) {
- return Observable.error(e);
- }
- return Observable.error(toEmit);
- }
- };
- // 获得 TryableSemaphore
- final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
- // 信号量释放 Action
- final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
- final Action0 singleSemaphoreRelease = new Action0() {
- @Override
- public void call() {
- if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
- fallbackSemaphore.release();
- }
- }
- };
- Observable fallbackExecutionChain;
- // acquire a permit
- if (fallbackSemaphore.tryAcquire()) {
- try {
- if (isFallbackUserDefined()) {
- executionHook.onFallbackStart(this);
- fallbackExecutionChain = getFallbackObservable();
- } else {
- //same logic as above without the hook invocation
- fallbackExecutionChain = getFallbackObservable();
- }
- } catch (Throwable ex) {
- //If hook or user-fallback throws, then use that as the result of the fallback lookup
- fallbackExecutionChain = Observable.error(ex);
- }
- // 获得 回退逻辑 Observable
- return fallbackExecutionChain
- .doOnEach(setRequestContext)
- .lift(new FallbackHookApplication(_cmd)) // TODO 2003HOOK
- .lift(new DeprecatedOnFallbackHookApplication(_cmd))
- .doOnNext(markFallbackEmit)
- .doOnCompleted(markFallbackCompleted)
- .onErrorResumeNext(handleFallbackError) //
- .doOnTerminate(singleSemaphoreRelease)
- .doOnUnsubscribe(singleSemaphoreRelease);
- } else {
- return handleFallbackRejectionByEmittingError();
- }
- } else {
- return handleFallbackDisabledByEmittingError(originalException, failureType, message);
- }
- }
- }
耐心, 这个方法看起来灰常长, 也仅限于长, 理解成难度很小
第 3 行 : 记录 HystrixRequestContext
第 5 至 8 行 : 标记 executionResult 添加 ( 记录 ) 事件
第 10 至 17 行 : 调用
#isUnrecoverable(Exception)
方法, 若异常不可恢复, 直接返回异常 Observable 点击 链接 查看该方法
第 19 至 21 行 : 调用
#isRecoverableError(Exception)
方法, 若异常可恢复, 打印 WARN 日志点击 链接 查看该方法主要针对 java.lang.Error 情况, 打印
#isUnrecoverable(Exception)
排除掉的 Error
反向第 141 至 143 行 : 当配置
HystrixCommandProperties.fallbackEnabled = false
( 默认值 :true ) , 即失败回退功能关闭, 调用
#handleFallbackDisabledByEmittingError()
, 返回异常 Observable 点击 链接 查看该方法
反向第 138 至 140 行 : 失败回退信号量 ( TryableSemaphore ) 注意, 不是正常执行信号量使用失败, 调用
#handleFallbackRejectionByEmittingError()
, 返回异常 Observable 点击 链接 查看该方法
第 23 行 : 当配置
HystrixCommandProperties.fallbackEnabled = true
( 默认值 :true ) , 即失败回退功能开启
第 27 至 32 行 : 设置 HystrixRequestContext 的 Action , 使用第 3 行记录的 HystrixRequestContext
第 35 至 43 行 :TODO 2007executionResult 用途
第 46 至 53 行 :TODO 2007executionResult 用途
第 56 至 95 行 : 处理回退逻辑执行发生异常的 Func1 , 返回异常 Observable
第 61 行 :TODO 2003HOOK
第 63 行 : 调用
#getExceptionFromThrowable(Throwable)
方法, 获得 Exception 若 t 的类型为 Throwable 时, 包装成 Exception 点击 链接 查看该方法代码
第 68 至 76 行 : 当 fe 的类型为 UnsupportedOperationException 时, 使用 e + fe 创建 HystrixRuntimeException 该异常发生于
HystrixCommand#getFallback()
抽象方法未被覆写
第 77 至 86 行 : 当 fe 的类型为其他异常时, 使用 e + fe 创建 HystrixRuntimeException 该异常发生于
HystrixCommand#getFallback()
执行发生异常
第 89 至 91 行 : 调用
#shouldNotBeWrapped()
方法, 判断 originalException 是 ExceptionNotWrappedByHystrix 的实现时, 即要求返回的异常 Observable 不使用 HystrixRuntimeException 包装点击 链接 查看该方法代码
第 93 行 : 返回异常 Observable, 使用 toEmit ( HystrixRuntimeException ) 为异常
第 98 行 : 调用
#getFallbackSemaphore()
方法, 获得失败回退信号量 ( TryableSemaphore ) 对象, 点击 链接 查看该方法代码 TryableSemaphore 在 Hystrix 源码解析 命令执行 (一) 之正常执行逻辑 3. TryableSemaphore 有详细解析
第 100 至 109 行 : 信号量释放的 Action
第 114 至 137 行 : 失败回退信号量 ( TryableSemaphore ) 使用成功, 返回回退逻辑 Observable
重要第 116 至 122 行 : 调用
#getFallbackObservable()
方法, 创建回退逻辑 Observable 将子类对
HystrixCommand#getFallback()
抽象方法的执行结果, 使用
Observable#just(...)
包装返回点击 链接 查看该方法的代码
第 116 行 : 调用
#isFallbackUserDefined()
方法, 返回命令子类是否实现
HystrixCommand#getFallback()
抽象方法只有已实现( true ) 的情况下, 调用 HOOK TODO 2003HOOK
第 129 至 137 行 : 获得 回退逻辑 Observable
第 131 行 :// TODO 2003HOOK
第 135 行 : 调用
Observable#onErrorResumeNext(...)
方法, 实现失败回退 Observable 执行异常时, 返回异常 Observable
有两个注意点:
当命令执行超时时, 失败回退逻辑使用的是 HystrixTimer 的线程池
失败回退逻辑, 无超时时间, 使用要小心
666. 彩蛋
比想象中臭长的逻辑
总的来说, 逻辑和 Hystrix 源码解析 命令执行 (一) 之正常执行逻辑 是很类似的
胖友, 分享一波朋友圈可好!
来源: http://www.suo.im/7qN8X