服务接口API限流 Rate Limit

一、场景描述

很多做服务接口的人或多或少的遇到这样的场景,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统,便在对服务接口做了许多策略:服务接口降级、限流、引流等。本文讨论下限流策略,虽然降低了服务接口的访问频率和并发量,却换取服务接口和业务应用系统的可用性。

二、常用的限流算法

常用的限流算法由:楼桶算法和令牌桶算法。本文不具体的详细说明两种算法的原理,原理会在接下来的文章中做说明。

1、漏桶算法

漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:

   

可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。

因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率.

2、令牌桶算法

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.

  令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.

三、基于Redis功能的实现

简陋的设计思路:假设一个用户(用IP判断)每分钟访问某一个服务接口的次数不能超过10次,那么我们可以在Redis中创建一个键,并此时我们就设置键的过期时间为60秒,每一个用户对此服务接口的访问就把键值加1,在60秒内当键值增加到10的时候,就禁止访问服务接口。在某种场景中添加访问时间间隔还是很有必要的。

1)使用Redis的incr命令,将计数器作为Lua脚本

1 local current
2 current = redis.call("incr",KEYS[1])
3 if tonumber(current) == 1 then
4     redis.call("expire",KEYS[1],1)
5 end

Lua脚本在Redis中运行,保证了incr和expire两个操作的原子性。

2)使用Reids的列表结构代替incr命令

 1 FUNCTION LIMIT_API_CALL(ip)
 2 current = LLEN(ip)
 3 IF current > 10 THEN
 4     ERROR "too many requests per second"
 5 ELSE
 6     IF EXISTS(ip) == FALSE
 7         MULTI
 8             RPUSH(ip,ip)
 9             EXPIRE(ip,1)
10         EXEC
11     ELSE
12         RPUSHX(ip,ip)
13     END
14     PERFORM_API_CALL()
15 END

Rate Limit使用Redis的列表作为容器,LLEN用于对访问次数的检查,一个事物中包含了RPUSH和EXPIRE两个命令,用于在第一次执行计数是创建列表并设置过期时间,

RPUSHX在后续的计数操作中进行增加操作。

四、基于令牌桶算法的实现

令牌桶算法可以很好的支撑突然额流量的变化即满令牌桶数的峰值。

  1 import java.io.BufferedWriter;
  2 import java.io.FileOutputStream;
  3 import java.io.IOException;
  4 import java.io.OutputStreamWriter;
  5 import java.util.Random;
  6 import java.util.concurrent.ArrayBlockingQueue;
  7 import java.util.concurrent.Executors;
  8 import java.util.concurrent.ScheduledExecutorService;
  9 import java.util.concurrent.TimeUnit;
 10 import java.util.concurrent.locks.ReentrantLock;
 11
 12 import com.google.common.base.Preconditions;
 13 import com.netease.datastream.util.framework.LifeCycle;
 14
 15
 20 public class TokenBucket implements LifeCycle {
 21
 22 // 默认桶大小个数 即最大瞬间流量是64M
 23  private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
 24
 25 // 一个桶的单位是1字节
 26  private int everyTokenSize = 1;
 27
 28 // 瞬间最大流量
 29  private int maxFlowRate;
 30
 31 // 平均流量
 32  private int avgFlowRate;
 33
 34 // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
 35  private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
 36
 37 private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 38
 39 private volatile boolean isStart = false;
 40
 41 private ReentrantLock lock = new ReentrantLock(true);
 42
 43 private static final byte A_CHAR = ‘a‘;
 44
 45 public TokenBucket() {
 46  }
 47
 48 public TokenBucket(int maxFlowRate, int avgFlowRate) {
 49  this.maxFlowRate = maxFlowRate;
 50  this.avgFlowRate = avgFlowRate;
 51  }
 52
 53 public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
 54  this.everyTokenSize = everyTokenSize;
 55  this.maxFlowRate = maxFlowRate;
 56  this.avgFlowRate = avgFlowRate;
 57  }
 58
 59 public void addTokens(Integer tokenNum) {
 60
 61 // 若是桶已经满了,就不再家如新的令牌
 62  for (int i = 0; i < tokenNum; i++) {
 63  tokenQueue.offer(Byte.valueOf(A_CHAR));
 64  }
 65  }
 66
 67 public TokenBucket build() {
 68
 69 start();
 70  return this;
 71  }
 72
 73 /**
 74  * 获取足够的令牌个数
 75  *
 76  * @return
 77  */
 78  public boolean getTokens(byte[] dataSize) {
 79
 80 Preconditions.checkNotNull(dataSize);
 81  Preconditions.checkArgument(isStart, "please invoke start method first !");
 82
 83 int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
 84
 85 final ReentrantLock lock = this.lock;
 86  lock.lock();
 87  try {
 88  boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
 89  if (!result) {
 90  return false;
 91  }
 92
 93 int tokenCount = 0;
 94  for (int i = 0; i < needTokenNum; i++) {
 95  Byte poll = tokenQueue.poll();
 96  if (poll != null) {
 97  tokenCount++;
 98  }
 99  }
100
101 return tokenCount == needTokenNum;
102  } finally {
103  lock.unlock();
104  }
105  }
106
107 @Override
108  public void start() {
109
110 // 初始化桶队列大小
111  if (maxFlowRate != 0) {
112  tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
113  }
114
115 // 初始化令牌生产者
116  TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
117  scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
118  isStart = true;
119
120 }
121
122 @Override
123  public void stop() {
124  isStart = false;
125  scheduledExecutorService.shutdown();
126  }
127
128 @Override
129  public boolean isStarted() {
130  return isStart;
131  }
132
133 class TokenProducer implements Runnable {
134
135 private int avgFlowRate;
136  private TokenBucket tokenBucket;
137
138 public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
139  this.avgFlowRate = avgFlowRate;
140  this.tokenBucket = tokenBucket;
141  }
142
143 @Override
144  public void run() {
145  tokenBucket.addTokens(avgFlowRate);
146  }
147  }
148
149 public static TokenBucket newBuilder() {
150  return new TokenBucket();
151  }
152
153 public TokenBucket everyTokenSize(int everyTokenSize) {
154  this.everyTokenSize = everyTokenSize;
155  return this;
156  }
157
158 public TokenBucket maxFlowRate(int maxFlowRate) {
159  this.maxFlowRate = maxFlowRate;
160  return this;
161  }
162
163 public TokenBucket avgFlowRate(int avgFlowRate) {
164  this.avgFlowRate = avgFlowRate;
165  return this;
166  }
167
168 private String stringCopy(String data, int copyNum) {
169
170 StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
171
172 for (int i = 0; i < copyNum; i++) {
173  sbuilder.append(data);
174  }
175
176 return sbuilder.toString();
177
178 }
179
180 public static void main(String[] args) throws IOException, InterruptedException {
181
182 tokenTest();
183  }
184
185 private static void arrayTest() {
186  ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
187  tokenQueue.offer(1);
188  tokenQueue.offer(1);
189  tokenQueue.offer(1);
190  System.out.println(tokenQueue.size());
191  System.out.println(tokenQueue.remainingCapacity());
192  }
193
194 private static void tokenTest() throws InterruptedException, IOException {
195  TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
196
197 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
198  String data = "xxxx";// 四个字节
199  for (int i = 1; i <= 1000; i++) {
200
201 Random random = new Random();
202  int i1 = random.nextInt(100);
203  boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
204  TimeUnit.MILLISECONDS.sleep(100);
205  if (tokens) {
206  bufferedWriter.write("token pass --- index:" + i1);
207  System.out.println("token pass --- index:" + i1);
208  } else {
209  bufferedWriter.write("token rejuect --- index" + i1);
210  System.out.println("token rejuect --- index" + i1);
211  }
212
213 bufferedWriter.newLine();
214  bufferedWriter.flush();
215  }
216
217 bufferedWriter.close();
218  }
219
220 }

参考:

http://xiaobaoqiu.github.io/blog/2015/07/02/ratelimiter/

http://redisdoc.com/string/incr.html

http://www.cnblogs.com/zhengyun_ustc/archive/2012/11/17/topic1.html

时间: 2024-08-02 14:52:03

服务接口API限流 Rate Limit的相关文章

Spring Cloud微服务Sentinel+Apollo限流、熔断实战

在Spring Cloud微服务体系中,由于限流熔断组件Hystrix开源版本不在维护,因此国内不少有类似需求的公司已经将眼光转向阿里开源的Sentinel框架.而以下要介绍的正是作者最近两个月的真实项目实践过程,这中间被不少网络Demo示例级别水文误导过,为了以正视听特将实践过程加以总结,希望能够帮到有类似需要的朋友! 一.Sentinel概述 在基于Spring Cloud构建的微服务体系中,服务之间的调用链路会随着系统的演进变得越来越长,这无疑会增加了整个系统的不可靠因素.在并发流量比较高

数据结构与算法简记--剖析微服务接口鉴权限流背后的数据结构和算法

微服务鉴权限流剖析 微服务 把复杂的大应用,解耦拆分成几个小的应用. 有利于团队组织架构的拆分,毕竟团队越大协作的难度越大: 每个应用都可以独立运维,独立扩容,独立上线,各个应用之间互不影响. 有利就有弊: 大应用拆分成微服务之后,服务之间的调用关系变得更复杂,平台的整体复杂熵升高,出错的概率.debug 问题的难度都高了好几个数量级. 为了解决这些问题,服务治理便成了微服务的一个技术重点. 服务治理 简单点讲,就是管理微服务,保证平台整体正常.平稳地运行. 涉及的内容:鉴权.限流.降级.熔断.

微服务网关常用限流算法

常用算法有三种:计数器算法.漏斗桶算法.令牌桶算法,市面上最常用的是最后一个 第一个:计数器算法  他维护的是单位时间内的最大请求量,因此极端情况可能造成服务抖动  第二个:漏斗桶算法,这种算法保护了后端的微服务,但是会可能造成微服务网关压力激增 第三种:令牌桶算法 令牌桶算法相对于漏斗桶算法,其实就是少了一个输出速率的设置,他与漏斗桶算法相比,主要是为了保护网关自己,由于网关在实际的应用场景中会显得非常关键,因此大部分的限流算法都会选择令牌桶算法 原文地址:https://www.cnblog

「技术干货」Pontus-用友云限流服务

在我们讨论系统稳定性的时候,其实核心的关键词就是容量规划,如何做好业务容量与系统性能的评估,是保障系统稳定性的关键.对于系统性能的评估,需要我们具备自动化工具来对系统进行性能压测,探测系统在实际业务场景下的基线数据,这是我们进行系统资源配置的基础,也是在应对流量增长时进行弹性扩容的依据.在我们做好容量规划的前提下,在实际业务场景下,我们还是不可避免的会面对不确定的系统压力,在面对突发不确定流量的情况下,我们最担心的就是系统的"雪崩".就像突然爆发的车流让道路交通瘫痪一样,我们的系统在突

【Dnc.Api.Throttle】适用于.Net Core WebApi接口限流框架

Dnc.Api.Throttle    适用于Dot Net Core的WebApi接口限流框架 使用Dnc.Api.Throttle可以使您轻松实现WebApi接口的限流管理.Dnc.Api.Throttle支持IP.用户身份.Request Header.Request QueryString等多种限流策略,支持黑名单和白名单功能,支持全局拦截和单独Api拦截. Dnc.Api.Throttle暂时只支持Redis作为缓存和存储库,后续会进行扩展. 开始使用 安装Dnc.Api.Thrott

.Net微服务实践(四)[网关]:Ocelot限流熔断、缓存以及负载均衡

目录 限流 熔断 缓存 Header转化 HTTP方法转换 负载均衡 注入/重写中间件 后台管理 最后 在上篇.Net微服务实践(三)[网关]:Ocelot配置路由和请求聚合中我们介绍了Ocelot的配置,主要特性路由以及服务聚合.接下来,我们会介绍Ocelot的限流.熔断.缓存以及负载均衡. 限流 我们先来看限流的配置 Reroute节点中的配置如下: { "DownstreamPathTemplate": "/api/orders", "Downstr

限流(二)接口限流

如果某个接口可能出现突发情况,比如"秒杀"活动,那么很有可能因为突然爆发的访问量造成系统奔溃,我们需要最这样的接口进行限流. 在上一篇"限流算法"中,我们简单提到了两种限流方式: 1)(令牌桶.漏桶算法)限速率,例如:每 5r/1s = 1r/200ms 即一个请求以200毫秒的速率来执行: 2)(计数器方式)限制总数.或者单位时间内的总数,例如:设定总并发数的阀值,单位时间总并发数的阀值. 一.限制总并发数 我们可以采用java提供的atomicLong类来实现

Sentinel如何通过限流实现服务的高可用性

摘要: 在复杂的生产环境下可能部署着成千上万的服务实例,当流量持续不断地涌入,服务之间相互调用频率陡增时,会产生系统负载过高.网络延迟等一系列问题,从而导致某些服务不可用.如果不进行相应的流量控制,可能会导致级联故障,并影响到服务的可用性,因此如何对高流量进行合理控制,成为保障服务稳定性的关键. 在复杂的生产环境下可能部署着成千上万的服务实例,当流量持续不断地涌入,服务之间相互调用频率陡增时,会产生系统负载过高.网络延迟等一系列问题,从而导致某些服务不可用.如果不进行相应的流量控制,可能会导致级

Guava的RateLimiter实现接口限流

最近开发需求中有需要对后台接口进行限流处理,整理了一下基本使用方法. 首先添加guava依赖: <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> 然后封装RateLimiter适用对多接口的限制: import com.goog