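3. Hystrix Source Code: Circuit-Breaking Strategy

Preface

The previous article covered how Hystrix's four interfaces relate to each other and how a call flows through them. This section looks at the circuit-breaking strategy inside that main flow: how Hystrix decides whether the application should trip the circuit.

Forced switches

Every incoming request passes through the method below, which checks the current state of the circuit breaker. If it returns true the request is allowed through; if it returns false the request goes to the fallback stage.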

@Override
public boolean allowRequest() {
    if (properties.circuitBreakerForceOpen().get()) {
        // Circuit is forced open: every request takes the fallback path
        return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
        // we still want to allow isOpen() to perform its calculations so we simulate normal behavior
        isOpen();
        // Circuit is forced closed: every request takes the normal path
        return true;
    }
    // Neither forced switch is set: apply the normal circuit-breaking check,
    // and let a single probe request through when allowSingleTest() permits it
    return !isOpen() || allowSingleTest();
}
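
The order of the checks above can be tried out in isolation. The following is a minimal sketch, not Hystrix code: the class name AllowRequestSketch and its four boolean parameters are hypothetical stand-ins for the two properties and for the results of isOpen() and allowSingleTest().

```java
// Hypothetical stand-alone model of the allowRequest() decision order; not part of Hystrix.
final class AllowRequestSketch {

    // forceOpen / forceClosed stand in for the two forced-switch properties,
    // open for the result of isOpen(), singleTestDue for the result of allowSingleTest().
    static boolean allowRequest(boolean forceOpen, boolean forceClosed,
                                boolean open, boolean singleTestDue) {
        if (forceOpen) {
            return false;   // forced open is checked first, so it wins
        }
        if (forceClosed) {
            return true;    // forced closed ignores the computed circuit state
        }
        return !open || singleTestDue;
    }

    public static void main(String[] args) {
        // Both switches set: forced open takes priority, so the request is rejected.
        System.out.println(allowRequest(true, true, false, false));   // false
        // Forced closed while the circuit is open: the request still passes.
        System.out.println(allowRequest(false, true, true, false));   // true
        // No forced switch, circuit open, no probe due: rejected.
        System.out.println(allowRequest(false, false, true, false));  // false
        // No forced switch, circuit open, probe due: one request passes.
        System.out.println(allowRequest(false, false, true, true));   // true
    }
}
```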

Normal strategy

When neither forced switch is set, the request goes through the normal circuit-state check.

@Override
public boolean isOpen() {
    if (circuitOpen.get()) {
        // The circuit is already open: return true
        return true;
    }
    // Get the health statistics for the current time window
    HealthCounts health = metrics.getHealthCounts();
    // If the number of requests in the window is below the threshold, report the circuit as closed
    // so that all traffic takes the normal path. The default is 20 requests within 10s
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
        return false;
    }

    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // The request count in the window has reached RequestVolumeThreshold and the error rate has hit its limit:
        // open the circuit and record when it opened, for later health probing (allowSingleTest)
        if (circuitOpen.compareAndSet(false, true)) {
            // if the previousValue was false then we want to set the currentTime
            // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
            // caused another thread to set it to true already even though we were in the process of doing the same
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            return false;
        }
    }
}
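
The two thresholds in isOpen() can be checked with a few numbers. This is a minimal sketch under stated assumptions: TripDecisionSketch and shouldTrip are hypothetical names, and the constants are the default values quoted in this article (20 requests, 50% errors), hard-coded here instead of being read from properties.

```java
// Hypothetical stand-alone model of the threshold checks in isOpen(); not part of Hystrix.
final class TripDecisionSketch {

    // Default values as quoted in this article, hard-coded for the sketch.
    static final int REQUEST_VOLUME_THRESHOLD = 20;
    static final int ERROR_THRESHOLD_PERCENTAGE = 50;

    static boolean shouldTrip(long totalRequests, int errorPercentage) {
        if (totalRequests < REQUEST_VOLUME_THRESHOLD) {
            return false;   // too little traffic in the window to judge
        }
        // Mirrors the source: an error percentage below the threshold keeps the circuit closed.
        return errorPercentage >= ERROR_THRESHOLD_PERCENTAGE;
    }

    public static void main(String[] args) {
        System.out.println(shouldTrip(19, 100)); // false: below the volume threshold
        System.out.println(shouldTrip(20, 49));  // false: error rate below the threshold
        System.out.println(shouldTrip(20, 50));  // true: both thresholds reached
    }
}
```

Note that a window with 19 requests never trips the circuit even if every one of them failed, because the volume check comes first.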

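Traffic probing

Once the circuit has been open for longer than the sleep window, one probe request should be let through to test whether the backend service has recovered.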

// Put simply: once the current time is later than the time the circuit was last opened (or last probed) plus the sleep window, one request is let through
public boolean allowSingleTest() {
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // 1) if the circuit is open
    // 2) and it's been longer than 'sleepWindow' since we opened the circuit
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
        // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            // if this returns true that means we set the time so we'll return true to allow the singleTest
            // if it returned false it means another thread raced us and allowed the singleTest before we did
            return true;
        }
    }
    return false;
}

// When the probe succeeds, the circuit is closed after the run method completes; see the executeCommand() method
public void markSuccess() {
    if (circuitOpen.get()) {
        // TODO how can we can do this without resetting the counts so we don't lose metrics of short-circuits etc?
        metrics.resetCounter();
        // If we have been 'open' and have a success then we want to close the circuit. This handles the 'singleTest' logic
        circuitOpen.set(false);
    }
}
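
The interplay of the sleep window, the compare-and-set, and markSuccess() is easier to follow with the clock passed in explicitly. The following is a minimal sketch, not Hystrix code: HalfOpenProbeSketch, trip, and the nowMillis parameters are hypothetical, the 5000 ms sleep window is just a value chosen for the example, and the metrics reset done by the real markSuccess() is left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone model of the probe logic; time is injected instead of read from the system clock.
final class HalfOpenProbeSketch {
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTestedTime = new AtomicLong();
    private final long sleepWindowMillis;

    HalfOpenProbeSketch(long sleepWindowMillis) {
        this.sleepWindowMillis = sleepWindowMillis;
    }

    // Opens the circuit and records when it opened, as isOpen() does on a trip.
    void trip(long nowMillis) {
        if (circuitOpen.compareAndSet(false, true)) {
            openedOrLastTestedTime.set(nowMillis);
        }
    }

    // Allows one probe once the sleep window has elapsed; the CAS lets only one caller win.
    boolean allowSingleTest(long nowMillis) {
        long last = openedOrLastTestedTime.get();
        if (circuitOpen.get() && nowMillis > last + sleepWindowMillis) {
            return openedOrLastTestedTime.compareAndSet(last, nowMillis);
        }
        return false;
    }

    // Closes the circuit after a successful probe (the metrics reset is omitted in this sketch).
    void markSuccess() {
        circuitOpen.set(false);
    }

    boolean isOpen() {
        return circuitOpen.get();
    }

    public static void main(String[] args) {
        HalfOpenProbeSketch breaker = new HalfOpenProbeSketch(5_000);
        breaker.trip(1_000);
        System.out.println(breaker.allowSingleTest(6_000)); // false: window not yet exceeded
        System.out.println(breaker.allowSingleTest(6_001)); // true: first probe allowed
        System.out.println(breaker.allowSingleTest(6_002)); // false: window restarted at 6001
        breaker.markSuccess();
        System.out.println(breaker.isOpen());               // false: probe succeeded, circuit closed
    }
}
```

If the probe fails instead, markSuccess() is never called, the circuit stays open, and the next probe is only allowed after another full sleep window.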

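Summary

1. There are two forced switches, force-closed and force-open, and force-open takes priority.

2. The force-closed switch does not interfere with the circuit's own open and close transitions. Even with force-closed set, isOpen() still runs, so the circuit state keeps changing according to the error rate and thresholds. The main point is that the reported statistics are unaffected.

3. By default the circuit trips when there are at least 20 requests within 10 seconds and the error rate reaches 50%. The next section explains how these figures are counted.

4. Once the circuit is open and the sleep window has elapsed, a probe request is allowed through. If its run method succeeds, the circuit is closed and normal service resumes.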

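How requests are counted

metrics.rollingStats.timeInMilliseconds sets the size of the time window; the default is 10s.

metrics.rollingStats.numBuckets sets the number of buckets in the window; the default is 10, and the window size must be evenly divisible by the bucket count.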
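
The divisibility rule follows from each bucket covering an equal slice of the window. A minimal sketch of that arithmetic, with hypothetical names (RollingWindowSketch, bucketSizeMillis) and with the choice of IllegalArgumentException being an assumption of this example:

```java
// Hypothetical helper showing how window size and bucket count relate; not part of Hystrix.
final class RollingWindowSketch {

    // Returns the time span covered by one bucket, rejecting sizes that do not divide evenly.
    static int bucketSizeMillis(int windowMillis, int numBuckets) {
        if (numBuckets <= 0 || windowMillis % numBuckets != 0) {
            throw new IllegalArgumentException(
                    "window of " + windowMillis + " ms does not divide evenly into "
                            + numBuckets + " buckets");
        }
        return windowMillis / numBuckets;
    }

    public static void main(String[] args) {
        // Defaults from the article: a 10s window with 10 buckets gives 1000 ms per bucket.
        System.out.println(bucketSizeMillis(10_000, 10)); // 1000
    }
}
```

With the defaults, each bucket holds one second of counts, and the rolling sums used for the health check add up the ten most recent buckets.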

public HealthCounts getHealthCounts() {
    // we put an interval between snapshots so high-volume commands don't
    // spend too much unnecessary time calculating metrics in very small time periods
    long lastTime = lastHealthCountsSnapshot.get();
    long currentTime = System.currentTimeMillis();
    if (currentTime - lastTime >= properties.metricsHealthSnapshotIntervalInMilliseconds().get() || healthCountsSnapshot == null) {
        if (lastHealthCountsSnapshot.compareAndSet(lastTime, currentTime)) {
            // our thread won setting the snapshot time so we will proceed with generating a new snapshot
            // losing threads will continue using the old snapshot
            long success = counter.getRollingSum(HystrixRollingNumberEvent.SUCCESS); // total successes in the time window
            long failure = counter.getRollingSum(HystrixRollingNumberEvent.FAILURE); // fallbacks occur on this
            long timeout = counter.getRollingSum(HystrixRollingNumberEvent.TIMEOUT); // fallbacks occur on this
            long threadPoolRejected = counter.getRollingSum(HystrixRollingNumberEvent.THREAD_POOL_REJECTED); // fallbacks occur on this
            long semaphoreRejected = counter.getRollingSum(HystrixRollingNumberEvent.SEMAPHORE_REJECTED); // fallbacks occur on this
            long shortCircuited = counter.getRollingSum(HystrixRollingNumberEvent.SHORT_CIRCUITED); // fallbacks occur on this
            long totalCount = failure + success + timeout + threadPoolRejected + shortCircuited + semaphoreRejected;
            long errorCount = failure + timeout + threadPoolRejected + shortCircuited + semaphoreRejected;
            int errorPercentage = 0;
            if (totalCount > 0) {
                errorPercentage = (int) ((double) errorCount / totalCount * 100);
            }
            healthCountsSnapshot = new HealthCounts(totalCount, errorCount, errorPercentage);
        }
    }
    return healthCountsSnapshot;
}

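As the code above shows, evaluating the circuit state obtains a HealthCounts object that holds the total request count, the error count, and the error percentage, all computed over the current time window. The counters it sums also show where fallback is executed:

  1. The run method throws an exception and the execution fails
  2. The run method times out
  3. The request is rejected because the thread pool or the semaphore is full
  4. The circuit is open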
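
The error-percentage arithmetic in getHealthCounts() can be reproduced with plain numbers. This is a minimal sketch with a hypothetical class name (ErrorPercentageSketch); the six parameters stand in for the rolling sums read from the counter, and the formula is the one shown in the source above.

```java
// Hypothetical stand-alone version of the error-percentage arithmetic; not part of Hystrix.
final class ErrorPercentageSketch {

    static int errorPercentage(long success, long failure, long timeout,
                               long threadPoolRejected, long semaphoreRejected,
                               long shortCircuited) {
        // Every outcome except success counts as an error.
        long errorCount = failure + timeout + threadPoolRejected + shortCircuited + semaphoreRejected;
        long totalCount = success + errorCount;
        if (totalCount == 0) {
            return 0;   // no traffic in the window, so no error rate
        }
        // The cast to int truncates the fractional part.
        return (int) ((double) errorCount / totalCount * 100);
    }

    public static void main(String[] args) {
        // 10 successes, 5 failures, 5 timeouts: 10 errors out of 20 requests.
        System.out.println(errorPercentage(10, 5, 5, 0, 0, 0)); // 50
        // 2 successes, 1 failure: 33.33...% is truncated to 33.
        System.out.println(errorPercentage(2, 1, 0, 0, 0, 0));  // 33
        // Empty window.
        System.out.println(errorPercentage(0, 0, 0, 0, 0, 0));  // 0
    }
}
```

Because short-circuited requests are counted as errors too, requests rejected while the circuit is open keep the error percentage high until markSuccess() resets the counter.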

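Users can tune the window size and the bucket count through these parameters to fit their own business needs.

[Figure: overview of the circuit-breaking flow]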

Original article: https://www.cnblogs.com/200911/p/8416886.html

Time: 2024-10-29 00:15:59
