在现在的微服务使用的过程中, 经常会遇到依赖的服务不可用, 那么如果依赖的服务不可用的话, 会导致把自己的服务也会拖死, 那么就产生了熔断, 熔断顾名思义就是当服务处于不可用的时候采取半开关的状态, 达到一定数量后就熔断器就打开. 这就相当于家里边的保险丝, 如果电压过高的话, 保险丝就会断掉, 起到保护电器的作用.
目前支持熔断, 降级的就是 Hystrix, 当然还有 resilience4j 还有 Sentinel. 今天咱们以 Hystrix 为主吧. 其他的大家可以自行研究.
Hystrix 主要实现三个功能, 接下来咱们继续展开.
1. 资源隔离
2. 熔断
3. 降级
资源隔离分为两种, 一种是线程池隔离, 一种是信号量 semaphore 隔离. 线程池以请求的线程和执行的线程分为不同的线程执行, 信号量是请求和执行采用相同的线程.
当然, 涉及到线程池的话, 那么就支持异步, 支持异步 Future 的话也就支持 get 的时候支持超时获取. 信号量这些功能不支持, 但是信号量是支持熔断, 限流. 他们的区别如下:
线程切换 | 异步 | 超时 | 熔断 | 限流 | 资源开销 | |
线程池 | 是 | 是 | 是 | 是 | 是 | 大 |
信号量 | 否 | 否 | 否 | 是 | 是 | 小 |
HystrixCommand 的命令执行大致如下图:
依赖的 pom 如下:
- <!-- 依赖版本 -->
- <hystrix.version>1.3.16</hystrix.version>
- <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>
- <dependency>
- <groupId>com.netflix.hystrix</groupId>
- <artifactId>hystrix-core</artifactId>
- <version>${hystrix.version}</version>
- </dependency>
- <dependency>
- <groupId>com.netflix.hystrix</groupId>
- <artifactId>hystrix-metrics-event-stream</artifactId>
- <version>${hystrix-metrics-event-stream.version}</version>
- </dependency>
支持同步, 异步, 观察事件拦截, 以及订阅方式, 下面咱们直接看代码实现吧. 大家一看就明白了:
- import com.netflix.hystrix.HystrixCommand;
- import com.netflix.hystrix.HystrixCommandGroupKey;
- import rx.Observable;
- import rx.Subscriber;
- import rx.functions.Action1;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
- /**
- * @author huangqingshi
- * @Date 2019-03-17
- */
- public class HelloWorldCommand extends HystrixCommand<String> {
- private final String name;
- public HelloWorldCommand(String name) {
- // 指定命令组名
- super(HystrixCommandGroupKey.Factory.asKey("myGroup"));
- this.name = name;
- }
- @Override
- protected String run() throws Exception {
- // 逻辑封装在 run 里边
- return "Hello:" + name + "thread:" + Thread.currentThread().getName();
- }
- public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
- // 每个 Command 只能调用一次, 不能重复调用. 重复调用会报异常.
- HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
- //execute 同步调用 等同于: helloWorldCommand.queue().get();
- String result = helloWorldCommand.execute();
- System.out.println("result:" + result);
- helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
- // 异步调用
- Future<String> future = helloWorldCommand.queue();
- //get 可以指定获取的时间 100 毫秒, 默认为 1 秒
- result = future.get(100, TimeUnit.MILLISECONDS);
- System.out.println("result:" + result);
- System.out.println("main thread:" + Thread.currentThread().getName());
- testObserve();
- }
- public static void testObserve() {
- // 注册观察者事件拦截
- Observable<String> observable = new HelloWorldCommand("observe").observe();
- // 注册回调事件
- observable.subscribe(new Action1<String>() {
- @Override
- public void call(String result) {
- //result 就是调用 HelloWorldCommand 的结果
- System.out.println("callback:" + result);
- }
- });
- // 注册完成版的事件
- observable.subscribe(new Subscriber<String>() {
- @Override
- public void onCompleted() {
- System.out.println("onCompleted 调用: onNext : onError 之后调用");
- }
- @Override
- public void onError(Throwable throwable) {
- // 异常产生了之后会调用
- System.out.println("onError:" + throwable.getMessage());
- }
- @Override
- public void onNext(String s) {
- // 获取结果后回调
- System.out.println("onNext:" + s);
- }
- });
- }
- }
执行结果如下:
- result:Hello:Synchronous-hystrix thread:hystrix-myGroup-1
- result:Hello:Asynchronous-hystrix thread:hystrix-myGroup-2
- main thread:main
- callback:Hello:observe thread:hystrix-myGroup-3
- onNext:Hello:observe thread:hystrix-myGroup-3
onCompleted 调用: onNext : onError 之后调用
接下来是线程池隔离的例子:
- import com.netflix.hystrix.*;
- /**
- * @author huangqingshi
- * @Date 2019-03-17
- */
- public class ThreadPoolCommand extends HystrixCommand<String> {
- private String name;
- public ThreadPoolCommand(String name) {
- super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
- .andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
- .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
- .withCircuitBreakerRequestVolumeThreshold(10) // 至少 10 个请求, 熔断器才进行错误计算 默认值 20
- .withCircuitBreakerSleepWindowInMilliseconds(5000) // 熔断终端 5 秒后会进入半打开状态
- .withCircuitBreakerErrorThresholdPercentage(50) // 错误率达到 50 开启熔断保护
- .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
- //10 个核心线程
- ).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10))
- );
- this.name = name;
- }
- @Override
- protected String run() throws Exception {
- return "threadPoolCommand name:" + name;
- }
- public static void main(String[] args) {
- ThreadPoolCommand threadPoolCommand = new ThreadPoolCommand("threadPool");
- String result = threadPoolCommand.execute();
- System.out.println("result:" + result);
- }
- }
执行结果:
result:threadPoolCommand name:threadPool
信号量隔离例子:
- /**
- * @author huangqingshi
- * @Date 2019-03-17
- */
- public class SemaphoreCommand extends HystrixCommand<String> {
- private String name;
- public SemaphoreCommand(String name) {
- super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("semaphoreGroup"))
- .andCommandKey(HystrixCommandKey.Factory.asKey("semaphoreCommand"))
- .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
- // 至少 10 个请求, 熔断器才会进行错误率的计算 默认值 20
- .withCircuitBreakerRequestVolumeThreshold(10)
- // 熔断器中断请求 5 秒后会自动进入半打开状态, 放部分流量进行重试 默认值 5000ms
- .withCircuitBreakerSleepWindowInMilliseconds(5000)
- // 错误率达到 50 开启熔断保护
- .withCircuitBreakerErrorThresholdPercentage(50)
- // 设置隔离策略
- .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
- // 最大并发量 10
- .withExecutionIsolationSemaphoreMaxConcurrentRequests(10)
- )
- );
- this.name = name;
- }
- @Override
- protected String run() throws Exception {
- return "semaphore success name:" + name;
- }
- @Override
- protected String getFallback() {
- return "semaphore fallback name:" + name;
- }
- public static void main(String[] args) {
- SemaphoreCommand semaphoreCommand = new SemaphoreCommand("semaphoreCommand");
- String result = semaphoreCommand.execute();
- System.out.println(result);
- }
- }
执行结果:
semaphore success name:semaphoreCommand
在执行的过程中, 如果出现调用服务的时候出现错误的时候会先进行熔断, 就是如果流量达到设置的量的时候进行统计, 比如 10 个请求, 然后如果出现错误率超过配置的错误率就会进行将熔断进行打开, 打开之后会进行调用降级方法 fallback. 过了一段时间后, 可以放行部分流量, 如果流量正常了, 则会将熔断器开关关闭. 下图是来自官方文档截图, 里边维护者一个 bucket, 每秒一个 bucket, 里边记录着成功, 失败, 超时, 拒绝. 这个周期是通过 withCircuitBreakerSleepWindowInMilliseconds 配置的.
接下来咱们看一下降级, 也就是熔断器打开的时候, 会走 fallback 方法, 继续看例子.
- import com.netflix.hystrix.*;
- /**
- * @author huangqingshi
- * @Date 2019-03-17
- */
- public class ThreadPoolCommand extends HystrixCommand<String> {
- private String name;
- public ThreadPoolCommand(String name) {
- super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
- .andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
- .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
- .withCircuitBreakerRequestVolumeThreshold(10) // 至少 10 个请求, 熔断器才进行错误计算 默认值 20
- .withCircuitBreakerSleepWindowInMilliseconds(5000) // 熔断终端 5 秒后会进入半打开状态
- .withCircuitBreakerErrorThresholdPercentage(50) // 错误率达到 50 开启熔断保护
- .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
- //10 个核心线程
- ).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10))
- );
- this.name = name;
- }
- @Override
- protected String run() throws Exception {
- return "threadPoolCommand name:" + name;
- }
- public static void main(String[] args) {
- ThreadPoolCommand threadPoolCommand = new ThreadPoolCommand("threadPool");
- String result = threadPoolCommand.execute();
- System.out.println("result:" + result);
- }
- }
执行结果:
result:executed fallback
并且抛出超时异常. 因为程序故意设计超时的.
当然, Hystrixcommand 还支持 primary 或 secondary 的方式, 可以先看看流程图:
是否执行 primary 是通过参数 primarySecondary.userPrimary 为 true 时执行. false 的时候执行 secondary 方式.
/** * @author huangqingshi * @Date 2019-03-18 */ public class PrimarySecondaryFacade extends HystrixCommand<String> { private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance(). getBooleanProperty("primarySecondary.usePrimary", true); private int id; public PrimarySecondaryFacade(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("primarySecondCommand")) // 此处采用信号量, primary,secondary 采用线程池 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy( HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) ) ); this.id = id; } @Override protected String run() throws Exception { if(usePrimary.get()) { return new PrimaryCommand(id).execute(); } else { return new SecondaryCommand(id).execute(); } } @Override protected String getFallback() { return "static-fallback-" + id; } @Override protected String getCacheKey() { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand<String> { private final int id; private PrimaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand")) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter(). withExecutionIsolationThreadTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { return "responseFromPrimary-" + id; } } private static class SecondaryCommand extends HystrixCommand<String> { private final int id; private SecondaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand")) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter(). withExecutionIsolationThreadTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { return "responseFromSecondary-" + id; } } public static class UnitTest { @Test public void testPrimary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true); assertEquals("responseFromPrimary-100", new PrimarySecondaryFacade(100).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false); assertEquals("responseFromSecondary-100", new PrimarySecondaryFacade(100).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } } }
来源: https://www.cnblogs.com/huangqingshi/p/10555828.html