RabbitMQ消费端限流策略(十)

消费端限流

什么是消费端限流

场景

 我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:

 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据。(导致服务器崩溃,线上故障)

 生产端一次推送几百条数据库,客户端只接收一两条,在高并发的情况下,不能再生产端做限流,只能在消费端处理。

解决方法

 RabbitMQ提供了一种qos(服务质量保证)功能,在非自动确认消息的前提下,

 如果一定数据的消息(通过基于consumer或者channel设置qos的值)未被确认前,不进行消费新的消息。减压减负  

 void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);

   消费端体现,一次最多能处理多少条消息(基本上为1),限流策略在什么上应用(channel--true,consumer---false)

   prefetchSize:0

   prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多余n个消息,

           一旦有n个消息还没有ack,则该consumer将block调,知道有消息ack

    global:true\false是否将上面设置应用于channel,简单的说就是上面限制是channel

            级别的还是consumer级别,基本使用false。

注意:prefetchSize和global这两项,rabbitmq没有实现,暂不研究

   prefetch_count在no_ack=false的情况下生效,在自动应答的情况下两个值不生效。

        //生产端代码
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";

        String msg = "Hello RabbitMQ QOS Message";

        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        //消费端代码
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        //1 限流方式  第一件事就是 autoAck设置为 false
        //接收1条消息,
        channel.basicQos(0, 1, false);

        channel.basicConsume(queueName, false, new MyConsumer(channel));
        //自定义消息端
        private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);     //接收ack进行消息发送
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        //false不支持批量签收
        channel.basicAck(envelope.getDeliveryTag(), false);

    }

原文地址:https://www.cnblogs.com/luhan777/p/11193148.html

时间: 2024-09-30 17:45:52

RabbitMQ消费端限流策略(十)的相关文章

消费端限流策略

使用场景 首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现如下情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据! Rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consumer或者channel设置qos的值)未被确认前,不进行消费新的消息. 具体方法 void BasicQos(unit prefetchSize, ushort prefetchCount,

高并发场景下的限流策略

高并发场景下的限流策略: 在开发高并发系统时,有很多手段来保护系统:缓存.降级.限流. 当访问量快速增长.服务可能会出现一些问题(响应超时),或者会存在非核心服务影响到核心流程的性能时, 仍然需要保证服务的可用性,即便是有损服务.所以意味着我们在设计服务的时候,需要一些手段或者关键数据进行自动降级,或者配置人工降级的开关. 缓存的目的是提升系统访问速度和增大系统处理的容量,可以说是抗高并发流量的银弹:降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉某些功能,等高峰或者问题解决后再打开:

高并发限流策略

在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹:而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开:而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀.抢购).写服务(如评论.下单).频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流. 限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到

RabbitMQ消费端自定义监听(九)

场景: 我们一般在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理. 实际环境: 我们使用自定义的Consumer更加的方便,解耦性更强,也在实际工作中最常用. 操作: //生产端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory

rabbitmq消费端加入精确控频。

控制频率之前用的是线程池的数量来控制,很难控制.因为做一键事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底多少线程并发才刚好不超过指定的频率. 现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务. 与celery相比 在推送任务方面比celery的delay要快,推送的任务小. 使用更简单,没那么花哨给函数加装饰器来注册函数路由. 可以满足生产了. 比之前的 使用redis原生list结构作为消息队列取代celery框架. 更好,主要是rabbitmq有消费

RabbitMQ消费端自定义监听器DefaultConsumer

消费者 package com.flying.rabbitmq.api.consumer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 自定义消费者类型 */ public class Consumer { public static void main(String[] args) th

rabbitmq消费端的nack和重回队列的总结

重回队列模式,是当投递消息失败时,让该消息重新回到队列的模式,该模式需要手动签收,并需要在消费者中进行判断,调用重回队列的确认模式 消费者 package com.flying.rabbitmq.api.ack; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Consumer

RabbitMQ消费端消息的获取方式(.Net Core)

1[短链接]:BasicGet(String queue, Boolean autoAck) 通过request的方式独自去获取消息,断开式,一次次获取,如果返回null,则说明队列中没有消息. 隐患:每次获取消息都会创建channel. 优点:最安全的获取方式且性能不算太差. 2[长链接]: 1).EventingBasicConsumer[订阅式] 使用这种方式消息会全部打入当前消费者中,不管是否启用确认机制. 隐患:①根据消息的长短多少将影响当前消费者的占用资源. ②如果当前消费者挂掉,那

程序收藏不看系列:一文轻松搞定系统限流

1. 我们为什么需要限流 为了"反脆弱",在微服务复杂拓扑的情况下,限流是保障服务弹性和拓扑健壮的重中之重. 想一想,如果业务推出了一个秒杀活动,而你没有任何的限流措施:当你搭建了一个账号平台,而完全没有对十几个业务方设定流量配额--这些很有可能在特定场合下给你的产品带来大量的业务损失和口碑影响. 我们通常重点关注产品业务层面正向和逆向功能的完成,而对于逆向技术保障,这一点则是企业发展过程中很容易忽视的,所以一旦业务快速增长,这将给你的产品带来很大的隐患. 当然,也不是所有的系统都需要