RabbitMQ五:生产者--队列--多消费者

一、生成者-队列-多消费者(前言)

上篇文章,我们做了一个简单的Demo,一个生产者对应一个消费者,本篇文章就介绍 生产者-队列-多个消费者,下面简单示意图

P 生产者    C 消费者  中间队列

需求背景:工厂某部门需要生产n个零件,部门下面有2个小组,每个小组需要生产n/2个

公平派遣

每个小组的情况下,当所有奇怪的信息都很重,甚至信息很轻的时候,一个工作人员将不断忙碌,另一个工作人员几乎不会做任何工作。那么,RabbitMQ不知道什么,还会平均分配消息。

这是因为当消息进入队列时,RabbitMQ只会分派消息。它不看消费者的未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。

下面就由我们撸代码实现,这一需求::::

二、代码

P 生产者代码:::

 static void Main(string[] args)
        {
            using (var channel = HelpConnection.GetConnection().CreateModel())
            {
                //声明队列
                channel.QueueDeclare("firstQueue", true, false, false, null);
                //声明路由
                channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
                //绑定 建立关系
                channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange");
                //内容的基本属性
               var properties=channel.CreateBasicProperties();
                //设置消息内容持久化
               properties.Persistent = true;
                int j = 0;
                for (int i = 0; i < 100; i++)
                {
                    var msg = Encoding.UTF8.GetBytes("生产者-队列-多个消费者" + i);
                    channel.BasicPublish(exchange: "firstExchange",
                                         routingKey: "firstQueue_Exchange",
                                         basicProperties: properties,
                                         body: msg);
                    j = i;
                    Console.WriteLine( i);
                }
                Console.WriteLine("添加成功" + j + "条");
                Console.ReadKey();
            }
        }

成功添加100条

C 消费者代码:::

 /// <summary>
        ///
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            using (var channel = HelpConnection.GetConnection().CreateModel())
            {
                //声明队列
                channel.QueueDeclare("firstQueue", true, false, false, null);
                //声明路由
                channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
                //绑定 建立关系
                channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange");

                //公平分发 同一时间只处理一个消息
                channel.BasicQos(0, 1, true);
                var conSumer = new EventingBasicConsumer(channel);
                conSumer.Received += (moede, e) =>
                {
                    var body = e.Body;
                    var msg = Encoding.UTF8.GetString(body);
                    Console.WriteLine("显示结果:"+msg);
                    //进行交付,确定此消息已经处理完成
                   // channel.BasicAck( e.DeliveryTag,  false);
                };
                //确认收到消息    进行消费
                channel.BasicConsume("firstQueue", true, conSumer);//false 手动应答;true:自动应答

                Console.ReadKey();
            }
        }

效果图

三、总结

本章总结注意几点:::

1、即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true。

var properties = channel.CreateBasicProperties();

properties.Persistent = true;

2、公平分发同一时间只处理一个消息

channel.BasicQos(0,1,false)
  • 博主是利用读书、参考、引用、抄袭、复制和粘贴等多种方式打造成自己的纯镀 24k 文章,请原谅博主成为一个无耻的文档搬运工!
  • 小弟刚迈入博客编写,文中如有不对,欢迎用板砖扶正,希望给你有所帮助。
时间: 2024-10-11 09:43:07

RabbitMQ五:生产者--队列--多消费者的相关文章

用rabbitMQ实现生产者消费者

利用RabbitMQ实现生产者和消费者的一个小Demo 不做讲解 直接上代码 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /**  * 单消息队列通道  * Created by wangtf on 2015/11/16.  * 生产者  */ public class Producer {     

RabbitMQ之六种队列模式

先学习一下RabbitMQ中的六种队列,只学习前五种,具体的官方文档地址是:http://next.rabbitmq.com/getstarted.html 导入maven依赖: 1 <dependency> 2 <groupId>com.rabbitmq</groupId> 3 <artifactId>amqp-client</artifactId> 4 <version>3.4.1</version> 5 </d

RabbitMQ五种工作模式学习总结

一.简介最近,在看一些消息中间件的内容,之前都没有好好学习一下消息中间件.本文将对RabbitMQ中五种常用的工作模式做一个简单的介绍和总结.RabbitMQ常用的工作模式有:简单队列模式.工作队列模式.发布订阅模式.路由模式.主题模式.本文参照RabbitMQ官网示例总结,详细可以到官网查看:https://www.rabbitmq.com/getstarted.html. 二.简单队列模式(Simple Queue) [a]模型图:只包含一个生产者以及一个消费者,生产者Producer将消息

RabbitMQ (消息队列)专题学习06 Topic

(使用Java客户端) 一.概述 在路由消息分发的学习中,对日志记录系统做了改进,使用direct exchange来替换fanout exchange进行消息分发,可以使日志系统有了直接.并且可以有选择的接收消息. 尽管使用direct exchange改进了系统,但是它仍然有局限性,就是不能根据多个标准来分发消息. 在日志系统中,我们也许想订阅的不仅仅是基于日志消息的严重程度,而且可能是基于日志消息的发送源. 这将给我们带来很多的灵活,我可能想坚挺的错误来自"cron"的消息源,而

RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)

一.概述 工作队列(Work queues) (使用Java客户端) 在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现. 工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享. 它在web应用中是非常有用的,因为在很短的时间内http请求窗口

RabbitMQ (消息队列)专题学习02 Hello World

一.概述 RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. 在专题学习一中我们已经简单提到了一些概念,在此我们更为深入的学习下RabbitMQ相关的专有名词. 1.生产

RabbitMQ (消息队列)专题学习01 RabbitMQ部署

一.概述 RabbitMQ(Message Queue)是当前流行的开源的消息队列系统,用ERLang语言开发,按照AMQP(Advanced Message Queue Protocol)的标准实现,消息队列是一种应用程序对应用程序之间的通信方法,应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,则无需专用链接来链接它们,RabbitMQ便是这样一种用于应用程序之间通信的中间件. 二.架构 RabbitMQ的架构图如下: 图-1 在此有几个概念需要说明一下: 1.Exchange:消

【RabbitMQ系列】队列、绑定、交换器

队列: 从概念上来讲,AMQP消息路由必须有三部分:交换器.队列和绑定.生产者把消息发布到交换器上:消息最终到达队列,并被消费者接收:绑定决定了消息如何从路由器路由到特定的队列. 消费者通过以下两种方式从特定的队列中接收消息: 1)通过AMQP的basic.consume命令订阅.这样做会将信道置为接收模式,知道取消对队列的订阅为止.订阅了消息后,消费者在消费(或者拒绝)最近接收的那道消息后,就能从队列中(可用的)自动接收下一条消息.如果消费者处理队列消息,并且/或者需要在消息已到达队列时就自动

SpringBoot中使用rabbitmq,activemq消息队列和rest服务的调用

1. activemq 首先引入依赖 pom.xml文件 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> 创建一个配置队列类 JMSConfiguration.java package com.wangx.boot.util; impo