任何应用都有一个设计指标,当应用的压力超过了他设计所能承载的能力时,就好比一座只允许行人通过的独木桥,是无法承载一辆坦克的重量的,这个时候,为了让机器能够继续运行,在不宕机的情况下尽其所能的对一部分用户提供服务,保证整个流程能够继续走下去,这个时候,就必须对应用进行流控,丢弃一部分用户的请无法避免。
流控可以从多个维度来进行,比如针对QPS,并发线程数,黑白名单,加权分级等等,最典型最直接的便是针对QPS和并发线程数的流控。当然,要进行流控,首先等有一个流控的阀值,这个阀值不是说拍拍脑袋就能够想出来,不同类型的应用,所面临的情况不一样,也没有一个统一的衡量标准,必须经过多轮的压力测试,才能够得出一个比较靠谱的数值。
一、简单的流控
1、使用Semphore进行并发流控
模拟代码如下所示:
Semaphore semphore = new Semaphore(10); if(semphore.getQueueLength() > 10){ //等待队列阀值为10时 return; } try { semphore.acquire(); //干活 } catch (InterruptedException e) { e.printStackTrace(); }finally{ semphore.release();//释放 }
也可以参见:http://ifeve.com/concurrency-practice-1/
2、使用乐观锁加上下文切换进行流控
public void enter(Object obj){ boolean isUpdate = false; int countValue = count.get(); if(countValue > 0){ isUpdate = count.compareAndSet(countValue, countValue -1); if(isUpdate)return; } concurQueue.add(obj); try { obj.wait(); } catch (InterruptedException e) { logger.error("flowcontrol thread was interrupted .......",e); } return ; } public void release(){ synchronized(count){ if(count.get() < VALVE){ count.set(count.get() + 1); } } Object obj = concurQueue.remove(); if(obj != null){ synchronized (obj) { obj.notify(); } } System.out.println("notify ..............."); return ; }
具体采用信号量还是使用上下文切换形式,需要根据临界代码段执行的时间而定
当请求进来时,调用配置的concurrentlock的enter方法,判断是否达到阀值,如果没有达到阀值,则进入,进行处理, 处理完后计数器加1,如果已经达到阀值则放入等待队列,因为等待队列是消耗内存的,因此等待队列也必须有阀值,如果队列超过阀值,请求直接丢弃
二、漏斗算法和桶令牌算法
利用现存的算法,比如:漏斗算法和桶令牌算法进行流量的控制。
关于漏斗算法和桶令牌算法可以参考:http://www.inter12.org/archives/962
https://blog.jamespan.me/2015/10/19/traffic-shaping-with-token-bucket/
1、漏桶算法(Leaky bucket)
漏桶算法强制一个常量的输出速率而不管输入数据流的突发性,当输入空闲时,该算法不执行任何动作.就像用一个底部开了个洞的漏桶接水一样,水进入到漏桶里,桶里的水通过下面的孔以固定的速率流出,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率.如下图所示:
2、令牌桶(Token bucket)
令牌桶算法的基本过程如下:
- 每秒会有 r 个令牌放入桶中,或者说,每过 1/r 秒桶中增加一个令牌
- 桶中最多存放 b 个令牌,如果桶满了,新放入的令牌会被丢弃
- 当一个 n 字节的数据包到达时,消耗 n 个令牌,然后发送该数据包
- 如果桶中可用令牌小于 n,则该数据包将被缓存或丢弃
漏桶和令牌桶比较
“漏桶算法”能够强行限制数据的传输速率,而“令牌桶算法”在能够限制数据的平均传输数据外,还允许某种程度的突发传输。在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允许突发地传输数据直到达到用户配置的上限,因此它适合于具有突发特性的流量。