消费端限流策略

使用场景

首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现如下情况:
巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
Rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息
(通过基于consumer或者channel设置qos的值)未被确认前,不进行消费新的消息。

具体方法

void BasicQos(unit prefetchSize, ushort prefetchCount, boolean global);
prefetchSize:单条消息的大小限制,通常设置为0,意思是不做限制
prefetchCount:消息的条数,一般设置为1条
global:消息针对的级别,true:channel级别,false:consumer级别,通常设置为false

注意:prefetchSize和global这两项,rabbitmq没有实现,暂且不做研究,prefetchCount在自动应答的情况下是不生效的,必须进行手动签收

创建生产者

package com.dwz.rabbitmq.qos;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String routingkey = "qos.save";
        String msg = "Hello rabbitmq qos message!";

        for(int i = 0; i < 5; i++) {
            channel.basicPublish(exchangeName, routingkey, null, msg.getBytes());
        }

      channel.close();
      connection.close();

    }
}

创建消费者

package com.dwz.rabbitmq.qos;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String routingkey = "qos.#";
        String queueName = "test_qos_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingkey);
        //限流
        channel.basicQos(0, 1, false);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                //手动签收(因为只传递一条数据过来,所以不用批量接收 multiple=false)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //限流方式,第一件事就是autoAck设置为false
        channel.basicConsume(queueName, false, consumer);
    }
}

相关文章:

RabbitMQ消费端限流策略(十)

原文地址:https://www.cnblogs.com/zheaven/p/11832840.html

时间: 2024-11-09 03:58:45

消费端限流策略的相关文章

RabbitMQ消费端限流策略(十)

消费端限流: 什么是消费端限流? 场景: 我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据.(导致服务器崩溃,线上故障) 生产端一次推送几百条数据库,客户端只接收一两条,在高并发的情况下,不能再生产端做限流,只能在消费端处理. 解决方法: RabbitMQ提供了一种qos(服务质量保证)功能,在非自动确认消息的前提下, 如果一定数据的消息(通过基于consumer或者channel

高并发场景下的限流策略

高并发场景下的限流策略: 在开发高并发系统时,有很多手段来保护系统:缓存.降级.限流. 当访问量快速增长.服务可能会出现一些问题(响应超时),或者会存在非核心服务影响到核心流程的性能时, 仍然需要保证服务的可用性,即便是有损服务.所以意味着我们在设计服务的时候,需要一些手段或者关键数据进行自动降级,或者配置人工降级的开关. 缓存的目的是提升系统访问速度和增大系统处理的容量,可以说是抗高并发流量的银弹:降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉某些功能,等高峰或者问题解决后再打开:

高并发限流策略

在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹:而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开:而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀.抢购).写服务(如评论.下单).频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流. 限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到

常用限流方案的设计与实现

为了保证业务在高峰期的可用性,主流系统都会配备服务降级的工具,而限流就是目前系统最常采用的方案之一.限流即流量限制,目的是在遇到流量高峰或者流量突增(流量尖刺)时,把流量速率控制在合理的范围之内,不至于被高流量击垮. 常见的限流方式 服务降级中的限流并没有我们想象的那么简单.第一,限流方案必须时可选的,没有任何方案可以适用所有场景.第二,限流策略必须时可配置的. 集中常见的限流方式: 通过限制单位时间段内的调用量来限流. 通过限制系统的并发调用程度来限流. 通过漏桶(Leaky Bucket)算

Sentinel如何通过限流实现服务的高可用性

摘要: 在复杂的生产环境下可能部署着成千上万的服务实例,当流量持续不断地涌入,服务之间相互调用频率陡增时,会产生系统负载过高.网络延迟等一系列问题,从而导致某些服务不可用.如果不进行相应的流量控制,可能会导致级联故障,并影响到服务的可用性,因此如何对高流量进行合理控制,成为保障服务稳定性的关键. 在复杂的生产环境下可能部署着成千上万的服务实例,当流量持续不断地涌入,服务之间相互调用频率陡增时,会产生系统负载过高.网络延迟等一系列问题,从而导致某些服务不可用.如果不进行相应的流量控制,可能会导致级

【.NET Core项目实战-统一认证平台】第七章 网关篇-自定义客户端限流

[.NET Core项目实战-统一认证平台]开篇及目录索引 上篇文章我介绍了如何在网关上增加自定义客户端授权功能,从设计到编码实现,一步一步详细讲解,相信大家也掌握了自定义中间件的开发技巧了,本篇我们将介绍如何实现自定义客户端的限流功能,来进一步完善网关的基础功能. .netcore项目实战交流群(637326624),有兴趣的朋友可以在群里交流讨论. 一.功能描述 限流就是为了保证网关在高并发或瞬时并发时,在服务能承受范围内,牺牲部分请求为代价,保证系统的整体可用性而做的安全策略,避免单个服务

程序员修神之路--高并发优雅的做限流(有福利)

菜菜哥,有时间吗? YY妹,什么事? 我最近的任务是做个小的秒杀活动,我怕把后端接口压垮,X总说这可关系到公司的存亡 简单呀,你就做个限流呗 这个没做过呀,菜菜哥,帮妹子写一个呗,事成了,以后有什么要求随便说 那好呀,先把我工资涨一下 那算了,我找别人帮忙吧 跟你开玩笑呢,给哥2个小时时间 谢谢菜菜哥,以后你什么要求我都答应你 好嘞,年轻人就是豪爽 ◆◆ 技术分析 ◆◆ 如果你比较关注现在的技术形式,就会知道微服务现在火的一塌糊涂,当然,事物都有两面性,微服务也不是解决技术,架构等问题的万能钥匙

程序收藏不看系列:一文轻松搞定系统限流

1. 我们为什么需要限流 为了"反脆弱",在微服务复杂拓扑的情况下,限流是保障服务弹性和拓扑健壮的重中之重. 想一想,如果业务推出了一个秒杀活动,而你没有任何的限流措施:当你搭建了一个账号平台,而完全没有对十几个业务方设定流量配额--这些很有可能在特定场合下给你的产品带来大量的业务损失和口碑影响. 我们通常重点关注产品业务层面正向和逆向功能的完成,而对于逆向技术保障,这一点则是企业发展过程中很容易忽视的,所以一旦业务快速增长,这将给你的产品带来很大的隐患. 当然,也不是所有的系统都需要

这个注解一次搞定限流与熔断降级:@SentinelResource

在之前的<使用Sentinel实现接口限流>一文中,我们仅依靠引入Spring Cloud Alibaba对Sentinel的整合封装spring-cloud-starter-alibaba-sentinel,就完成了对所有Spring MVC接口的限流控制.然而,在实际应用过程中,我们可能需要限流的层面不仅限于接口.可能对于某个方法的调用限流,对于某个外部资源的调用限流等都希望做到控制.呢么,这个时候我们就不得不手工定义需要限流的资源点,并配置相关的限流策略等内容了. 今天这篇我们就来一起学