RabbitMQ消费端消息的获取方式(.Net Core)

1【短链接】:BasicGet(String queue, Boolean autoAck)

  通过request的方式独自去获取消息,断开式,一次次获取,如果返回null,则说明队列中没有消息。

    隐患:每次获取消息都会创建channel。

    优点:最安全的获取方式且性能不算太差。

2【长链接】:  

  1)、EventingBasicConsumer【订阅式】

  使用这种方式消息会全部打入当前消费者中,不管是否启用确认机制。

    隐患:①根据消息的长短多少将影响当前消费者的占用资源。

       ②如果当前消费者挂掉,那么未处理的消息将会丢失。

    解决:【QOS + Ack】  服务质量 + 消息确认

        即在channel设置好通道使消息一条一条的从队列中打过来,确认一条打一条。

 1             //UInt32 prefetchSize,  每次取的长度
 2             //UInt16 prefetchCount,     每次取几条
 3             //Boolean global    是否对connection通用
 4             channel.BasicQos(5, 1, true);
 5             //使用事件机制获取消息
 6             EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
 7             consumer.Received += (sender, e) =>
 8             {
 9                 Console.WriteLine(Encoding.UTF8.GetString(e.Body));
10                 channel.BasicAck(e.DeliveryTag, false);
11             };
12             channel.BasicConsume("headersQueue", false, consumer);

  2)、QueueingBasicConsumer【死循环】【已过时】

 1             QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
 2             channel.BasicConsume("headersQueue", false, consumer);
 3             while (true)
 4             {
 5                 var result = consumer.Queue.Dequeue();
 6                 if(result != null)
 7                 {
 8                     Console.WriteLine(Encoding.UTF8.GetString(result.Body));
 9                     channel.BasicAck(result.DeliveryTag, false);
10                 }
11             }

原文地址:https://www.cnblogs.com/fanqisoft/p/10398340.html

时间: 2024-11-09 03:45:44

RabbitMQ消费端消息的获取方式(.Net Core)的相关文章

RabbitMQ消费端自定义监听(九)

场景: 我们一般在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理. 实际环境: 我们使用自定义的Consumer更加的方便,解耦性更强,也在实际工作中最常用. 操作: //生产端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory

RabbitMQ消费端限流策略(十)

消费端限流: 什么是消费端限流? 场景: 我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据.(导致服务器崩溃,线上故障) 生产端一次推送几百条数据库,客户端只接收一两条,在高并发的情况下,不能再生产端做限流,只能在消费端处理. 解决方法: RabbitMQ提供了一种qos(服务质量保证)功能,在非自动确认消息的前提下, 如果一定数据的消息(通过基于consumer或者channel

kafka消费端提交offset的方式

Kafka 提供了 3 种提交 offset 的方式 自动提交 复制 1234 // 自动提交,默认trueprops.put("enable.auto.commit", "true");// 设置自动每1s提交一次props.put("auto.commit.interval.ms", "1000"); 手动同步提交 offset 复制 1 consumer.commitSync(); 手动异步提交 offset 复制 1

rabbitmq消费端加入精确控频。

控制频率之前用的是线程池的数量来控制,很难控制.因为做一键事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底多少线程并发才刚好不超过指定的频率. 现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务. 与celery相比 在推送任务方面比celery的delay要快,推送的任务小. 使用更简单,没那么花哨给函数加装饰器来注册函数路由. 可以满足生产了. 比之前的 使用redis原生list结构作为消息队列取代celery框架. 更好,主要是rabbitmq有消费

rabbitmq消费端的nack和重回队列的总结

重回队列模式,是当投递消息失败时,让该消息重新回到队列的模式,该模式需要手动签收,并需要在消费者中进行判断,调用重回队列的确认模式 消费者 package com.flying.rabbitmq.api.ack; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Consumer

RabbitMQ消费端自定义监听器DefaultConsumer

消费者 package com.flying.rabbitmq.api.consumer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 自定义消费者类型 */ public class Consumer { public static void main(String[] args) th

Dubbo服务提供端与消费端应用的搭建

Demo结构介绍 Demo使用Maven聚合功能,里面有三个模块,目录如下: 其中Consumer模块为服务消费者,里面TestConsumer和consumer.xml组成了基于Spring配置方式的服务调用,TestConsumerApi是基于Dubbo API方式的服务调用,TestConsumerApiGeneric是泛化方式的服务调用,TestConsumerAsync是异步调用的方式. 其中Provider模块为服务提供者,里面TestProvider和provider.xml组成了

消费端限流策略

使用场景 首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现如下情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据! Rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consumer或者channel设置qos的值)未被确认前,不进行消费新的消息. 具体方法 void BasicQos(unit prefetchSize, ushort prefetchCount,

RabbitMQ消息丢失问题和保证消息可靠性-消费端不丢消息和HA(二)

继续上篇文章解决RabbitMQ消息丢失问题和保证消息可靠性(一) 未完成部分,我们聊聊MQ Server端的高可用和消费端如何保证消息不丢的问题? 回归上篇的内容,我们知道消息从生产端到服务端,为了保证消息不丢,我们必须做哪些事情? 发送端采用Confirm模式,注意Server端没成功通知发送端,需要重发操作需要额外处理 消息的持久化处理 上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储. 本文先从消费端讲起