dubbo是如何控制并发数和限流的?

ExecuteLimitFilter

ExecuteLimitFilter ,在服务提供者,通过 的 "executes" 统一配置项开启:
表示每服务的每方法最大可并行执行请求数。

ExecuteLimitFilter是通过信号量来实现的对服务端的并发数的控制。

ExecuteLimitFilter执行流程:

  1. 首先会去获得服务提供者每服务每方法最大可并行执行请求数
  2. 如果每服务每方法最大可并行执行请求数大于零,那么就基于基于服务 URL + 方法维度获取一个RpcStatus实例
  3. 通过RpcStatus实例获取一个信号量,若果获取的这个信号量调用tryAcquire返回false,则抛出异常
  4. 如果没有抛异常,那么久调用RpcStatus静态方法beginCount,给这个 URL + 方法维度开始计数
  5. 调用服务
  6. 调用结束后计数调用RpcStatus静态方法endCount,计数结束
  7. 释放信号量

ExecuteLimitFilter

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
//            if (count.getActive() >= max) {
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
            executesLimit = count.getSemaphore(max);
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }

我们接下来看看RpcStatus这个类

    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();

    public static RpcStatus getStatus(URL url, String methodName) {
        String uri = url.toIdentityString();
        ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
        if (map == null) {
            METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
            map = METHOD_STATISTICS.get(uri);
        }
        RpcStatus status = map.get(methodName);
        if (status == null) {
            map.putIfAbsent(methodName, new RpcStatus());
            status = map.get(methodName);
        }
        return status;
    }

这个方法很简单,大概就是给RpcStatus这个类里面的静态属性METHOD_STATISTICS里面设值。外层的map是以url为key,里层的map是以方法名为key。

    private volatile int executesPermits;
    public Semaphore getSemaphore(int maxThreadNum) {
        if(maxThreadNum <= 0) {
            return null;
        }

        if (executesLimit == null || executesPermits != maxThreadNum) {
            synchronized (this) {
                if (executesLimit == null || executesPermits != maxThreadNum) {
                    executesLimit = new Semaphore(maxThreadNum);
                    executesPermits = maxThreadNum;
                }
            }
        }

        return executesLimit;
    }

这个方法是获取信号量,如果这个实例里面的信号量是空的,那么就添加一个,如果不是空的就返回。

TPSLimiter

TpsLimitFilter 过滤器,用于服务提供者中,提供限流的功能。

配置方式:

  1. 通过 配置项,添加到 或 或 中开启,例如:
dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" >
    <dubbo:parameter key="tps" value="100" />
</dubbo:service>
  1. 通过 配置项,设置 TPS 周期。

源码分析

TpsLimitFilter

    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
            throw new RpcException(
                    "Failed to invoke service " +
                            invoker.getInterface().getName() +
                            "." +
                            invocation.getMethodName() +
                            " because exceed max service tps.");
        }

        return invoker.invoke(invocation);
    }

invoke方法调用了DefaultTPSLimiter的isAllowable,我们进入到isAllowable方法看一下

DefaultTPSLimiter

    private final ConcurrentMap<String, StatItem> stats
            = new ConcurrentHashMap<String, StatItem>();
    @Override
    public boolean isAllowable(URL url, Invocation invocation) {
        //获取tps这个参数设置的大小
        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
        //获取tps.interval这个参数设置的大小,默认60秒
        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                Constants.DEFAULT_TPS_LIMIT_INTERVAL);
        String serviceKey = url.getServiceKey();
        if (rate > 0) {
            StatItem statItem = stats.get(serviceKey);
            if (statItem == null) {
                stats.putIfAbsent(serviceKey,
                        new StatItem(serviceKey, rate, interval));
                statItem = stats.get(serviceKey);
            }
            return statItem.isAllowable();
        } else {
            StatItem statItem = stats.get(serviceKey);
            if (statItem != null) {
                stats.remove(serviceKey);
            }
        }

        return true;
    }

若要限流,调用 StatItem#isAllowable(url, invocation) 方法,根据 TPS 限流规则判断是否限制此次调用。

StatItem

    private long lastResetTime;

    private long interval;

    private AtomicInteger token;

    private int rate;

    public boolean isAllowable() {
        long now = System.currentTimeMillis();
         // 若到达下一个周期,恢复可用种子数,设置最后重置时间。
        if (now > lastResetTime + interval) {
            token.set(rate);// 回复可用种子数
            lastResetTime = now;// 最后重置时间
        }
        // CAS ,直到或得到一个种子,或者没有足够种子
        int value = token.get();
        boolean flag = false;
        while (value > 0 && !flag) {
            flag = token.compareAndSet(value, value - 1);
            value = token.get();
        }

        return flag;
    }

原文地址:https://www.cnblogs.com/luozhiyun/p/10960593.html

时间: 2024-12-25 14:41:58

dubbo是如何控制并发数和限流的?的相关文章

WCF之并发,吞吐量和限流

并发 Single重入模式.对于每一个服务实例,同一时刻只能处理一个请求,其他对该实例的请求被排队. PerCall,每一线程会分配一个新的服务实例上.不会有并发性问题.不影响吞吐量. PerSession,保护服务实例不会受到多线程客户端的影响.多客户端可以并发访问,无并发性问题.减少单一客户端吞吐量. Singleton, 实例不受任何并发访问影响.多线程和多客户端都无法并发访问.吞吐量最小. Retrant重入模式. 客户端可以对同一服务实例,可以进行第2次或者多次的请求进入. 单线程,用

spring控制并发数的工具类ConcurrencyThrottleSupport和ConcurrencyThrottleInterceptor

在ConcurrencyThrottleSupport类中,简单的通过synchronized和wati and notify达到控制线程数量的效果,从而实现限流的策略. 一.类图 二.主要方法 先看ConcurrencyThrottleInterceptor.java类的源码: 看该拦截器中的invoke()方法中,在执行目标方法的前后分别执行beforeAccess()和 afterAccess()方法, 在beforeAccess方法中通过内部计数器concurrencyCount来对比设

谈谈高并发系统的限流

开涛大神在博客中说过:在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.本文结合作者的一些经验介绍限流的相关概念.算法和常规的实现方式. 缓存 缓存比较好理解,在大型高并发系统中,如果没有缓存数据库将分分钟被爆,系统也会瞬间瘫痪.使用缓存不单单能够提升系统访问速度.提高并发访问量,也是保护数据库.保护系统的有效方式.大型网站一般主要是"读",缓存的使用很容易被想到.在大型"写"系统中,缓存也常常扮演者非常重要的角色.比如累积一些数据批量写入,内存里面的缓存

聊聊高并发系统之限流特技-1 开涛

在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹:而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开:而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀.抢购).写服务(如评论.下单).频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流. 限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到

高并发系统之限流特技

在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹:而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开:而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀.抢购).写服务(如评论.下单).频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流. 限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到

聊聊高并发系统之限流特技

在开发高并发系统时,有很多手段用来保护系统:缓存.降级和限流.缓存的目的是提升系统访问速度和增大系统处理能力,可谓是抗高并发流量的银弹:而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰过去或者问题解决后再打开的场景:而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀.抢购).写服务(如评论.下单).频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流. 限流的目的是通过对并发访问进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达

Linux-Shell-使用mkfifo实现多任务并发及并发数控制

默认的情况下,Shell脚本中的命令是串行执行的,必须等到前一条命令执行完后才执行接下来的命令,但是如果我有一大批的的命令需要执行,而且互相又没有影响的情况下(有影响的话就比较复杂了),那么就要使用命令的并发执行了. 如下: #!/bin/bash IPLIST=/home/meta/ipinfo/iplist for i in $(cat ${IPLIST} |grep -viE "^#|备机|ts"|awk '{print $1}') do ssh $i "cd ~/up

【nodejs爬虫】使用async控制并发写一个小说爬虫

最近在做一个书城项目,数据用爬虫爬取,百度了一下找到这个网站,以择天记这本小说为例. 爬虫用到了几个模块,cheerio,superagent,async. superagent是一个http请求模块,详情可参考链接. cheerio是一个有着jQuery类似语法的文档解析模块,你可以简单理解为nodejs中的jQuery. async是一个异步流程控制模块,在这里我们主要用到async的mapLimit(coll, limit, iteratee, callback) async.mapLim

「技术干货」Pontus-用友云限流服务

在我们讨论系统稳定性的时候,其实核心的关键词就是容量规划,如何做好业务容量与系统性能的评估,是保障系统稳定性的关键.对于系统性能的评估,需要我们具备自动化工具来对系统进行性能压测,探测系统在实际业务场景下的基线数据,这是我们进行系统资源配置的基础,也是在应对流量增长时进行弹性扩容的依据.在我们做好容量规划的前提下,在实际业务场景下,我们还是不可避免的会面对不确定的系统压力,在面对突发不确定流量的情况下,我们最担心的就是系统的"雪崩".就像突然爆发的车流让道路交通瘫痪一样,我们的系统在突