聊聊微服务熔断降级Hystrix

  在现在的微服务使用的过程中,经常会遇到依赖的服务不可用,那么如果依赖的服务不可用的话,会导致把自己的服务也会拖死,那么就产生了熔断,熔断顾名思义就是当服务处于不可用的时候采取半开关的状态,达到一定数量后就熔断器就打开。这就相当于家里边的保险丝,如果电压过高的话,保险丝就会断掉,起到保护电器的作用。

  目前支持熔断,降级的就是Hystrix,当然还有resilience4j还有Sentinel。今天咱们以Hystrix为主吧。其他的大家可以自行研究。

  Hystrix主要实现三个功能,接下来咱们继续展开。

  1. 资源隔离

  2. 熔断

  3. 降级

  资源隔离分为两种,一种是线程池隔离,一种是信号量semaphore隔离。线程池以请求的线程和执行的线程分为不同的线程执行,信号量是请求和执行采用相同的线程。

  当然,涉及到线程池的话,那么就支持异步,支持异步Future的话也就支持get的时候支持超时获取。信号量这些功能不支持,但是信号量是支持熔断,限流。他们的区别如下:

  线程切换 异步 超时 熔断 限流 资源开销
线程池
信号量

  HystrixCommand的命令执行大致如下图:

  依赖的pom如下:

        <!-- 依赖版本 -->
        <hystrix.version>1.3.16</hystrix.version>
        <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>

        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>${hystrix.version}</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>${hystrix-metrics-event-stream.version}</version>
        </dependency>

  支持同步,异步,观察事件拦截,以及订阅方式,下面咱们直接看代码实现吧。大家一看就明白了:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author huangqingshi
 * @Date 2019-03-17
 */
public class HelloWorldCommand extends HystrixCommand<String> {

    private final String name;

    public HelloWorldCommand(String name) {
        //指定命令组名
        super(HystrixCommandGroupKey.Factory.asKey("myGroup"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        //逻辑封装在run里边
        return "Hello:" + name + " thread:" + Thread.currentThread().getName();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        //每个Command只能调用一次,不能重复调用。重复调用会报异常。
        HelloWorldCommand  helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
        //execute同步调用 等同于:helloWorldCommand.queue().get();
        String result = helloWorldCommand.execute();
        System.out.println("result:" + result);

        helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
        //异步调用
        Future<String> future = helloWorldCommand.queue();
        //get可以指定获取的时间100毫秒,默认为1秒
        result = future.get(100, TimeUnit.MILLISECONDS);
        System.out.println("result:" + result);
        System.out.println("main thread:" + Thread.currentThread().getName());

        testObserve();

    }

    public static void testObserve() {
        //注册观察者事件拦截
        Observable<String> observable = new HelloWorldCommand("observe").observe();
        //注册回调事件
        observable.subscribe(new Action1<String>() {
            @Override
            public void call(String result) {
                //result就是调用HelloWorldCommand的结果
                System.out.println("callback:" + result);
            }
        });
        //注册完成版的事件
        observable.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted调用:onNext : onError之后调用");
            }

            @Override
            public void onError(Throwable throwable) {
                //异常产生了之后会调用
                System.out.println("onError:" + throwable.getMessage());
            }

            @Override
            public void onNext(String s) {
                //获取结果后回调
                System.out.println("onNext:" + s);
            }
        });
    }

}执行结果如下:

result:Hello:Synchronous-hystrix thread:hystrix-myGroup-1
result:Hello:Asynchronous-hystrix thread:hystrix-myGroup-2
main thread:main
callback:Hello:observe thread:hystrix-myGroup-3
onNext:Hello:observe thread:hystrix-myGroup-3
onCompleted调用:onNext : onError之后调用

  接下来是线程池隔离的例子:

import com.netflix.hystrix.*;

/**
 * @author huangqingshi
 * @Date 2019-03-17
 */
public class ThreadPoolCommand extends HystrixCommand<String> {

    private String name;

    public ThreadPoolCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
             .andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
             .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withCircuitBreakerRequestVolumeThreshold(10) //至少10个请求,熔断器才进行错误计算 默认值20
                .withCircuitBreakerSleepWindowInMilliseconds(5000) //熔断终端5秒后会进入半打开状态
                .withCircuitBreakerErrorThresholdPercentage(50)    //错误率达到50开启熔断保护
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                //10个核心线程
             ).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10))
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        return "threadPoolCommand name:" + name;
    }

    public static void main(String[] args) {
        ThreadPoolCommand threadPoolCommand = new ThreadPoolCommand("threadPool");
        String result = threadPoolCommand.execute();
        System.out.println("result:" + result);
    }
}

执行结果:
result:threadPoolCommand name:threadPool

  信号量隔离例子:

/**
 * @author huangqingshi
 * @Date 2019-03-17
 */
public class SemaphoreCommand extends HystrixCommand<String> {

    private String name;

    public SemaphoreCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("semaphoreGroup"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("semaphoreCommand"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                 //至少10个请求,熔断器才会进行错误率的计算 默认值20
                .withCircuitBreakerRequestVolumeThreshold(10)
                 //熔断器中断请求5秒后会自动进入半打开状态,放部分流量进行重试 默认值5000ms
                .withCircuitBreakerSleepWindowInMilliseconds(5000)
                //错误率达到50开启熔断保护
                .withCircuitBreakerErrorThresholdPercentage(50)
                 //设置隔离策略
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                 //最大并发量10
                .withExecutionIsolationSemaphoreMaxConcurrentRequests(10)
                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        return "semaphore success name:" + name;
    }

    @Override
    protected String getFallback() {
        return "semaphore fallback name:" + name;
    }

    public static void main(String[] args) {
        SemaphoreCommand semaphoreCommand = new SemaphoreCommand("semaphoreCommand");
        String result = semaphoreCommand.execute();
        System.out.println(result);
    }

}
执行结果:
semaphore success name:semaphoreCommand

  在执行的过程中,如果出现调用服务的时候出现错误的时候会先进行熔断,就是如果流量达到设置的量的时候进行统计,比如10个请求,然后如果出现错误率超过配置的错误率就会进行将熔断进行打开,打开之后会进行调用降级方法fallback。过了一段时间后,可以放行部分流量,如果流量正常了,则会将熔断器开关关闭。下图是来自官方文档截图,里边维护者一个bucket,每秒一个bucket,里边记录着成功,失败,超时,拒绝。这个周期是通过withCircuitBreakerSleepWindowInMilliseconds配置的。

  接下来咱们看一下降级,也就是熔断器打开的时候,会走fallback方法,继续看例子。

import com.netflix.hystrix.*;

/**
 * @author huangqingshi
 * @Date 2019-03-17
 */
public class ThreadPoolCommand extends HystrixCommand<String> {

    private String name;

    public ThreadPoolCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
             .andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
             .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withCircuitBreakerRequestVolumeThreshold(10) //至少10个请求,熔断器才进行错误计算 默认值20
                .withCircuitBreakerSleepWindowInMilliseconds(5000) //熔断终端5秒后会进入半打开状态
                .withCircuitBreakerErrorThresholdPercentage(50)    //错误率达到50开启熔断保护
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                //10个核心线程
             ).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10))
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        return "threadPoolCommand name:" + name;
    }

    public static void main(String[] args) {
        ThreadPoolCommand threadPoolCommand = new ThreadPoolCommand("threadPool");
        String result = threadPoolCommand.execute();
        System.out.println("result:" + result);
    }
}
执行结果:
result:executed fallback
并且抛出超时异常。因为程序故意设计超时的。

  当然,Hystrixcommand还支持primary或secondary的方式,可以先看看流程图:

  是否执行primary是通过参数primarySecondary.userPrimary为true时执行。false的时候执行secondary方式。

/**
 * @author huangqingshi
 * @Date 2019-03-18
 */
public class PrimarySecondaryFacade extends HystrixCommand<String> {

    private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().
            getBooleanProperty("primarySecondary.usePrimary", true);

    private int id;

    public PrimarySecondaryFacade(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("primarySecondCommand"))
             //此处采用信号量,primary、secondary采用线程池
            .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
                    HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
            )

        );
        this.id = id;
    }

    @Override
    protected String run() throws Exception {
        if(usePrimary.get()) {
            return new PrimaryCommand(id).execute();
        } else {
            return new SecondaryCommand(id).execute();
        }
    }

    @Override
    protected String getFallback() {
        return "static-fallback-" + id;
    }

    @Override
    protected String getCacheKey() {
        return String.valueOf(id);
    }

    private static class PrimaryCommand extends HystrixCommand<String> {

        private final int id;

        private PrimaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
                            withExecutionIsolationThreadTimeoutInMilliseconds(600)));
            this.id = id;
        }

        @Override
        protected String run() {
            return "responseFromPrimary-" + id;
        }

    }

    private static class SecondaryCommand extends HystrixCommand<String> {

        private final int id;

        private SecondaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
                            withExecutionIsolationThreadTimeoutInMilliseconds(600)));
            this.id = id;
        }

        @Override
        protected String run() {
            return "responseFromSecondary-" + id;
        }

    }

    public static class UnitTest {

        @Test
        public void testPrimary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
                assertEquals("responseFromPrimary-100", new PrimarySecondaryFacade(100).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }

        @Test
        public void testSecondary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
                assertEquals("responseFromSecondary-100", new PrimarySecondaryFacade(100).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }
    }

}

  好了,这个基本上就是Hystrix的基本功能,但是有个问题就是Hystrix已经不维护了,但是目前的稳定版大家也都在使用,所以列出来了。当然也推荐大家使用Sentinel,功能比较强大,就是自适应限流功能等,功能也非常强大,后续研究之后再出相关文章吧。这个文章就当大家的一个敲门砖吧,有问题请及时告知,谢谢。

  

原文地址:https://www.cnblogs.com/huangqingshi/p/10555828.html

时间: 2024-10-07 14:50:03

聊聊微服务熔断降级Hystrix的相关文章

微服务熔断限流Hystrix之流聚合

简介 上一篇介绍了 Hystrix Dashboard 监控单体应用的例子,在生产环境中,监控的应用往往是一个集群,我们需要将每个实例的监控信息聚合起来分析,这就用到了 Turbine 工具.Turbine有一个重要的功能就是汇聚监控信息,并将汇聚到的监控信息提供给Hystrix Dashboard来集中展示和监控. 流程 实验 工程说明 工程名 端口 作用 eureka-server 8761 注册中心 service-hi 8762 服务提供者 service-consumer 8763 服

从 Spring Cloud 开始,聊聊微服务架构实践之路

[编者的话]随着公司业务量的飞速发展,平台面临的挑战已经远远大于业务,需求量不断增加,技术人员数量增加,面临的复杂度也大大增加.在这个背景下,平台的技术架构也完成了从传统的单体应用到微服务化的演进. 系统架构的演进过程 单一应用架构(第一代架构) 这是平台最开始的情况,当时流量小,为了节约成本,并将所有应用都打包放到一个应用里面,采用的架构为 .NET SQL Server: 表示层:位于最外层(最上层),最接近用户.用于显示数据和接收用户输入的数 据,为用户提供一种交互式操作的界面,平台所使用

SpringCloud微服务(03):Hystrix组件,实现服务熔断

本文源码:GitHub·点这里 || GitEE·点这里 一.熔断器简介 微服务架构特点就是多服务,多数据源,支撑系统应用.这样导致微服务之间存在依赖关系.如果其中一个服务故障,可能导致系统宕机,这就是所谓的雪崩效应. 1.服务熔断 微服务架构中某个微服务发生故障时,要快速切断服务,提示用户,后续请求,不调用该服务,直接返回,释放资源,这就是服务熔断. 熔断生效后,会在指定的时间后调用请求来测试依赖是否恢复,依赖的应用恢复后关闭熔断. 2.服务降级 服务器高并发下,压力剧增的时候,根据当业务情况

利用Spring Cloud实现微服务- 熔断机制

1. 熔断机制介绍 在介绍熔断机制之前,我们需要了解微服务的雪崩效应.在微服务架构中,微服务是完成一个单一的业务功能,这样做的好处是可以做到解耦,每个微服务可以独立演进.但是,一个应用可能会有多个微服务组成,微服务之间的数据交互通过远程过程调用完成.这就带来一个问题,假设微服务A调用微服务B和微服务C,微服务B和微服务C又调用其它的微服务,这就是所谓的"扇出".如果扇出的链路上某个微服务的调用响应时间过长或者不可用,对微服务A的调用就会占用越来越多的系统资源,进而引起系统崩溃,所谓的&

微服务熔断隔离机制及注意事项

导读:本文重点分析微服务化过程中熔断机制及应用注意事项,包括微服务调用与"雪崩效应"及解决方案.熔断机制及考虑因素.隔离机制及实现方式考量等内容. 随着企业微服务化战略的实施,业务功能细分,越来越多的服务从原有的单体应用中分解成一系列独立开发.部署.运维的微小服务,服务之间则依赖于各种RPC框架互相通信.纵然,微服务化有着很多优势,但与之伴随而来的是各种复杂性,对开发人员来说,除了业务领域本身外,还需要考虑由于服务拆分之后诸如分布式事务.服务部署及运维.rpc调用等系列问题,本文将重点

Zuul2.X网关实现服务熔断降级

版本: <properties> <spring-boot.version>2.1.9.RELEASE</spring-boot.version> <spring-cloud.version>Greenwich.SR4</spring-cloud.version> </properties> 所需依赖: <properties> <spring-cloud.version>Greenwich.SR4</s

聊聊微服务架构与应用

>>微服务架构 随着敏捷开发.持续交付以及基于Docker的应用部署的发展,微服务结构开始慢慢流行起来. >>应用架构演进 (1)垂直应用架构 传统的LAMP架构和Spring+Struts+iBatis/Hibernate的架构都是典型的垂直应用架构,垂直应用架构学习成本低,开发产出快,测试.部署和运维比较简单,在过去的十几年中一直比较流行.但是随着业务的发展,垂直应用架构逐渐暴露出一些缺陷,以Spring MVC架构为例,可能的表现:1.复杂应用的开发维护成本越来越高,测试变得

聊聊微服务架构实践之路的4大挑战,3月31日见真章!

当容器化的兴起,为应用开发部署带来变革,也为应用设计架构和运维部署带来变化: 当持续交付.DevOps.微服务,成为企业在软件成果对抗当中胜出的有力武器,微服务架构已经随处可见: 但随之而至的是微服务框架.微服务监控.微服务配置.微服务治理等一系列挑战, 从架构到发布,挑战重重,该如何应对容器化微服务架构的各种技术难题? 2018年3月31日,数人云联合ServiceComb社区,开启Building Microservice 系列活动 第2期 北京站 带你了解最新的微服务开源框架, 解析微服务

聊聊微服务的服务注册与发现

摘要: 一个好的服务注册发现中间件,应该是能完整地满足服务开发和治理的基础功能,然后才是性能和高可用.如果没有想清楚前面的功能,再高的可用性和性能都是浮云.最后,安全也同样重要.下面将从 服务注册.服务发现.容灾和高可用三个大方面来回答这些问题的主流做法. 引言 聊起微服务的服务注册与发现,很多人立马就会脱口而出 zk.etcd.consul.eureka 这些组件,进而聊到 CAP 如何取舍,性能如何,高可用和容灾是怎么实现的. 在这之前,站在组件使用者的角度,我想先问这么几个问题: 注册的