4.Sentinel源码分析— Sentinel是如何做到降级的?

各位中秋节快乐啊,我觉得在这个月圆之夜有必要写一篇源码解析,以表示我内心的高兴~

Sentinel源码解析系列:

1.Sentinel源码分析—FlowRuleManager加载规则做了什么?

2. Sentinel源码分析—Sentinel是如何进行流量统计的?

3. Sentinel源码分析— QPS流量控制是如何实现的?



在我的第二篇文章里面2. Sentinel源码分析—Sentinel是如何进行流量统计的?里面介绍了整个Sentinel的主流程是怎样的。所以降级的大致流程可以概述为:
1. 设置降级策略,是根据平均响应时间还是异常比例来进行降级的
2. 根据资源创建一系列的插槽
3. 依次调用插槽,根据设定的插槽类型来进行降级

我们先来看个例子,方便大家自己断点跟踪:

private static final String KEY = "abc";
private static final int threadCount = 100;
private static int seconds = 60 + 40;

public static void main(String[] args) throws Exception {

        List<DegradeRule> rules = new ArrayList<DegradeRule>();
        DegradeRule rule = new DegradeRule();
        rule.setResource(KEY);
        // set threshold rt, 10 ms
        rule.setCount(10);
        rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
        rule.setTimeWindow(10);
        rules.add(rule);
        DegradeRuleManager.loadRules(rules);

    for (int i = 0; i < threadCount; i++) {
        Thread entryThread = new Thread(new Runnable() {

            @Override
            public void run() {
                while (true) {
                    Entry entry = null;
                    try {
                        TimeUnit.MILLISECONDS.sleep(5);
                        entry = SphU.entry(KEY);
                        // token acquired
                        pass.incrementAndGet();
                        // sleep 600 ms, as rt
                        TimeUnit.MILLISECONDS.sleep(600);
                    } catch (Exception e) {
                        block.incrementAndGet();
                    } finally {
                        total.incrementAndGet();
                        if (entry != null) {
                            entry.exit();
                        }
                    }
                }
            }
        });
        entryThread.setName("working-thread");
        entryThread.start();
    }
}

其他的流程基本上和第二篇文章里介绍的差不多,这篇文章来介绍Sentinel的主流程,Sentinel的降级策略全部都是在DegradeSlot中进行操作的。

DegradeSlot

public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

DegradeSlot会直接调用DegradeRuleManager进行降级的操作,我们直接进入到DegradeRuleManager.checkDegrade方法中。

DegradeRuleManager#checkDegrade

public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
    throws BlockException {
    //根据resource来获取降级策略
    Set<DegradeRule> rules = degradeRules.get(resource.getName());
    if (rules == null) {
        return;
    }

    for (DegradeRule rule : rules) {
        if (!rule.passCheck(context, node, count)) {
            throw new DegradeException(rule.getLimitApp(), rule);
        }
    }
}

这个方法逻辑也是非常的清晰,首先是根据资源名获取到注册过的降级规则,然后遍历规则集合调用规则的passCheck,如果返回false那么就抛出异常进行降级。

DegradeRule#passCheck

public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
    //返回false直接进行降级
    if (cut.get()) {
        return false;
    }
    //降级是根据资源的全局节点来进行判断降级策略的
    ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
    if (clusterNode == null) {
        return true;
    }
    //根据响应时间降级策略
    if (grade == RuleConstant.DEGRADE_GRADE_RT) {
        //获取节点的平均响应时间
        double rt = clusterNode.avgRt();
        if (rt < this.count) {
            passCount.set(0);
            return true;
        }
        //rtSlowRequestAmount默认是5
        // Sentinel will degrade the service only if count exceeds.
        if (passCount.incrementAndGet() < rtSlowRequestAmount) {
            return true;
        }
        //    根据异常比例降级
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
        double exception = clusterNode.exceptionQps();
        double success = clusterNode.successQps();
        double total = clusterNode.totalQps();
        // If total amount is less than minRequestAmount, the request will pass.
        if (total < minRequestAmount) {
            return true;
        }

        // In the same aligned statistic time window,
        // "success" (aka. completed count) = exception count + non-exception count (realSuccess)
        double realSuccess = success - exception;
        if (realSuccess <= 0 && exception < minRequestAmount) {
            return true;
        }

        if (exception / success < count) {
            return true;
        }
        //    根据异常数降级
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
        double exception = clusterNode.totalException();
        if (exception < count) {
            return true;
        }
    }
    //根据设置的时间窗口进行重置
    if (cut.compareAndSet(false, true)) {
        ResetTask resetTask = new ResetTask(this);
        pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
    }

    return false;
}

这个方法首先会去获取cut的值,如果是true那么就直接进行限流操作。然后会根据resource获取ClusterNode全局节点。往下分别根据三种不同的策略来进行降级。

DEGRADE_GRADE_RT根据响应时间进行降级

if (grade == RuleConstant.DEGRADE_GRADE_RT) {
    //获取节点的平均响应时间
    double rt = clusterNode.avgRt();
    if (rt < this.count) {
        passCount.set(0);
        return true;
    }
    //rtSlowRequestAmount默认是5
    // Sentinel will degrade the service only if count exceeds.
    if (passCount.incrementAndGet() < rtSlowRequestAmount) {
        return true;
    }
}

如果是根据响应时间进行降级,那么会获取clusterNode的平均响应时间,如果平均响应时间大于所设定的count(默认是毫秒),那么就调用passCount加1,如果passCount大于5,那么直接降级。

所以看到这里我们应该知道根据平均响应时间降级前几个请求即使响应过长也不会立马降级,而是要等到第六个请求到来才会进行降级。

我们进入到clusterNode的avgRt方法中看一下是如何获取到clusterNode的平均响应时间的。

clusterNode是StatisticNode的实例
StatisticNode#avgRt
java public double avgRt() { //获取当前时间窗口内调用成功的次数 long successCount = rollingCounterInSecond.success(); if (successCount == 0) { return 0; } //获取窗口内的响应时间 return rollingCounterInSecond.rt() * 1.0 / successCount; }e

这个方法主要是调用rollingCounterInSecond获取成功次数,然后再获取窗口内的响应时间,用总响应时间除以次数得到平均每次成功调用的响应时间。

1.Sentinel源码分析—FlowRuleManager加载规则做了什么?中,我已经具体讲述了StatisticNode里面的rollingCounterInMinute实现原理,rollingCounterInMinute是按分钟进行统计的时间窗口。现在我们来讲一下rollingCounterInSecond按秒来进行统计的时间窗口。

在StatisticNode里面初始化rollingCounterInSecond:

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);

在这个初始化的方法里,会传入两个参数,SampleCountProperty.SAMPLE_COUNT的值是2,
IntervalProperty.INTERVAL的值是1000。

我们进入到ArrayMetric的构造方法中:

private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
    this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}

在创建ArrayMetric实例的时候会给data创建一个OccupiableBucketLeapArray实例。

OccupiableBucketLeapArray

public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
    // This class is the original "CombinedBucketArray".
    super(sampleCount, intervalInMs);
    this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}

OccupiableBucketLeapArray继承LeapArray这个抽象类,初始化的时候会调用父类的构造器:
LeapArray

public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    //intervalInMs是sampleCount的整数
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
    //每个小窗口的时间跨度
    this.windowLengthInMs = intervalInMs / sampleCount;
    //窗口的长度
    this.intervalInMs = intervalInMs;
    //窗口个数
    this.sampleCount = sampleCount;

    this.array = new AtomicReferenceArray<>(sampleCount);
}

OccupiableBucketLeapArray在初始化的时候也会创建一个FutureBucketLeapArray实例赋值给borrowArray。

FutureBucketLeapArray也是继承LeapArray:

public FutureBucketLeapArray(int sampleCount, int intervalInMs) {
    // This class is the original "BorrowBucketArray".
    super(sampleCount, intervalInMs);
}

直接通过调用父类LeapArray的构造方法进行初始化。

到这里rollingCounterInSecond的创建过程讲完了。

下面我们再回到StatisticNode中,在调用StatisticNode的avgRt方法的时候会调用rollingCounterInSecond.success()方法获取当前时间窗口的调用成功次数:

ArrayMetric#success

public long success() {
    //设置或更新当前的时间窗口
    data.currentWindow();
    long success = 0;
    //获取窗口里有效的Bucket
    List<MetricBucket> list = data.values();
    for (MetricBucket window : list) {
        success += window.success();
    }
    return success;
}

这里的data是的父类是LeapArray,LeapArray里面有一个array数组,用来记录时间窗口,在我们这里是基于秒钟的时间窗口,所以array的大小为2。data的结构图我直接从1.Sentinel源码分析—FlowRuleManager加载规则做了什么?中拿过来:

只不过这里的WindowWrap数组元素只有两个,每一个WindowWrap元素由MetricBucket对象构成,用来统计数据,如:通过次数、阻塞次数、异常次数等~

调用data的currentWindow方法会调用到LeapArray的currentWindow方法中去:
LeapArray#currentWindow

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    //通过当前时间判断属于哪个窗口
    int idx = calculateTimeIdx(timeMillis);
    //计算出窗口开始时间
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);

    while (true) {
        //获取数组里的老数据
        WindowWrap<T> old = array.get(idx);
        if (old == null) {

            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
            // 如果对应时间窗口的开始时间与计算得到的开始时间一样
            // 那么代表当前即是我们要找的窗口对象,直接返回
        } else if (windowStart == old.windowStart()) {

            return old;
        } else if (windowStart > old.windowStart()) {
            //如果当前的开始时间小于原开始时间,那么就更新到新的开始时间
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            //一般来说不会走到这里
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

这里我简单介绍一下这个方法,这个方法的详细讲解已经在第一篇源码分析里做了。

这个方法里面会根据当前的时间戳来计算出array数组里面的index,然后去array数组中找相应的数据,如果节点已经存在,那么用CAS更新一个新的节点;如果节点是新的,那么直接返回;如果节点失效了,设置当前节点,清除所有失效节点。

这里我直接引用1.Sentinel源码分析—FlowRuleManager加载规则做了什么?中的例子:

1. 如果array数据里面的bucket数据如下所示:
  NULL      B4
|_______|_______|
800     1000    1200
    ^
   time=888
正好当前时间所对应的槽位里面的数据是空的,那么就用CAS更新

2. 如果array里面已经有数据了,并且槽位里面的窗口开始时间和当前的开始时间相等,那么直接返回
      B3      B4
 ||_______|_______||___
800     1000    1200  timestamp
      ^
    time=888

3. 例如当前时间是1676,所对应窗口里面的数据的窗口开始时间小于当前的窗口开始时间,那么加上锁,然后设置槽位的窗口开始时间为当前窗口开始时间,并把槽位里面的数据重置
   (old)
             B0
 |_______||_______|
 ...    1200     1400
    ^
  time=1676

再回到ArrayMetric的success方法中,往下走调用data.values()方法:
LeapArray#success

public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>();
    }
    int size = array.length();
    List<T> result = new ArrayList<T>(size);

    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }
        result.add(windowWrap.value());
    }
    return result;
}

这个方法就是用来获取所有有效的MetricBucket,并返回。
然后通过调用MetricBucket的success方法获取被成功调用的次数。

我们接着来看ArrayMetric的rt方法:

public long rt() {
    data.currentWindow();
    long rt = 0;
    //获取当前时间窗口的统计数据
    List<MetricBucket> list = data.values();
    //统计当前时间窗口的平均相应时间之和
    for (MetricBucket window : list) {
        rt += window.rt();
    }
    return rt;
}

这个方法和上面的success方法差不多,获取所有的MetricBucket的rt数据求和返回。
然后就可以通过rt方法返回的时间总和除以成功调用的总和求得平均数。

我们再回到DegradeRule的passCheck方法中的响应时间降级策略中:

if (grade == RuleConstant.DEGRADE_GRADE_RT) {
    //获取节点的平均响应时间
    double rt = clusterNode.avgRt();
    if (rt < this.count) {
        passCount.set(0);
        return true;
    }
    //rtSlowRequestAmount默认是5
    // Sentinel will degrade the service only if count exceeds.
    if (passCount.incrementAndGet() < rtSlowRequestAmount) {
        return true;
    }
    //    根据异常比例降级
}
//省略
return false;

如果求得的平均响应时间小于设置的count时间,那么就重置passCount并返回true,表示不抛出异常;如果有连续5次的响应时间都超过了count,那么就返回false抛出异常进行降级。

DEGRADE_GRADE_EXCEPTION_RATIO根据异常比例降级

if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
    //获取每秒异常的次数
    double exception = clusterNode.exceptionQps();
    //获取每秒成功的次数
    double success = clusterNode.successQps();
    //获取每秒总调用次数
    double total = clusterNode.totalQps();
    // If total amount is less than minRequestAmount, the request will pass.
    // 如果总调用次数少于5,那么不进行降级
    if (total < minRequestAmount) {
        return true;
    }

    // In the same aligned statistic time window,
    // "success" (aka. completed count) = exception count + non-exception count (realSuccess)
    double realSuccess = success - exception;
    if (realSuccess <= 0 && exception < minRequestAmount) {
        return true;
    }

    if (exception / success < count) {
        return true;
    }
}
。。。
return false;

这个方法中获取成功调用的Qps和异常调用的Qps,验证后,然后求一下比率,如果没有大于count,那么就返回true,否则返回false抛出异常。

我们再进入到exceptionQps方法中看一下:
StatisticNode#exceptionQps

public double exceptionQps() {
    return rollingCounterInSecond.exception() / rollingCounterInSecond.getWindowIntervalInSec();
}

rollingCounterInSecond.getWindowIntervalInSec方法是表示窗口的时间长度,用秒来表示。这里返回的是1。
ArrayMetric#exception

public long exception() {
    data.currentWindow();
    long exception = 0;
    List<MetricBucket> list = data.values();
    for (MetricBucket window : list) {
        exception += window.exception();
    }
    return exception;
}

这个方法和我上面分析的差不多,大家看看就好了。

根据异常数降级DEGRADE_GRADE_EXCEPTION_COUNT

if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
    double exception = clusterNode.totalException();
    if (exception < count) {
        return true;
    }
}

根据异常数降级是非常的直接的,直接根据统计的异常总次数判断是否超过count。

到这里就讲完了降级的实现咯~~

原文地址:https://www.cnblogs.com/luozhiyun/p/11517918.html

时间: 2024-08-29 21:27:53

4.Sentinel源码分析— Sentinel是如何做到降级的?的相关文章

5.Sentinel源码分析—Sentinel如何实现自适应限流?

Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 4.Sentinel源码分析- Sentinel是如何做到降级的? 这篇文章主要学习一下Sentinel如何实现自适应限流的. 为什么要做自适应限流,官方给了两个理由: 保证系统不被拖垮 在系统稳定的前提下,保持系统的吞吐量 我再贴一下官方的原理: 能

6.Sentinel源码分析—Sentinel是如何动态加载配置限流的?

Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 4.Sentinel源码分析- Sentinel是如何做到降级的? 5.Sentinel源码分析-Sentinel如何实现自适应限流? 有时候我们做限流的时候并不想直接写死在代码里面,然后每次要改规则,或者增加规则的时候只能去重启应用来解决.而是希望能

7.Sentinel源码分析—Sentinel是怎么和控制台通信的?

这里会介绍: Sentinel会使用多线程的方式实现一个类Reactor的IO模型 Sentinel会使用心跳检测来观察控制台是否正常 Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 4.Sentinel源码分析- Sentinel是如何做到降级的? 5.Sentinel源码分析-Sentinel如

源码分析 Sentinel 之 Dubbo 适配原理

在Alibaba Sentinel 限流与熔断初探(技巧篇) 的示例中我选择了 sentinel-demo-apache-dubbo 作为突破点,故本文就从该项目入手,看看 Sentinel 是如何对 Dubbo 做的适配,让项目使用方无感知,只需要引入对应的依即可. sentinel-apache-dubbo-adapter 比较简单,展开如下: 上面的代码应该比较简单,在正式进入源码研究之前,我先抛出如下二个问题: 1.限流.熔断相关的功能是在 Dubbo 的客户端实现还是服务端实现?为什么

Nginx源码分析 - Nginx启动以及IOCP模型

Nginx 源码分析 - Nginx启动以及IOCP模型 版本及平台信息 本文档针对Nginx1.11.7版本,分析Windows下的相关代码,虽然服务器可能用linux更多,但是windows平台下的代码也基本相似 ,另外windows的IOCP完成端口,异步IO模型非常优秀,很值得一看. Nginx启动 曾经有朋友问我,面对一个大项目的源代码,应该从何读起呢?我给他举了一个例子,我们学校大一大二是在紫金港校区,到了 大三搬到玉泉校区,但是大一的时候也会有时候有事情要去玉泉办.偶尔会去玉泉,但

【JUC】JDK1.8源码分析之SynchronousQueue(九)

一.前言 本篇是在分析Executors源码时,发现JUC集合框架中的一个重要类没有分析,SynchronousQueue,该类在线程池中的作用是非常明显的,所以很有必要单独拿出来分析一番,这对于之后理解线程池有很有好处,SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然.同步队列没有任何内部容量,甚至连一个队列的容量都没有. 二.SynchronousQueue数据结构 由于SynchronousQueue的支持公平策略和非公平策略,所

jQuery 源码分析(十一) 队列模块 Queue详解

队列是常用的数据结构之一,只允许在表的前端(队头)进行删除操作(出队),在表的后端(队尾)进行插入操作(入队).特点是先进先出,最先插入的元素最先被删除. 在jQuery内部,队列模块为动画模块提供基础功能,负责存储动画函数.自动出队并执行动画函数,同时还要确保动画函数的顺序执行. jQuery的静态方法含有如下API: $.queue(elem,type,data) ;返回或修改匹配元素关联的队列,返回最新的队列,参数如下:   elem ;DOM元素或JavaScript对象 type  ;

TeamTalk源码分析之login_server

login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是8080,客户端登录服务器地址配置如下(这里是win版本客户端): 1.login_server启动流程 login_server的启动是从login_server.cpp中的main函数开始的,login_server.cpp所在工程路径为server\src\login_server.下表是logi

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A