Sentinel 源码解析系列:
1.Sentinel 源码分析 - FlowRuleManager 加载规则做了什么?
2. Sentinel 源码分析 - Sentinel 是如何进行流量统计的?
3. Sentinel 源码分析 - QPS 流量控制是如何实现的?
4.Sentinel 源码分析 - Sentinel 是如何做到降级的?
这篇文章主要学习一下 Sentinel 如何实现自适应限流的.
为什么要做自适应限流, 官方给了两个理由:
保证系统不被拖垮
在系统稳定的前提下, 保持系统的吞吐量
我再贴一下官方的原理:
能够保证水管里的水量, 能够让水顺畅的流动, 则不会增加排队的请求; 也就是说, 这个时候的系统负载不会进一步恶化.
当保持入口的流量是水管出来的流量的最大的值的时候, 可以最大利用水管的处理能力.
更加具体的原理解释可以看官方: 系统自适应限流
所以看起来好像很厉害的样子, 所以我们来看看具体实现吧.
例子:
设置系统自适应规则
- List<SystemRule> rules = new ArrayList<SystemRule>();
- SystemRule rule = new SystemRule();
- // 限制最大负载
- rule.setHighestSystemLoad(3.0);
- // CPU 负载 60%
- rule.setHighestCpuUsage(0.6);
- // 设置平均响应时间 10 ms
- rule.setAvgRt(10);
- // 设置 qps is 20
- rule.setQps(20);
- // 设置最大线程数 10
- rule.setMaxThread(10);
- rules.add(rule);
- SystemRuleManager.loadRules(Collections.singletonList(rule));
设置限流
- Entry entry = null;
- try {
- entry = SphU.entry("methodA", EntryType.IN);
- //dosomething
- } catch (BlockException e1) {
- block.incrementAndGet();
- //dosomething
- } catch (Exception e2) {
- // biz exception
- } finally {
- if (entry != null) {
- entry.exit();
- }
- }
注意: 系统保护规则是应用整体维度的, 而不是资源维度的, 并且仅对入口流量生效. 入口流量指的是进入应用的流量 (EntryType.IN), 比如 web 服务或 Dubbo 服务端接收的请求, 都属于入口流量.
我们先讲一下 SystemRuleManager 这个类在初始化的时候做了什么吧.
- SystemRuleManager
- private static SystemStatusListener statusListener = null;
- @SuppressWarnings("PMD.ThreadPoolCreationRule")
- private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
- new NamedThreadFactory("sentinel-system-status-record-task", true));
- static {
- checkSystemStatus.set(false);
- statusListener = new SystemStatusListener();
- scheduler.scheduleAtFixedRate(statusListener, 5, 1, TimeUnit.SECONDS);
- currentProperty.addListener(listener);
- }
SystemRuleManager 初始化的时候会调用静态代码块, 然后用 scheduler 线程池定时调用 SystemStatusListener 类的 run 方法. 我们进入到 SystemStatusListener 类里看一下:
- SystemStatusListener#run
- public void run() {
- try {
- OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
- currentLoad = osBean.getSystemLoadAverage();
- currentCpuUsage = osBean.getSystemCpuLoad();
- StringBuilder sb = new StringBuilder();
- if (currentLoad> SystemRuleManager.getHighestSystemLoad()) {
- sb.append("load:").append(currentLoad).append(";");
- sb.append("cpu:").append(currentCpuUsage).append(";");
- sb.append("qps:").append(Constants.ENTRY_NODE.passQps()).append(";");
- sb.append("rt:").append(Constants.ENTRY_NODE.avgRt()).append(";");
- sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append(";");
- sb.append("success:").append(Constants.ENTRY_NODE.successQps()).append(";");
- sb.append("minRt:").append(Constants.ENTRY_NODE.minRt()).append(";");
- sb.append("maxSuccess:").append(Constants.ENTRY_NODE.maxSuccessQps()).append(";");
- RecordLog.info(sb.toString());
- }
- } catch (Throwable e) {
- RecordLog.info("could not get system error", e);
- }
- }
这个方法用来做两件事:
定时收集全局资源情况, 并打印日志
给全局变量 currentLoad 和 currentCpuUsage 赋值, 用来做限流使用.
然后看一下 SystemRuleManager.loadRules 方法. SystemRuleManager 和其他的规则管理是一样的, 当调用 loadRules 方法的时候会调用内部的 listener 并触发它的 configUpdate 方法.
在 SystemRuleManager 中实现类了一个 SystemPropertyListener, 最终 SystemRuleManager.loadRules 方法会调用到 SystemPropertyListener 的 configUpdate 中.
- SystemPropertyListener#configUpdate
- public void configUpdate(List<SystemRule> rules) {
- restoreSetting();
- // systemRules = rules;
- if (rules != null && rules.size()>= 1) {
- for (SystemRule rule : rules) {
- loadSystemConf(rule);
- }
- } else {
- checkSystemStatus.set(false);
- }
- RecordLog.info(String.format("[SystemRuleManager] Current system check status: %s,"
- + "highestSystemLoad: %e,"
- + "highestCpuUsage: %e,"
- + "maxRt: %d,"
- + "maxThread: %d,"
- + "maxQps: %e",
- checkSystemStatus.get(),
- highestSystemLoad,
- highestCpuUsage,
- maxRt,
- maxThread,
- qps));
- }
这个方法很简单, 首先是调用 restoreSetting, 用来重置 rule 的属性, 然后遍历 rule 调用 loadSystemConf 对规则进行设置:
- SystemRuleManager#loadSystemConf
- public static void loadSystemConf(SystemRule rule) {
- boolean checkStatus = false;
- // Check if it's valid.
- if (rule.getHighestSystemLoad()>= 0) {
- highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
- highestSystemLoadIsSet = true;
- checkStatus = true;
- }
- if (rule.getHighestCpuUsage()>= 0) {
- highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
- highestCpuUsageIsSet = true;
- checkStatus = true;
- }
- if (rule.getAvgRt()>= 0) {
- maxRt = Math.min(maxRt, rule.getAvgRt());
- maxRtIsSet = true;
- checkStatus = true;
- }
- if (rule.getMaxThread()>= 0) {
- maxThread = Math.min(maxThread, rule.getMaxThread());
- maxThreadIsSet = true;
- checkStatus = true;
- }
- if (rule.getQps()>= 0) {
- qps = Math.min(qps, rule.getQps());
- qpsIsSet = true;
- checkStatus = true;
- }
- checkSystemStatus.set(checkStatus);
- }
这些属性都是在限流控制中会用到的属性, 无论设置哪个属性都会设置 checkStatus=true 表示开启系统自适应限流.
在设置好限流规则后会进入到 SphU.entry 方法中, 通过创建 slot 链调用到 SystemSlot, 这里是系统自适应限流的地方.
- SystemSlot#entry
- public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
- boolean prioritized, Object... args) throws Throwable {
- // 检查一下是否符合限流条件, 符合则进行限流
- SystemRuleManager.checkSystem(resourceWrapper);
- fireEntry(context, resourceWrapper, node, count, prioritized, args);
- }
- SystemRuleManager#checkSystem
- public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
- // Ensure the checking switch is on.
- if (!checkSystemStatus.get()) {
- return;
- }
- // 如果不是入口流量, 那么直接返回
- // for inbound traffic only
- if (resourceWrapper.getType() != EntryType.IN) {
- return;
- }
- // total qps
- double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
- if (currentQps> qps) {
- throw new SystemBlockException(resourceWrapper.getName(), "qps");
- }
- // total thread
- int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
- if (currentThread> maxThread) {
- throw new SystemBlockException(resourceWrapper.getName(), "thread");
- }
- double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
- if (rt> maxRt) {
- throw new SystemBlockException(resourceWrapper.getName(), "rt");
- }
- // load. BBR algorithm.
- if (highestSystemLoadIsSet && getCurrentSystemAvgLoad()> highestSystemLoad) {
- if (!checkBbr(currentThread)) {
- throw new SystemBlockException(resourceWrapper.getName(), "load");
- }
- }
- // CPU usage
- if (highestCpuUsageIsSet && getCurrentCpuUsage()> highestCpuUsage) {
- if (!checkBbr(currentThread)) {
- throw new SystemBlockException(resourceWrapper.getName(), "cpu");
- }
- }
- }
这个方法首先会校验一下 checkSystemStatus 状态和 EntryType 是不是 IN, 如果不是则直接返回.
然后对 Constants.ENTRY_NODE 进行操作. 这个对象是一个 final static 修饰的变量, 代表是全局对象.
public final static ClusterNode ENTRY_NODE = new ClusterNode();
所以这里的限流操作都是对全局其作用的, 而不是对资源起作用. ClusterNode 还是继承自 StatisticNode, 所以最后都是调用 StatisticNode 的 successQps,curThreadNum,avgRt, 这几个方法我的前几篇文章都已经讲过了, 感兴趣的可以自己去翻一下, 这里就不过多涉及了.
在下面调用 getCurrentSystemAvgLoad 方法和 getCurrentCpuUsage 方法调用到 SystemStatusListener 设置的全局变量 currentLoad 和 currentCpuUsage. 这两个参数是 SystemRuleManager 的定时任务定时收集的, 忘了的同学回到上面讲解 SystemRuleManager 的地方看一下.
在做 load 判断和 CPU usage 判断的时候会还会调用 checkBbr 方法来判断:
- private static boolean checkBbr(int currentThread) {
- if (currentThread> 1 &&
- currentThread> Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
- return false;
- }
- return true;
- }
也就是说: 当系统 load1 超过阈值, 且系统当前的并发线程数超过系统容量时才会触发系统保护. 系统容量由系统的 maxQps * minRt 计算得出.
- StatisticNode#maxSuccessQps
- public double maxSuccessQps() {
- return rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount();
- }
maxSuccessQps 方法是用窗口内的最大成功调用数和窗口数量相乘 rollingCounterInSecond 的窗口 1 秒的窗口数量是 2, 最大成功调用数如下得出:
- ArrayMetric#maxSuccess
- public long maxSuccess() {
- data.currentWindow();
- long success = 0;
- List<MetricBucket> list = data.values();
- for (MetricBucket Windows : list) {
- if (Windows.success()> success) {
- success = Windows.success();
- }
- }
- return Math.max(success, 1);
- }
最大成功调用数是通过整个遍历整个窗口, 获取所有窗口里面最大的调用数. 所以这样的最大的并发量是一个预估值, 不是真实值.
看到这里我们再来看一下 Constants.ENTRY_NODE 的信息是怎么被收集的.
我在分析 StatisticSlot 这个类的时候有一段代码我当时也没看懂有什么用, 现在就迎刃而解了:
- StatisticSlot#entry
- public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
- boolean prioritized, Object... args) throws Throwable {
- try {
- ....
- if (resourceWrapper.getType() == EntryType.IN) {
- // Add count for global inbound entry node for global statistics.
- Constants.ENTRY_NODE.increaseThreadNum();
- Constants.ENTRY_NODE.addPassRequest(count);
- }
- ....
- } catch (PriorityWaitException ex) {
- ....
- if (resourceWrapper.getType() == EntryType.IN) {
- // Add count for global inbound entry node for global statistics.
- Constants.ENTRY_NODE.increaseThreadNum();
- }
- ....
- } catch (BlockException e) {
- ....
- if (resourceWrapper.getType() == EntryType.IN) {
- // Add count for global inbound entry node for global statistics.
- Constants.ENTRY_NODE.increaseBlockQps(count);
- }
- ....
- throw e;
- } catch (Throwable e) {
- ....
- if (resourceWrapper.getType() == EntryType.IN) {
- Constants.ENTRY_NODE.increaseExceptionQps(count);
- }
- throw e;
- }
- }
在 StatisticSlot 的 entry 方法里有很多对于 type 的判断, 如果是 EntryType.IN, 那么就调用 Constants.ENTRY_NODE 的静态方法进行数据的收集.
所以看到这里我们可以知道, 在前面有很多看不懂的代码其实只要慢慢琢磨, 打个标记, 那么在后面的解析的过程中还是能够慢慢看懂的.
共勉~~
来源: https://www.cnblogs.com/luozhiyun/p/11537947.html