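With metrics covered, we turn to how the circuit breaker works; its decisions are driven by the metrics data.
Before executing a command, hystrix consults the circuit breaker. If the circuit is open (the service is short-circuited), the fallback flow runs instead. The check returns true when the request may proceed and works as follows: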
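If the circuit is forced open (circuitBreakerForceOpen), return false (short-circuited).
If the circuit is forced closed (circuitBreakerForceClosed), return true (not short-circuited).
Otherwise, check the breaker state:
  If the circuit is closed, return true.
  If it is half_open, return false (short-circuited).
  If it is open, check whether the current time is past the sleep window (circuitBreakerSleepWindowInMilliseconds):
    If it is not, return false.
    If it is, set the state to half_open and return true.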
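The steps above can be sketched as a small state machine. This is a minimal illustration, not Hystrix's own class: `BreakerCheckSketch`, its `Status` enum, the `trip()` helper and the injectable `clock` are all hypothetical names introduced here, and the two boolean fields merely stand in for the force-open and force-closed properties.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical sketch of the check described above; not Hystrix's own class.
class BreakerCheckSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    final AtomicLong openedAt = new AtomicLong(-1L); // time the circuit was opened
    final long sleepWindowMs;
    final LongSupplier clock;      // injectable clock so the sketch is testable
    volatile boolean forceOpen;    // stands in for circuitBreakerForceOpen
    volatile boolean forceClosed;  // stands in for circuitBreakerForceClosed

    BreakerCheckSketch(long sleepWindowMs, LongSupplier clock) {
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    // Returns true when the request may proceed, false when it is short-circuited.
    boolean allowRequest() {
        if (forceOpen) return false;
        if (forceClosed) return true;
        Status s = status.get();
        if (s == Status.CLOSED) return true;
        if (s == Status.HALF_OPEN) return false; // a trial request is already in flight
        // OPEN: let a trial request through only once the sleep window has elapsed
        if (clock.getAsLong() > openedAt.get() + sleepWindowMs) {
            // compare-and-set so that exactly one caller moves OPEN -> HALF_OPEN
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false;
    }

    // Demo helper: trip the circuit at the current clock time.
    void trip() {
        openedAt.set(clock.getAsLong());
        status.set(Status.OPEN);
    }
}
```

With a sleep window of 5000 ms and a circuit tripped at time 1000, a call at time 3000 is rejected, the first call after time 6000 is admitted as the trial request, and further calls are rejected while the state is half_open.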
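After a command fails:
If the state is half_open, the breaker returns to open and the sleep window restarts (the current time is recorded).
If the state is closed, the breaker reads the command's total request count and error percentage for the rolling window from the command's metrics component. If the request count has reached circuitBreakerRequestVolumeThreshold and the error percentage has reached circuitBreakerErrorThresholdPercentage, the state is set to open.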
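The trip decision for a closed circuit comes down to two threshold comparisons. The helper below is a hypothetical sketch (the class `TripRuleSketch` and its method names are made up for illustration); it assumes the error percentage is computed as an integer from the error count and total request count of the window.

```java
// Hypothetical helper mirroring the two threshold checks; not part of Hystrix.
final class TripRuleSketch {
    private TripRuleSketch() {}

    // Integer error percentage over the window, 0 when there were no requests.
    static int errorPercentage(long totalRequests, long errorCount) {
        if (totalRequests <= 0) return 0;
        return (int) ((double) errorCount / totalRequests * 100);
    }

    // True when the circuit should be tripped open.
    static boolean shouldTrip(long totalRequests, long errorCount,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        if (totalRequests < requestVolumeThreshold) {
            return false; // too little traffic in the window to judge
        }
        return errorPercentage(totalRequests, errorCount) >= errorThresholdPercentage;
    }
}
```

For example, with thresholds of 20 requests and 50 percent, 10 failures out of 10 requests do not trip the circuit because the volume is too low, while 10 failures out of 20 requests do.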
After a command succeeds:
The circuit can only be closed again from the half_open state.
If the trial request succeeds, the circuit is closed.
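The two possible outcomes of the trial request can be sketched with compare-and-set transitions. `HalfOpenOutcomeSketch`, `onSuccess` and `onFailure` are hypothetical names for this illustration; the sketch covers only the half_open case and leaves the closed-state failure check aside.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of what happens to a half_open circuit; not Hystrix's own class.
class HalfOpenOutcomeSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    final AtomicReference<Status> status;
    final AtomicLong openedAt = new AtomicLong(-1L); // -1 means "not open"

    HalfOpenOutcomeSketch(Status initial) {
        this.status = new AtomicReference<>(initial);
    }

    // Trial request succeeded: only HALF_OPEN -> CLOSED is allowed.
    boolean onSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            openedAt.set(-1L);
            return true;
        }
        return false; // any other state is left untouched
    }

    // Trial request failed: back to OPEN, and the sleep window restarts at nowMs.
    boolean onFailure(long nowMs) {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            openedAt.set(nowMs);
            return true;
        }
        return false;
    }
}
```

Using compare-and-set here means a success reported while the circuit is open (and not half_open) cannot close it by accident.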
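The circuit breaker also subscribes to the metrics data stream: when the request volume and the error percentage both reach their configured thresholds, it sets the state to open. Each command key has its own circuit breaker.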
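One breaker per command key can be pictured as a concurrent map that creates an instance on first lookup. The sketch below is an assumption about the general shape only; `BreakerRegistrySketch`, `Breaker` and `forKey` are hypothetical names and not Hystrix's factory API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry: one breaker instance per command key.
class BreakerRegistrySketch {
    // Placeholder breaker that only remembers which key it belongs to.
    static final class Breaker {
        final String commandKey;
        Breaker(String commandKey) { this.commandKey = commandKey; }
    }

    private final ConcurrentMap<String, Breaker> byKey = new ConcurrentHashMap<>();

    // Returns the breaker for the key, creating it atomically on first use.
    Breaker forKey(String commandKey) {
        return byKey.computeIfAbsent(commandKey, Breaker::new);
    }

    int size() {
        return byKey.size();
    }
}
```

Because the breaker is shared by every invocation of the same command key, failures of one command never trip the circuit of another.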
Circuit breaker check
- private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
- ...
- if (circuitBreaker.allowRequest()) {
- ...
- return executeCommandAndObserve(_cmd)
- .doOnError(markExceptionThrown)
- .doOnTerminate(singleSemaphoreRelease)
- .doOnUnsubscribe(singleSemaphoreRelease);
- ...
- } else {
- // the breaker rejected the request: skip execution and go straight to the fallback
- return handleShortCircuitViaFallback();
- }
- }
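Stripped of the Observable plumbing, the branch in the listing above amounts to "ask the breaker first, run the command only if it agrees". The following library-free sketch shows just that branch; `ShortCircuitSketch` and `execute` are hypothetical names, and the handling of failures inside the command is deliberately left out.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical, library-free sketch of the short-circuit branch.
final class ShortCircuitSketch {
    private ShortCircuitSketch() {}

    // Runs primary only when the breaker allows it; otherwise returns the fallback value.
    static <T> T execute(BooleanSupplier allowRequest, Supplier<T> primary, Supplier<T> fallback) {
        if (!allowRequest.getAsBoolean()) {
            return fallback.get(); // short-circuited: primary is never invoked
        }
        return primary.get();
    }
}
```

The important property is that a rejected request never reaches the protected call at all, which is what gives a struggling dependency room to recover.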
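Circuit breaker logic (this listing keeps the state in an AtomicBoolean; the single trial request admitted by allowSingleTest() plays the role of half_open)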
- static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
- private final HystrixCommandProperties properties;
- private final HystrixCommandMetrics metrics;
- /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
- private AtomicBoolean circuitOpen = new AtomicBoolean(false);
- /* when the circuit was marked open or was last allowed to try a 'singleTest' */
- private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
- protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
- this.properties = properties;
- this.metrics = metrics;
- }
- public void markSuccess() {
- if (circuitOpen.get()) {
- if (circuitOpen.compareAndSet(true, false)) {
- //win the thread race to reset metrics
- //Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
- //and all other metric consumers are unaffected by the reset
- metrics.resetStream();
- }
- }
- }
- @Override
- public boolean allowRequest() {
- if (properties.circuitBreakerForceOpen().get()) {
- // properties have asked us to force the circuit open so we will allow NO requests
- return false;
- }
- if (properties.circuitBreakerForceClosed().get()) {
- // we still want to allow isOpen() to perform its calculations so we simulate normal behavior
- isOpen();
- // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
- return true;
- }
- return !isOpen() || allowSingleTest();
- }
- public boolean allowSingleTest() {
- long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
- // 1) if the circuit is open
- // 2) and it's been longer than 'sleepWindow' since we opened the circuit
- if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
- // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
- // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
- if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
- // if this returns true that means we set the time so we'll return true to allow the singleTest
- // if it returned false it means another thread raced us and allowed the singleTest before we did
- return true;
- }
- }
- return false;
- }
- @Override
- public boolean isOpen() {
- if (circuitOpen.get()) {
- // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
- return true;
- }
- // we're closed, so let's see if errors have made us so we should trip the circuit open
- HealthCounts health = metrics.getHealthCounts();
- // check if we are past the statisticalWindowVolumeThreshold
- if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
- // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
- return false;
- }
- if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
- return false;
- } else {
- // our failure rate is too high, trip the circuit
- if (circuitOpen.compareAndSet(false, true)) {
- // if the previousValue was false then we want to set the currentTime
- circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
- return true;
- } else {
- // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
- // caused another thread to set it to true already even though we were in the process of doing the same
- // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
- return true;
- }
- }
- }
- }
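The compare-and-set in allowSingleTest() is what guarantees that only one thread gets the trial request when many callers arrive right after the sleep window ends. The sketch below reproduces that idea in isolation; `SingleTestRaceSketch` and `countWinners` are hypothetical names, the clock is frozen at a fixed value for determinism, and the `circuitOpen` flag from the listing is omitted (the circuit is assumed to be open).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo of the race that the timestamp compare-and-set resolves.
class SingleTestRaceSketch {
    // Same compare-and-set idea as allowSingleTest(), with the clock passed in.
    static boolean allowSingleTest(AtomicLong lastTested, long sleepWindowMs, long nowMs) {
        long seen = lastTested.get();
        return nowMs > seen + sleepWindowMs && lastTested.compareAndSet(seen, nowMs);
    }

    // Runs `threads` concurrent callers at one frozen instant and counts the winners.
    static int countWinners(int threads) {
        AtomicLong lastTested = new AtomicLong(0L); // circuit opened at time 0
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all workers together
                    if (allowSingleTest(lastTested, 5000L, 10_000L)) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return winners.get();
    }
}
```

A thread that read the old timestamp loses the compare-and-set, and a thread that reads the new timestamp fails the sleep-window comparison, so under these assumptions exactly one caller is admitted.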
Source: http://www.bubuko.com/infodetail-2865453.html