coding++:Semaphore—RateLimiter-漏桶算法-令牌桶算法

java中对于生产者消费者模型,或者小米手机营销 1分钟卖多少台手机等都存在限流的思想在里面。

关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)

Semaphore:从线程个数限流

RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法

令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌

漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据

应用场景:

漏桶算法:必须读写分流的情况下,限制读取的速度

令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000实现的方法都是一样。

RateLimiter来实现对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题

1、关于RateLimter和Semphore简单用法

package concurrent;

import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.*;
import java.util.stream.IntStream;

import static java.lang.Thread.currentThread;

/**
 * ${DESCRIPTION}
 * 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)
 * Semaphore:从线程个数限流
 * RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法,下面会具体介绍
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-15 22:44
 **/
public class RateLimiterExample {

   //Guava  0.5的意思是 1秒中0.5次的操作,2秒1次的操作  从速度来限流,从每秒中能够执行的次数来
    private final static RateLimiter limiter=RateLimiter.create(0.5d);

    //同时只能有三个线程工作 Java1.5  从同时处理的线程个数来限流
    private final static Semaphore sem=new Semaphore(3);
    private static void testSemaphore(){
        try {
            sem.acquire();
            System.out.println(currentThread().getName()+" is doing work...");
            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            sem.release();
            System.out.println(currentThread().getName()+" release the semephore..other thread can get and do job");
        }
    }

    public static void runTestSemaphore(){
        ExecutorService service = Executors.newFixedThreadPool(10);
        IntStream.range(0,10).forEach((i)->{
            //RateLimiterExample::testLimiter 这种写法是创建一个线程
            service.submit(RateLimiterExample::testSemaphore);
        });
    }

    /**
     * Guava的RateLimiter
     */
    private static void testLimiter(){
        System.out.println(currentThread().getName()+" waiting  " +limiter.acquire());
    }

    //Guava的RateLimiter
    public static void runTestLimiter(){
        ExecutorService service = Executors.newFixedThreadPool(10);
        IntStream.range(0,10).forEach((i)->{
            //RateLimiterExample::testLimiter 这种写法是创建一个线程
            service.submit(RateLimiterExample::testLimiter);
        });
    }

    public static void main(String[] args) {
        IntStream.range(0,10).forEach((a)-> System.out.println(a));//从0-9
        //runTestLimiter();
        runTestSemaphore();
    }
}

2、实现漏桶算法

package concurrent.BucketAl;

import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static java.lang.Thread.currentThread;

/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-20 22:42
 * 实现漏桶算法 实现多线程生产者消费者模型 限流
 **/
public class Bucket {
    //定义桶的大小
    private final ConcurrentLinkedQueue<Integer> container=new ConcurrentLinkedQueue<>();

    private final static int  BUCKET_LIMIT=1000;

    //消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次
    private final RateLimiter consumerRate=RateLimiter.create(10d);

    //往桶里面放数据时,确认没有超过桶的最大的容量
    private Monitor offerMonitor=new Monitor();

    //从桶里消费数据时,桶里必须存在数据
    private Monitor consumerMonitor=new Monitor();

    /**
     * 往桶里面写数据
     * @param data
     */
    public void submit(Integer data){
        if (offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){
            try {
                container.offer(data);
                System.out.println(currentThread()+" submit.."+data+" container size is :["+container.size()+"]");
            } finally {
                offerMonitor.leave();
            }
        }else {
            //这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常
            //或者存入MQ DB等后续处理
            throw new IllegalStateException(currentThread().getName()+"The bucket is ful..Pls latter can try...");
        }
    }

    /**
     * 从桶里面消费数据
     * @param consumer
     */
    public void takeThenConsumer(Consumer<Integer> consumer){
        if (consumerMonitor.enterIf(consumerMonitor.newGuard(()->!container.isEmpty()))){
            try {
                //不打印时 写 consumerRate.acquire();
                System.out.println(currentThread()+"  waiting"+consumerRate.acquire());
                Integer data = container.poll();
                //container.peek() 只是去取出来不会删掉
                consumer.accept(data);
            }finally {
                consumerMonitor.leave();
            }
        }else {
            //当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据
            System.out.println("will consumer Data from MQ...");
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

2.1 漏桶算法测试类

package concurrent.BucketAl;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static java.lang.Thread.currentThread;

/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-20 23:11
 * 漏桶算法测试
 * 实现漏桶算法 实现多线程生产者消费者模型 限流
 **/
public class BuckerTest {

    public static void main(String[] args) {
        final Bucket bucket = new Bucket();
        final AtomicInteger DATA_CREATOR = new AtomicInteger(0);

        //生产线程 10个线程 每秒提交 50个数据  1/0.2s*10=50个
        IntStream.range(0, 10).forEach(i -> {
            new Thread(() -> {
                for (; ; ) {
                    int data = DATA_CREATOR.incrementAndGet();
                    try {
                        bucket.submit(data);
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (Exception e) {
                        //对submit时,如果桶满了可能会抛出异常
                        if (e instanceof IllegalStateException) {
                            System.out.println(e.getMessage());
                            //当满了后,生产线程就休眠1分钟
                            try {
                                TimeUnit.SECONDS.sleep(60);
                            } catch (InterruptedException e1) {
                                e1.printStackTrace();
                            }
                        }
                    }
                }
            }).start();
        });

        //消费线程  采用RateLimiter每秒处理10个  综合的比率是5:1
        IntStream.range(0, 10).forEach(i -> {
            new Thread(
                    () -> {
                        for (; ; ) {
                            bucket.takeThenConsumer(x -> {
                                System.out.println(currentThread()+"C.." + x);
                            });
                        }
                    }
            ).start();
        });

    }
}

3、令牌桶算法

package concurrent.TokenBucket;

import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Thread.currentThread;
import static java.lang.Thread.interrupted;

/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-21 0:18
 * 令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌
 * 漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据
 *
 * 应用场景:
 * 漏桶算法:必须读写分流的情况下,限制读取的速度
 * 令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000
 *
 * 实现的方法都是一样。RateLimiter来实现
 * 对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题
 **/
public class TokenBuck {

    //可以使用 AtomicInteger+容量  可以不用Queue实现
   private AtomicInteger phoneNumbers=new AtomicInteger(0);
   private RateLimiter rateLimiter=RateLimiter.create(20d);//一秒只能执行五次
   //默认销售500台
   private final static int DEFALUT_LIMIT=500;
   private final int saleLimit;

    public TokenBuck(int saleLimit) {
        this.saleLimit = saleLimit;
    }

    public TokenBuck() {
        this(DEFALUT_LIMIT);
    }

    public int buy(){
        //这个check 必须放在success里面做判断,不然会产生线程安全问题(业务引起)
        //原因当phoneNumbers=99 时 同时存在三个线程进来。虽然phoneNumbers原子性,但是也会发生。如果必须写在这里,在success
        //里面也需要加上double check
       /* if (phoneNumbers.get()>=saleLimit){
            throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...")
        }*/

        //目前设置超时时间,10秒内没有抢到就抛出异常
        //这里的TimeOut*Ratelimiter=总数  这里的超时就是让别人抢几秒,所以设置总数也可以由这里的超时和RateLimiter来计算
         boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS);
         if (success){
             if (phoneNumbers.get()>=saleLimit){
                 throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...");
             }
             int phoneNo = phoneNumbers.getAndIncrement();
             System.out.println(currentThread()+" user has get :["+phoneNo+"]");
             return phoneNo;
         }else {
             //超时后 同一时间,很大的流量来强时,超时快速失败。
             throw new RuntimeException(currentThread()+"has timeOut can try again...");
         }

    }
}

3.1、令牌桶算法的测试类

package concurrent.TokenBucket;

import java.util.stream.IntStream;

/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-21 0:40
 **/
public class TokenBuckTest {
    public static void main(String[] args) {
        final TokenBuck tokenBuck=new TokenBuck(200);

        IntStream.range(0,300).forEach(i->{
            //目前测试时,让一个线程抢一次,不用循环抢
            //tokenBuck::buy 这种方式 产生一个Runnable
            new Thread(tokenBuck::buy).start();
        });
    }
}

原文地址:https://www.cnblogs.com/codingmode/p/11872771.html

时间: 2024-08-08 19:35:53

coding++:Semaphore—RateLimiter-漏桶算法-令牌桶算法的相关文章

理解流量监管和整形的关键算法—令牌桶

理解流量监管和整形的关键算法-令牌桶 无论是流量监管还是流量整形都提到一个超额流量的问题,而前面已经描述了监管和整形对超额流量的处理方式不同,监管丢弃或者重标记,流量整形是缓存,通过加大延迟的方式发送平滑的数据流量,那么网络设备怎么去确定这个超额流量,难道链路的带宽为512K,而此时用户以每秒768KB/s发送数据,使用768-512就256KB,难道超额的流量就是256KB吗?不是的,这样做是一种错误的理解,要确定用户的超额流量必须使用如下两种算法中的一种来确定,一种叫漏桶算法(leaky b

限流算法之漏桶算法、令牌桶算法

昨天CodeReview的时候看到同时使用RateLimiter这个类用作QPS访问限制.学习一下这个类. RateLimiter是Guava的concurrent包下的一个用于限制访问频率的类. 1.限流 每个API接口都是有访问上限的,当访问频率或者并发量超过其承受范围时候,我们就必须考虑限流来保证接口的可用性或者降级可用性.即接口也需要安装上保险丝,以防止非预期的请求对系统压力过大而引起的系统瘫痪. 通常的策略就是拒绝多余的访问,或者让多余的访问排队等待服务,或者引流. 如果要准确的控制Q

令牌桶-流量控制

作为后台服务,通常有一个处理极限PPS(packets per second),如果请求超过了这个处理能力,可能会出现“雪崩效应”,因此后台服务需要有过载保护机制. 1.有个简单的算法可以实现流量控制功能:设置一个单位时间(如1s, 1min)内的最大访问量,并维护一个单位时间里的计数器. 当访问请求到达时,先判断单位控制时间是否已经超时,如果已经超时,重置计数器为0; 否则,将计数器加1,并判断计数器的值是否超过最大访问量设置,如超过,则拒绝访问. 伪代码如下: 1 long timeStam

coding++:高并发解决方案限流技术-使用RateLimiter实现令牌桶限流-Demo

RateLimiter是guava提供的基于令牌桶算法的实现类,可以非常简单的完成限流特技,并且根据系统的实际情况来调整生成token的速率. 通常可应用于抢购限流防止冲垮系统:限制某接口.服务单位时间内的访问量,譬如一些第三方服务会对用户访问量进行限制:限制网速,单位时间内只允许上传下载多少字节等. guava的maven依赖 <dependency> <groupId>com.google.guava</groupId> <artifactId>guav

令牌桶算法限流

令牌桶算法最初来源于计算机网络.在网络传输数据时,为了防止网络拥塞,需限制流出网络的流量,使流量以比较均匀的速度向外发送.令牌桶算法就实现了这个功能,可控制发送到网络上数据的数目,并允许突发数据的发送. 1.https://blog.csdn.net/sunnyyoona/article/details/51228456 2.https://github.com/yangwenmai/ratelimit 3.https://juejin.im/post/5ab10045518825557005d

高并发学习之使用RateLimiter实现令牌桶限流

RateLimiter是guava提供的基于令牌桶算法的实现类,可以非常简单的完成限流特技,并且根据系统的实际情况来调整生成token的速率.通常可应用于抢购限流防止冲垮系统:限制某接口.服务单位时间内的访问量,譬如一些第三方服务会对用户访问量进行限制:限制网速,单位时间内只允许上传下载多少字节等. guava的maven依赖 <dependency> <groupId>com.google.guava</groupId> <artifactId>guava

Guava-RateLimiter实现令牌桶控制接口限流方案

一.前言 限流的目的是通过对并发/一个时间窗口内的请求进行限速来达到保护系统的效果,一旦达到限制速率则可以拒绝服务.排队或等待.降级等处理. 二.常见限流方案   原理 优点 缺点 计数器法 在单位时间段内,对请求数进行计数,如果数量超过了单位时间的限制,则执行限流策略,当单位时间结束后,计数器清零,这个过程周而复始,就是计数器法. null 不能均衡限流,在一个单位时间的末尾和下一个单位时间的开始,很可能会有两个访问的峰值,导致系统崩溃.   漏桶算法                     

令牌桶过滤器(TBF)

令牌桶过滤器(TBF)是一个简单的队列规定,只允许不超过事先设定的速率到来的数据包通过,但可能允许短暂突发流量超过设定值. TBF很精确,对于网络和处理器的影响都比较小.如果对一个网卡限速,它应该成为第一选择. TBF的实现在于一个缓冲器(桶),不断地被一些叫做"令牌"的虚拟数据以特定速率填充(token rate).同最重要的参数就是它的大小,也就是它能够存储令牌的数量. 每个到来的令牌从数据队列中收集一个数据包,然后从桶中被删除.这个算法关联到两个流上----令牌流和数据流.于是由

Golang令牌桶-频率限制

令牌桶算法 令牌桶算法一般用做频率限制.流量限制等,可能具体有单速双色.单速三色.双速三色等方法. 我们的具体需求是对API的调用的频率做限制,因此实现的是单速双色. package main import ( "errors" "fmt" "strconv" "sync" "time" ) const TOKEN_GRANULARITY = 1000 type MAP struct { lock sync