5.Sentinel源码分析—Sentinel如何实现自适应限流?

Sentinel源码解析系列:
1.Sentinel源码分析—FlowRuleManager加载规则做了什么?

2. Sentinel源码分析—Sentinel是如何进行流量统计的?

3. Sentinel源码分析— QPS流量控制是如何实现的?

4.Sentinel源码分析— Sentinel是如何做到降级的?



这篇文章主要学习一下Sentinel如何实现自适应限流的。
为什么要做自适应限流,官方给了两个理由:

  1. 保证系统不被拖垮
  2. 在系统稳定的前提下,保持系统的吞吐量

我再贴一下官方的原理:

  1. 能够保证水管里的水量,能够让水顺畅的流动,则不会增加排队的请求;也就是说,这个时候的系统负载不会进一步恶化。
  2. 当保持入口的流量是水管出来的流量的最大的值的时候,可以最大利用水管的处理能力。
    更加具体的原理解释可以看官方:系统自适应限流

所以看起来好像很厉害的样子,所以我们来看看具体实现吧。

例子:

  1. 设置系统自适应规则
List<SystemRule> rules = new ArrayList<SystemRule>();
SystemRule rule = new SystemRule();
//限制最大负载
rule.setHighestSystemLoad(3.0);
// cpu负载60%
rule.setHighestCpuUsage(0.6);
// 设置平均响应时间 10 ms
rule.setAvgRt(10);
// 设置qps is 20
rule.setQps(20);
// 设置最大线程数 10
rule.setMaxThread(10);

rules.add(rule);
SystemRuleManager.loadRules(Collections.singletonList(rule));
  1. 设置限流
Entry entry = null;
try {
    entry = SphU.entry("methodA", EntryType.IN);
    //dosomething
} catch (BlockException e1) {
    block.incrementAndGet();
    //dosomething
} catch (Exception e2) {
    // biz exception
} finally {
    if (entry != null) {
        entry.exit();
    }
}

注意:系统保护规则是应用整体维度的,而不是资源维度的,并且仅对入口流量生效。入口流量指的是进入应用的流量(EntryType.IN),比如 Web 服务或 Dubbo 服务端接收的请求,都属于入口流量。

我们先讲一下SystemRuleManager这个类在初始化的时候做了什么吧。

SystemRuleManager

private static SystemStatusListener statusListener = null;
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
    new NamedThreadFactory("sentinel-system-status-record-task", true));

static {
    checkSystemStatus.set(false);
    statusListener = new SystemStatusListener();
    scheduler.scheduleAtFixedRate(statusListener, 5, 1, TimeUnit.SECONDS);
    currentProperty.addListener(listener);
}

SystemRuleManager初始化的时候会调用静态代码块,然后用scheduler线程池定时调用SystemStatusListener类的run方法。我们进入到SystemStatusListener类里看一下:

SystemStatusListener#run

public void run() {
    try {
        OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
        currentLoad = osBean.getSystemLoadAverage();

        currentCpuUsage = osBean.getSystemCpuLoad();

        StringBuilder sb = new StringBuilder();
        if (currentLoad > SystemRuleManager.getHighestSystemLoad()) {
            sb.append("load:").append(currentLoad).append(";");
            sb.append("cpu:").append(currentCpuUsage).append(";");
            sb.append("qps:").append(Constants.ENTRY_NODE.passQps()).append(";");
            sb.append("rt:").append(Constants.ENTRY_NODE.avgRt()).append(";");
            sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append(";");
            sb.append("success:").append(Constants.ENTRY_NODE.successQps()).append(";");
            sb.append("minRt:").append(Constants.ENTRY_NODE.minRt()).append(";");
            sb.append("maxSuccess:").append(Constants.ENTRY_NODE.maxSuccessQps()).append(";");
            RecordLog.info(sb.toString());
        }

    } catch (Throwable e) {
        RecordLog.info("could not get system error ", e);
    }
}

这个方法用来做两件事:

  1. 定时收集全局资源情况,并打印日志
  2. 给全局变量currentLoad和currentCpuUsage赋值,用来做限流使用。

然后看一下SystemRuleManager.loadRules方法。SystemRuleManager和其他的规则管理是一样的,当调用loadRules方法的时候会调用内部的listener并触发它的configUpdate方法。
在SystemRuleManager中实现类了一个SystemPropertyListener,最终SystemRuleManager.loadRules方法会调用到SystemPropertyListener的configUpdate中。

SystemPropertyListener#configUpdate

public void configUpdate(List<SystemRule> rules) {
    restoreSetting();
    // systemRules = rules;
    if (rules != null && rules.size() >= 1) {
        for (SystemRule rule : rules) {
            loadSystemConf(rule);
        }
    } else {
        checkSystemStatus.set(false);
    }

    RecordLog.info(String.format("[SystemRuleManager] Current system check status: %s, "
            + "highestSystemLoad: %e, "
            + "highestCpuUsage: %e, "
            + "maxRt: %d, "
            + "maxThread: %d, "
            + "maxQps: %e",
        checkSystemStatus.get(),
        highestSystemLoad,
        highestCpuUsage,
        maxRt,
        maxThread,
        qps));
}

这个方法很简单,首先是调用restoreSetting,用来重置rule的属性,然后遍历rule调用loadSystemConf对规则进行设置:

SystemRuleManager#loadSystemConf

public static void loadSystemConf(SystemRule rule) {
    boolean checkStatus = false;
    // Check if it's valid.

    if (rule.getHighestSystemLoad() >= 0) {
        highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
        highestSystemLoadIsSet = true;
        checkStatus = true;
    }

    if (rule.getHighestCpuUsage() >= 0) {
        highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
        highestCpuUsageIsSet = true;
        checkStatus = true;
    }

    if (rule.getAvgRt() >= 0) {
        maxRt = Math.min(maxRt, rule.getAvgRt());
        maxRtIsSet = true;
        checkStatus = true;
    }
    if (rule.getMaxThread() >= 0) {
        maxThread = Math.min(maxThread, rule.getMaxThread());
        maxThreadIsSet = true;
        checkStatus = true;
    }

    if (rule.getQps() >= 0) {
        qps = Math.min(qps, rule.getQps());
        qpsIsSet = true;
        checkStatus = true;
    }

    checkSystemStatus.set(checkStatus);

}

这些属性都是在限流控制中会用到的属性,无论设置哪个属性都会设置checkStatus=true表示开启系统自适应限流。

在设置好限流规则后会进入到SphU.entry方法中,通过创建slot链调用到SystemSlot,这里是系统自适应限流的地方。

SystemSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
      //检查一下是否符合限流条件,符合则进行限流
    SystemRuleManager.checkSystem(resourceWrapper);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

SystemRuleManager#checkSystem

public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
    // Ensure the checking switch is on.
    if (!checkSystemStatus.get()) {
        return;
    }
    //如果不是入口流量,那么直接返回
    // for inbound traffic only
    if (resourceWrapper.getType() != EntryType.IN) {
        return;
    }

    // total qps
    double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
    if (currentQps > qps) {
        throw new SystemBlockException(resourceWrapper.getName(), "qps");
    }

    // total thread
    int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
    if (currentThread > maxThread) {
        throw new SystemBlockException(resourceWrapper.getName(), "thread");
    }

    double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
    if (rt > maxRt) {
        throw new SystemBlockException(resourceWrapper.getName(), "rt");
    }

    // load. BBR algorithm.
    if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }
    }

    // cpu usage
    if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }
}

这个方法首先会校验一下checkSystemStatus状态和EntryType是不是IN,如果不是则直接返回。
然后对Constants.ENTRY_NODE进行操作。这个对象是一个final static 修饰的变量,代表是全局对象。

public final static ClusterNode ENTRY_NODE = new ClusterNode();

所以这里的限流操作都是对全局其作用的,而不是对资源起作用。ClusterNode还是继承自StatisticNode,所以最后都是调用StatisticNode的successQps、curThreadNum、avgRt,这几个方法我的前几篇文章都已经讲过了,感兴趣的可以自己去翻一下,这里就不过多涉及了。

在下面调用getCurrentSystemAvgLoad方法和getCurrentCpuUsage方法调用到SystemStatusListener设置的全局变量currentLoad和currentCpuUsage。这两个参数是SystemRuleManager的定时任务定时收集的,忘了的同学回到上面讲解SystemRuleManager的地方看一下。

在做load判断和cpu usage判断的时候会还会调用checkBbr方法来判断:

private static boolean checkBbr(int currentThread) {
    if (currentThread > 1 &&
        currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
        return false;
    }
    return true;
}

也就是说:当系统 load1 超过阈值,且系统当前的并发线程数超过系统容量时才会触发系统保护。系统容量由系统的 maxQps * minRt 计算得出。

StatisticNode#maxSuccessQps

public double maxSuccessQps() {
    return rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount();
}

maxSuccessQps方法是用窗口内的最大成功调用数和窗口数量相乘rollingCounterInSecond的窗口1秒的窗口数量是2,最大成功调用数如下得出:
ArrayMetric#maxSuccess

public long maxSuccess() {
    data.currentWindow();
    long success = 0;

    List<MetricBucket> list = data.values();
    for (MetricBucket window : list) {
        if (window.success() > success) {
            success = window.success();
        }
    }
    return Math.max(success, 1);
}

最大成功调用数是通过整个遍历整个窗口,获取所有窗口里面最大的调用数。所以这样的最大的并发量是一个预估值,不是真实值。

看到这里我们再来看一下Constants.ENTRY_NODE的信息是怎么被收集的。
我在分析StatisticSlot这个类的时候有一段代码我当时也没看懂有什么用,现在就迎刃而解了:
StatisticSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
            ....
        if (resourceWrapper.getType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }
            ....
    } catch (PriorityWaitException ex) {
            ....
        if (resourceWrapper.getType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
        }
         ....
    } catch (BlockException e) {
            ....
        if (resourceWrapper.getType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }
            ....
        throw e;
    } catch (Throwable e) {
         ....
        if (resourceWrapper.getType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseExceptionQps(count);
        }
        throw e;
    }
}

在StatisticSlot的entry方法里有很多对于type的判断,如果是EntryType.IN,那么就调用Constants.ENTRY_NODE的静态方法进行数据的收集。

所以看到这里我们可以知道,在前面有很多看不懂的代码其实只要慢慢琢磨,打个标记,那么在后面的解析的过程中还是能够慢慢看懂的。

共勉~~

原文地址:https://www.cnblogs.com/luozhiyun/p/11537947.html

时间: 2024-08-27 21:21:42

5.Sentinel源码分析—Sentinel如何实现自适应限流?的相关文章

4.Sentinel源码分析— Sentinel是如何做到降级的?

各位中秋节快乐啊,我觉得在这个月圆之夜有必要写一篇源码解析,以表示我内心的高兴~ Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 在我的第二篇文章里面2. Sentinel源码分析-Sentinel是如何进行流量统计的?里面介绍了整个Sentinel的主流程是怎样的.所以降级的大致流程可以概述为:

6.Sentinel源码分析—Sentinel是如何动态加载配置限流的?

Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 4.Sentinel源码分析- Sentinel是如何做到降级的? 5.Sentinel源码分析-Sentinel如何实现自适应限流? 有时候我们做限流的时候并不想直接写死在代码里面,然后每次要改规则,或者增加规则的时候只能去重启应用来解决.而是希望能

7.Sentinel源码分析—Sentinel是怎么和控制台通信的?

这里会介绍: Sentinel会使用多线程的方式实现一个类Reactor的IO模型 Sentinel会使用心跳检测来观察控制台是否正常 Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 4.Sentinel源码分析- Sentinel是如何做到降级的? 5.Sentinel源码分析-Sentinel如

源码分析 Sentinel 之 Dubbo 适配原理

在Alibaba Sentinel 限流与熔断初探(技巧篇) 的示例中我选择了 sentinel-demo-apache-dubbo 作为突破点,故本文就从该项目入手,看看 Sentinel 是如何对 Dubbo 做的适配,让项目使用方无感知,只需要引入对应的依即可. sentinel-apache-dubbo-adapter 比较简单,展开如下: 上面的代码应该比较简单,在正式进入源码研究之前,我先抛出如下二个问题: 1.限流.熔断相关的功能是在 Dubbo 的客户端实现还是服务端实现?为什么

Nginx源码分析 - Nginx启动以及IOCP模型

Nginx 源码分析 - Nginx启动以及IOCP模型 版本及平台信息 本文档针对Nginx1.11.7版本,分析Windows下的相关代码,虽然服务器可能用linux更多,但是windows平台下的代码也基本相似 ,另外windows的IOCP完成端口,异步IO模型非常优秀,很值得一看. Nginx启动 曾经有朋友问我,面对一个大项目的源代码,应该从何读起呢?我给他举了一个例子,我们学校大一大二是在紫金港校区,到了 大三搬到玉泉校区,但是大一的时候也会有时候有事情要去玉泉办.偶尔会去玉泉,但

【JUC】JDK1.8源码分析之SynchronousQueue(九)

一.前言 本篇是在分析Executors源码时,发现JUC集合框架中的一个重要类没有分析,SynchronousQueue,该类在线程池中的作用是非常明显的,所以很有必要单独拿出来分析一番,这对于之后理解线程池有很有好处,SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然.同步队列没有任何内部容量,甚至连一个队列的容量都没有. 二.SynchronousQueue数据结构 由于SynchronousQueue的支持公平策略和非公平策略,所

jQuery 源码分析(十一) 队列模块 Queue详解

队列是常用的数据结构之一,只允许在表的前端(队头)进行删除操作(出队),在表的后端(队尾)进行插入操作(入队).特点是先进先出,最先插入的元素最先被删除. 在jQuery内部,队列模块为动画模块提供基础功能,负责存储动画函数.自动出队并执行动画函数,同时还要确保动画函数的顺序执行. jQuery的静态方法含有如下API: $.queue(elem,type,data) ;返回或修改匹配元素关联的队列,返回最新的队列,参数如下:   elem ;DOM元素或JavaScript对象 type  ;

TeamTalk源码分析之login_server

login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是8080,客户端登录服务器地址配置如下(这里是win版本客户端): 1.login_server启动流程 login_server的启动是从login_server.cpp中的main函数开始的,login_server.cpp所在工程路径为server\src\login_server.下表是logi

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A