Spring Cloud Gateway 结合配置中心限流

前言

假设你领导给你安排了一个任务,具体需求如下:

  • 针对具体的接口做限流
  • 不同接口限流的力度可以不同
  • 可以动态调整限流配置,实时生效

如果你接到上面的任务,你会怎么去设计+实现呢?

每个人看待问题的角度不同,自然思考出来的方案也不同,正所谓条条大路通罗马,能到达目的地的路那就是一条好路。

如何分析需求

下面我给出我的实现方式,仅供各位参考,大牛请忽略。

具体问题具体分析,针对需求点,分别去做分析。

需求一 “如何针对具体的接口做限流” 这个在上篇文章中也有讲过,只需要让KeyResolver返回的是接口的URI即可,这样限流的维度那就是对这个接口进行限流。

需求二 “不同接口限流的力度可以不同” 这个通过配置的方式明显实现不了,配置中的replenishRate和burstCapacity都是配置死的,如果要做成动态的那么必须的自己通过扩展RedisRateLimiter来实现。

前提是必须有一个配置列表,这个配置列表就是每个接口对应的限流数值。有了这个配置我们就可以通过请求的接口获取这个接口对应的限流值。

需求三“可以动态调整限流配置,实时生效” 这个的话也比较容易,无论你是存文件,存数据库,存缓存只要每次都去读取,必然是实时生效的,但是性能问题我们不得不考虑啊。

存文件,读取文件,耗IO,主要是不方便修改
存数据库,可以通过web界面去修改,也可以直接改数据库,每次都要查询,性能不行
存分布式缓存(redis),性能比数据库有提高

对比下来肯定是缓存是最优的方案,还有更好的方案吗?
有,结合配置中心来做,我这边用自己的配置中心(https://github.com/yinjihuan/smconf)来讲解,换成其他的配置中心也是一样的思路

配置中心的优点在于它本来就是用来存储配置的,配置在项目启动时加载完毕,当有修改时推送更新,每次读取都在本地对象中,性能好。

具体方案有了之后我们就可以开始撸代码了,但是你有想过这么多接口的限流值怎么初始化吗?手动一个个去加?

不同的服务维护的小组不同,当然也有可能是一个小组维护,从设计者的角度来思考,应该把设置的权利交给用户,交给我们的接口开发者,每个接口能够承受多少并发让用户来定,你的职责就是在网关进行限流。当然在公司中具体的限制量也不一定会由开发人员来定哈,这个得根据压测结果,做最好的调整。

话不多说-开始撸码
首先我们定义自己的RedisRateLimiter,复制源码稍微改造下即可, 这边只贴核心代码。

public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config>
implements ApplicationContextAware {

public static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";
public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";

public CustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script,
        Validator validator) {
    super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);
    this.redisTemplate = redisTemplate;
    this.script = script;
    initialized.compareAndSet(false, true);
}

public CustomRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) {
    super(Config.class, CONFIGURATION_PROPERTY_NAME, null);
    this.defaultConfig = new Config().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);
}

// 限流配置
private RateLimitConf rateLimitConf;

@Override
@SuppressWarnings("unchecked")
public void setApplicationContext(ApplicationContext context) throws BeansException {
   ** // 加载配置**
    this.rateLimitConf = context.getBean(RateLimitConf.class);
}

/**
 * This uses a basic token bucket algorithm and relies on the fact that
 * Redis scripts execute atomically. No other operations can run between
 * fetching the count and writing the new count.
 */
@Override
@SuppressWarnings("unchecked")
public Mono<Response> isAllowed(String routeId, String id) {
    if (!this.initialized.get()) {
        throw new IllegalStateException("RedisRateLimiter is not initialized");
    }

    //Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);

    if (rateLimitConf == null) {
        throw new IllegalArgumentException("No Configuration found for route " + routeId);
    }
    Map<String,Integer> routeConfig = rateLimitConf.getLimitMap();

    // Key的格式:服务名称.接口URI.类型
    String replenishRateKey = routeId + "." + id + ".replenishRate";
    int replenishRate = routeConfig.get(replenishRateKey) == null ? routeConfig.get("default.replenishRate") : routeConfig.get(replenishRateKey);

    String burstCapacityKey = routeId + "." + id + ".burstCapacity";
    int burstCapacity = routeConfig.get(burstCapacityKey) == null ? routeConfig.get("default.burstCapacity") : routeConfig.get(burstCapacityKey);

    try {
        List<String> keys = getKeys(id);

        // The arguments to the LUA script. time() returns unixtime in
        // seconds.
        List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
                Instant.now().getEpochSecond() + "", "1");
        // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
        Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
        // .log("redisratelimiter", Level.FINER);
        return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
                .reduce(new ArrayList<Long>(), (longs, l) -> {
                    longs.addAll(l);
                    return longs;
                }).map(results -> {
                    boolean allowed = results.get(0) == 1L;
                    Long tokensLeft = results.get(1);

                    Response response = new Response(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));

                    if (log.isDebugEnabled()) {
                        log.debug("response: " + response);
                    }
                    return response;
                });
    } catch (Exception e) {
        /*
         * We don‘t want a hard dependency on Redis to allow traffic. Make
         * sure to set an alert so you know if this is happening too much.
         * Stripe‘s observed failure rate is 0.01%.
         */
        log.error("Error determining if user allowed from redis", e);
    }
    return Mono.just(new Response(true, getHeaders(replenishRate, burstCapacity, -1L)));
}

public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft) {
    HashMap<String, String> headers = new HashMap<>();
    headers.put(this.remainingHeader, tokensLeft.toString());
    headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
    headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
    return headers;
}

}
需要在setApplicationContext中加载我们的配置类,配置类的定义如下:

@CxytianDiConf(system="fangjia-gateway")
public class RateLimitConf {
// 限流配置
@ConfField(value = "limitMap")
private Map<String, Integer> limitMap = new HashMap<String, Integer>(){{
put("default.replenishRate", 100);
put("default.burstCapacity", 1000);
}};
public void setLimitMap(Map<String, Integer> limitMap) {
this.limitMap = limitMap;
}
public Map<String, Integer> getLimitMap() {
return limitMap;
}
}
所有的接口对应的限流信息都在map中,有默认值,如果没有对应的配置就用默认的值对接口进行限流。

isAllowed方法中通过‘服务名称.接口URI.类型’组成一个Key, 通过这个Key去Map中获取对应的值。

类型的作用主要是用来区分replenishRate和burstCapacity两个值。

接下来就是配置CustomRedisRateLimiter:

@Beanbr/>@Primary
public CustomRedisRateLimiter customRedisRateLimiter(
ReactiveRedisTemplate<String, String> redisTemplate,
@Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
Validator validator) {
return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);
}
网关这边的逻辑已经实现好了,接下来就是需要在具体的服务中自定义注解,然后将限流的参数初始化到我们的配置中心就可以了。

定义注解

@Target(ElementType.METHOD)br/>@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ApiRateLimit {

/**
 * 速率
 * @return
 */
int replenishRate() default 100;

/**
 * 容积
 * @return
 */
int burstCapacity() default 1000;

}
启动监听器,读取注解,初始化配置

/**

  • 初始化API网关需要进行并发限制的API
  • @author yinjihuan
  • */
    public class InitGatewayApiLimitRateListener implements ApplicationListener<ApplicationReadyEvent> {

    // Controller包路径
    private String controllerPath;

    private RateLimitConf rateLimitConf;

    private ConfInit confInit;

    private String applicationName;

    public InitGatewayApiLimitRateListener(String controllerPath) {
    this.controllerPath = controllerPath;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
    this.rateLimitConf = event.getApplicationContext().getBean(RateLimitConf.class);
    this.confInit = event.getApplicationContext().getBean(ConfInit.class);
    this.applicationName = event.getApplicationContext().getEnvironment().getProperty("spring.application.name");
    try {
    initLimitRateAPI();
    } catch (Exception e) {
    throw new RuntimeException("初始化需要进行并发限制的API异常", e);
    }
    }

    /**

    • 初始化需要进行并发限制的API
    • @throws IOException
    • @throws ClassNotFoundException
      */
      private void initLimitRateAPI() throws IOException, ClassNotFoundException {
      Map<String, Integer> limitMap = rateLimitConf.getLimitMap();
      ClasspathPackageScannerUtils scan = new ClasspathPackageScannerUtils(this.controllerPath);
      List<String> classList = scan.getFullyQualifiedClassNameList();
      for (String clazz : classList) {
      Class<?> clz = Class.forName(clazz);
      if (!clz.isAnnotationPresent(RestController.class)) {
      continue;
      }
      Method[] methods = clz.getDeclaredMethods();
      for (Method method : methods) {
      if (method.isAnnotationPresent(ApiRateLimit.class)) {
      ApiRateLimit apiRateLimit = method.getAnnotation(ApiRateLimit.class);
      String replenishRateKey = applicationName + "." + getApiUri(clz, method) + ".replenishRate";
      String burstCapacityKey = applicationName + "." + getApiUri(clz, method) + ".burstCapacity";
      limitMap.put(replenishRateKey, apiRateLimit.replenishRate());
      limitMap.put(burstCapacityKey, apiRateLimit.burstCapacity());
      }
      }
      }
      rateLimitConf.setLimitMap(limitMap);
      // 初始化值到配置中心
      confInit.init(rateLimitConf);
      }

      private String getApiUri(Class<?> clz, Method method) {
      StringBuilder uri = new StringBuilder();
      uri.append(clz.getAnnotation(RequestMapping.class).value()[0]);
      if (method.isAnnotationPresent(GetMapping.class)) {
      uri.append(method.getAnnotation(GetMapping.class).value()[0]);
      } else if (method.isAnnotationPresent(PostMapping.class)) {
      uri.append(method.getAnnotation(PostMapping.class).value()[0]);
      } else if (method.isAnnotationPresent(RequestMapping.class)) {
      uri.append(method.getAnnotation(RequestMapping.class).value()[0]);
      }
      return uri.toString();
      }
      }
      配置监听器

SpringApplication application = new SpringApplication(FshHouseServiceApplication.class);
application.addListeners(new InitGatewayApiLimitRateListener("com.fangjia.fsh.house.controller"));
context = application.run(args);
最后使用就很简单了,只需要增加注解就可以了

@ApiRateLimit(replenishRate=10, burstCapacity=100)br/>@GetMapping("/data")
public HouseInfo getData(@RequestParam("name") String name) {
return new HouseInfo(1L, "上海", "虹口", "东体小区");
}

总结

我这边只是给大家提供一种去实现的思路,也许大家还有更好的方案。
我觉得只要不让每个开发都去关心这种非业务性质的功能,那就可以了,都在框架层面处理掉。当然实现原理可以跟大家分享下,会用很好,既会用又了解原理那就更好了。

Spring Cloud Gateway 结合配置中心限流

原文地址:http://blog.51cto.com/13932491/2162550

时间: 2024-10-07 09:25:30

Spring Cloud Gateway 结合配置中心限流的相关文章

跟我学SpringCloud | 第六篇:Spring Cloud Config Github配置中心

SpringCloud系列教程 | 第六篇:Spring Cloud Config Github配置中心 Springboot: 2.1.6.RELEASE SpringCloud: Greenwich.SR1 如无特殊说明,本系列教程全采用以上版本 随着分布式项目越来越大,勤劳的程序猿们会开始面临一个挑战,配置文件会越来越繁杂,虽然spring提供了一个鸡肋版的解决方案,spring.profiles.active,在大型的分布式项目体系中,聊胜于无吧,手动维护配置文件的痛苦,生产,UAT,测

Spring Cloud微服务Sentinel+Apollo限流、熔断实战

在Spring Cloud微服务体系中,由于限流熔断组件Hystrix开源版本不在维护,因此国内不少有类似需求的公司已经将眼光转向阿里开源的Sentinel框架.而以下要介绍的正是作者最近两个月的真实项目实践过程,这中间被不少网络Demo示例级别水文误导过,为了以正视听特将实践过程加以总结,希望能够帮到有类似需要的朋友! 一.Sentinel概述 在基于Spring Cloud构建的微服务体系中,服务之间的调用链路会随着系统的演进变得越来越长,这无疑会增加了整个系统的不可靠因素.在并发流量比较高

Spring Cloud Config 分布式配置中心使用教程

一.简介 在分布式系统中,由于服务数量巨多,为了方便服务配置文件统一管理,实时更新,所以需要分布式配置中心组件.在Spring Cloud中,有分布式配置中心组件spring cloud config ,它支持配置服务放在配置服务的内存中(即本地),也支持放在远程Git仓库中.在spring cloud config 组件中,分两个角色,一是config server,二是config client. 二.构建Config Server 创建一个spring-boot项目,取名为config-s

构建微服务架构Spring Cloud:分布式配置中心

Spring Cloud Config是Spring Cloud团队创建的一个全新项目,用来为分布式系统中的基础设施和微服务应用提供集中化的外部配置支持,它分为服务端与客户端两个部分.其中服务端也称为分布式配置中心,它是一个独立的微服务应用,用来连接配置仓库并为客户端提供获取配置信息.加密/解密信息等访问接口:而客户端则是微服务架构中的各个微服务应用或基础设施,它们通过指定的配置中心来管理应用资源与业务相关的配置内容,并在启动的时候从配置中心获取和加载配置信息.Spring Cloud Conf

Spring Cloud Config分布式配置中心的使用和遇到的坑

分布式配置中心 为什么要有用分布式配置中心这玩意儿?现在这微服务大军已经覆盖了各种大小型企业,每个服务的粒度相对较小,因此系统中会出现大量的服务,每个服务都要有自己都一些配置信息,或者相同的配置信息,可能不同环境每个服务也有单独的一套配置,这种情况配置文件数量比较庞大,维护起来相当费劲,举个栗子: 在开发的过程中,一般数据库是开发环境数据库,所有服务DB的IP配置为:92.168.0.1,突然老大说,开发环境换了,DB的IP要修改,这下可不好受了,所有模块挨个修改DB的配置,就问你难受不难受?

spring cloud互联网分布式微服务云平台规划分析--spring cloud服务统一配置中心

1.介绍鸿鹄云架构[服务统一配置中心]为分布式系统中的外部配置提供服务器和客户端支持.使用commonservice-config,可以在所有环境中管理应用程序的外部属性.应用程序可通过从开发人员到测试和生产的部署流程,可以管理这些环境之间的配置,并确定应用程序具有迁移时需要运行的一切.服务器存储后端的默认实现使用git,因此它轻松支持标签版本的配置环境,以及可以访问用于管理内容的各种工具.很容易添加替代实现,并使用Spring Cloud Bus配置刷新方案.更多资源欢迎球911708498

spring cloud Nacos Config配置中心

概述Nacos 是阿里巴巴开源的一个更易于构建云原生应用的动态服务发现.配置管理和服务管理平台.Nacos Config就是一个类似于SpringCloud Config的配置中心接入SpringCloud项目集成Nacos Config配置中心很简单.只需要部署Nacos 客户端并在里面添加配置即可.然后引入Nacos Config动态读取即可1. 创建一个SpringCloud工程cloud-config 修改 pom.xml 文件,引入 Nacos Config Starter 前提得选引

spring cloud 搭建(配置中心)

创建配置中心: 选择Spring Initializr模板 选择Config Server pom文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schema

Spring Cloud Alibaba nacos 配置中心使用

背景 上一文我们讲到了如何去搭建注册中心,这一次我们讲述如何使用nacos作为注册中心 spring-cloud-alibaba-basis 创建基础依赖 首先我们创建一个spring-cloud-alibaba-basis 基础依赖 工程里面制定我们要用到的公用的版本 spring boot 版本 2.1.7.RELEASE spring cloud 版本 Greenwich.RELEASE spring cloud 阿里巴巴的版本 2.1.0.RELEASE Spring IO Platfo