前言
假设你领导给你安排了一个任务,具体需求如下:
- 针对具体的接口做限流
- 不同接口限流的力度可以不同
- 可以动态调整限流配置,实时生效
如果你接到上面的任务,你会怎么去设计+实现呢?
每个人看待问题的角度不同,自然思考出来的方案也不同,正所谓条条大路通罗马,能到达目的地的路那就是一条好路。
如何分析需求
下面我给出我的实现方式,仅供各位参考,大牛请忽略。
具体问题具体分析,针对需求点,分别去做分析。
需求一 “如何针对具体的接口做限流” 这个在上篇文章中也有讲过,只需要让KeyResolver返回的是接口的URI即可,这样限流的维度那就是对这个接口进行限流。
需求二 “不同接口限流的力度可以不同” 这个通过配置的方式明显实现不了,配置中的replenishRate和burstCapacity都是配置死的,如果要做成动态的那么必须的自己通过扩展RedisRateLimiter来实现。
前提是必须有一个配置列表,这个配置列表就是每个接口对应的限流数值。有了这个配置我们就可以通过请求的接口获取这个接口对应的限流值。
需求三“可以动态调整限流配置,实时生效” 这个的话也比较容易,无论你是存文件,存数据库,存缓存只要每次都去读取,必然是实时生效的,但是性能问题我们不得不考虑啊。
存文件,读取文件,耗IO,主要是不方便修改
存数据库,可以通过web界面去修改,也可以直接改数据库,每次都要查询,性能不行
存分布式缓存(redis),性能比数据库有提高
对比下来肯定是缓存是最优的方案,还有更好的方案吗?
有,结合配置中心来做,我这边用自己的配置中心(https://github.com/yinjihuan/smconf)来讲解,换成其他的配置中心也是一样的思路。
配置中心的优点在于它本来就是用来存储配置的,配置在项目启动时加载完毕,当有修改时推送更新,每次读取都在本地对象中,性能好。
具体方案有了之后我们就可以开始撸代码了,但是你有想过这么多接口的限流值怎么初始化吗?手动一个个去加?
不同的服务维护的小组不同,当然也有可能是一个小组维护,从设计者的角度来思考,应该把设置的权利交给用户,交给我们的接口开发者,每个接口能够承受多少并发让用户来定,你的职责就是在网关进行限流。当然在公司中具体的限制量也不一定会由开发人员来定哈,这个得根据压测结果,做最好的调整。
话不多说-开始撸码
首先我们定义自己的RedisRateLimiter,复制源码稍微改造下即可, 这边只贴核心代码。
public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config>
implements ApplicationContextAware {
public static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";
public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";
public CustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script,
Validator validator) {
super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);
this.redisTemplate = redisTemplate;
this.script = script;
initialized.compareAndSet(false, true);
}
public CustomRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) {
super(Config.class, CONFIGURATION_PROPERTY_NAME, null);
this.defaultConfig = new Config().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);
}
// 限流配置
private RateLimitConf rateLimitConf;
@Override
@SuppressWarnings("unchecked")
public void setApplicationContext(ApplicationContext context) throws BeansException {
** // 加载配置**
this.rateLimitConf = context.getBean(RateLimitConf.class);
}
/**
* This uses a basic token bucket algorithm and relies on the fact that
* Redis scripts execute atomically. No other operations can run between
* fetching the count and writing the new count.
*/
@Override
@SuppressWarnings("unchecked")
public Mono<Response> isAllowed(String routeId, String id) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
//Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);
if (rateLimitConf == null) {
throw new IllegalArgumentException("No Configuration found for route " + routeId);
}
Map<String,Integer> routeConfig = rateLimitConf.getLimitMap();
// Key的格式:服务名称.接口URI.类型
String replenishRateKey = routeId + "." + id + ".replenishRate";
int replenishRate = routeConfig.get(replenishRateKey) == null ? routeConfig.get("default.replenishRate") : routeConfig.get(replenishRateKey);
String burstCapacityKey = routeId + "." + id + ".burstCapacity";
int burstCapacity = routeConfig.get(burstCapacityKey) == null ? routeConfig.get("default.burstCapacity") : routeConfig.get(burstCapacityKey);
try {
List<String> keys = getKeys(id);
// The arguments to the LUA script. time() returns unixtime in
// seconds.
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
Instant.now().getEpochSecond() + "", "1");
// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
// .log("redisratelimiter", Level.FINER);
return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
Response response = new Response(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));
if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});
} catch (Exception e) {
/*
* We don‘t want a hard dependency on Redis to allow traffic. Make
* sure to set an alert so you know if this is happening too much.
* Stripe‘s observed failure rate is 0.01%.
*/
log.error("Error determining if user allowed from redis", e);
}
return Mono.just(new Response(true, getHeaders(replenishRate, burstCapacity, -1L)));
}
public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft) {
HashMap<String, String> headers = new HashMap<>();
headers.put(this.remainingHeader, tokensLeft.toString());
headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
return headers;
}
}
需要在setApplicationContext中加载我们的配置类,配置类的定义如下:
@CxytianDiConf(system="fangjia-gateway")
public class RateLimitConf {
// 限流配置
@ConfField(value = "limitMap")
private Map<String, Integer> limitMap = new HashMap<String, Integer>(){{
put("default.replenishRate", 100);
put("default.burstCapacity", 1000);
}};
public void setLimitMap(Map<String, Integer> limitMap) {
this.limitMap = limitMap;
}
public Map<String, Integer> getLimitMap() {
return limitMap;
}
}
所有的接口对应的限流信息都在map中,有默认值,如果没有对应的配置就用默认的值对接口进行限流。
isAllowed方法中通过‘服务名称.接口URI.类型’组成一个Key, 通过这个Key去Map中获取对应的值。
类型的作用主要是用来区分replenishRate和burstCapacity两个值。
接下来就是配置CustomRedisRateLimiter:
@Beanbr/>@Primary
public CustomRedisRateLimiter customRedisRateLimiter(
ReactiveRedisTemplate<String, String> redisTemplate,
@Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
Validator validator) {
return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);
}
网关这边的逻辑已经实现好了,接下来就是需要在具体的服务中自定义注解,然后将限流的参数初始化到我们的配置中心就可以了。
定义注解
@Target(ElementType.METHOD)br/>@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ApiRateLimit {
/**
* 速率
* @return
*/
int replenishRate() default 100;
/**
* 容积
* @return
*/
int burstCapacity() default 1000;
}
启动监听器,读取注解,初始化配置
/**
- 初始化API网关需要进行并发限制的API
- @author yinjihuan
- */
public class InitGatewayApiLimitRateListener implements ApplicationListener<ApplicationReadyEvent> {// Controller包路径
private String controllerPath;private RateLimitConf rateLimitConf;
private ConfInit confInit;
private String applicationName;
public InitGatewayApiLimitRateListener(String controllerPath) {
this.controllerPath = controllerPath;
}@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
this.rateLimitConf = event.getApplicationContext().getBean(RateLimitConf.class);
this.confInit = event.getApplicationContext().getBean(ConfInit.class);
this.applicationName = event.getApplicationContext().getEnvironment().getProperty("spring.application.name");
try {
initLimitRateAPI();
} catch (Exception e) {
throw new RuntimeException("初始化需要进行并发限制的API异常", e);
}
}/**
- 初始化需要进行并发限制的API
- @throws IOException
- @throws ClassNotFoundException
*/
private void initLimitRateAPI() throws IOException, ClassNotFoundException {
Map<String, Integer> limitMap = rateLimitConf.getLimitMap();
ClasspathPackageScannerUtils scan = new ClasspathPackageScannerUtils(this.controllerPath);
List<String> classList = scan.getFullyQualifiedClassNameList();
for (String clazz : classList) {
Class<?> clz = Class.forName(clazz);
if (!clz.isAnnotationPresent(RestController.class)) {
continue;
}
Method[] methods = clz.getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(ApiRateLimit.class)) {
ApiRateLimit apiRateLimit = method.getAnnotation(ApiRateLimit.class);
String replenishRateKey = applicationName + "." + getApiUri(clz, method) + ".replenishRate";
String burstCapacityKey = applicationName + "." + getApiUri(clz, method) + ".burstCapacity";
limitMap.put(replenishRateKey, apiRateLimit.replenishRate());
limitMap.put(burstCapacityKey, apiRateLimit.burstCapacity());
}
}
}
rateLimitConf.setLimitMap(limitMap);
// 初始化值到配置中心
confInit.init(rateLimitConf);
}private String getApiUri(Class<?> clz, Method method) {
StringBuilder uri = new StringBuilder();
uri.append(clz.getAnnotation(RequestMapping.class).value()[0]);
if (method.isAnnotationPresent(GetMapping.class)) {
uri.append(method.getAnnotation(GetMapping.class).value()[0]);
} else if (method.isAnnotationPresent(PostMapping.class)) {
uri.append(method.getAnnotation(PostMapping.class).value()[0]);
} else if (method.isAnnotationPresent(RequestMapping.class)) {
uri.append(method.getAnnotation(RequestMapping.class).value()[0]);
}
return uri.toString();
}
}
配置监听器
SpringApplication application = new SpringApplication(FshHouseServiceApplication.class);
application.addListeners(new InitGatewayApiLimitRateListener("com.fangjia.fsh.house.controller"));
context = application.run(args);
最后使用就很简单了,只需要增加注解就可以了
@ApiRateLimit(replenishRate=10, burstCapacity=100)br/>@GetMapping("/data")
public HouseInfo getData(@RequestParam("name") String name) {
return new HouseInfo(1L, "上海", "虹口", "东体小区");
}
总结
我这边只是给大家提供一种去实现的思路,也许大家还有更好的方案。
我觉得只要不让每个开发都去关心这种非业务性质的功能,那就可以了,都在框架层面处理掉。当然实现原理可以跟大家分享下,会用很好,既会用又了解原理那就更好了。
原文地址:http://blog.51cto.com/13932491/2162550