【RabbitMQ】3、work模式

上一篇博客的作为rabbitMQ的入门程序,也是简单队列模式,一个生产者,一个消费者,今天这篇博客介绍work模式,一个生产者,多个消费者,下面的例子模拟两个消费者的情况。

图示:
        

一个生产者、两个消费者;一个消息只能被一个消费者获取。

在work模式中可以分为两种模式,一种是两个消费者平均消费队列中的消息,即使他们的消费能力是不一样的,这种似乎不太符合实际的情况。另一种是能者多劳模式,处理消息能力强的消费者会获取更多的 消息,这种模式更符合实际需求。

生产者:向队列发送50条消息,下面代码中,生产者每生产一条消息后都会休眠一段时间,并且越往后休眠的时间越长。

public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}

消费者1:休眠时间为10ms

public class Recv {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received ‘" + message + "‘");
//休眠
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

消费者2:休眠时间为1000ms

public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received ‘" + message + "‘");
// 休眠1秒
Thread.sleep(1000);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

从上面代码中我们知道消费者2的休眠时间更长,消费者1的处理能力应该比消费者2更强,但是最后运行的结果会发现,两个消费者获得消息的数量十一样的,一个全部为奇数,一个全部为偶数。

我们把每个消费者中的代码:channel.basicQos(1);这句代码解开注释会发现消费者2获取到8条数据,而消费者1获取到42条消息。这样
的结果才符合work模式的能者多劳模式。这句代码表示服务器同一时刻只会发送一条消息给消费者,但并不指定是哪个消费者,处理能力高的人会接收到更多的
消息。

小结:

简单队列和work 模式的不同:
  简单队列只要消息从队列中获取,无论消费者获取到消息后是否成功消费,比如遇到状况:断电,都认为是消息已经成功消费;

work模式消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者反馈,如果消费这一直没有反馈,则该消息一直处于不可用状态。

具体使用哪种模式具体问题具体分析。

channel.basicConsume(QUEUE_NAME, false, consumer);false表示监听队列,手动返回完成状态,true表示自动返回。

原文地址:https://www.cnblogs.com/eastday/p/9692446.html

时间: 2024-10-19 03:56:36

【RabbitMQ】3、work模式的相关文章

RabbitMQ六种队列模式-简单队列模式

前言 RabbitMQ六种队列模式-简单队列 [本文]RabbitMQ六种队列模式-工作队列RabbitMQ六种队列模式-发布订阅RabbitMQ六种队列模式-路由模式RabbitMQ六种队列模式-主题模式 在官网的教程中,描述了如上六类工作队列模式: 简单队列模式:最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列.也称为点对点模式 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者.同样也称为点对点模式 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个

RabbitMQ之消息模式(下)

目的: RabbitMQ之消息模式(上):https://www.cnblogs.com/huangting/p/11994539.html 消费端限流 消息的ACK与重回队列 TTL消息 死信队列 消费端限流 什么是消费端的限流? 假设一个场景,首先,我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据 消费端限流RabbitMQ提供的解决方案 RabbitMQ提供了一种qos(服务

Github开源:Sheng.RabbitMQ.CommandExecuter (RabbitMQ 的命令模式实现)

[Github]:https://github.com/iccb1013/Sheng.RabbitMQ.CommandExecuter Sheng.RabbitMQ.CommandExecuter 是使用 .Net 对 RabbitMQ 的一个简单封装. 它通过XML配置文件定义Exchange及队列等信息,根据此配置文件自动声明及初始化相关队列信息,方便 .Net 开发人员使用 RabbitMQ. 并实现了一个基于 MQ 的命令执行器,将 MQ 消息抽象化为命令,发布端和订阅端通过命令进行交互

spring boot整合RabbitMQ(Fanout模式)

1.Fanout Exchange介绍Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).Q

springboot使用rabbitmq fanout路由模式

fanout模式,生产者发送的消息到Exchange,Exchange同时往多个queue发送,多个消费者同时收到各自监听的queue消息 1.安装rabbitmq,pom.xml添加依赖,见之前博文有操作流程 2.添加配置文件,声明两个queue,一个fanoutExchange,然后将queue于Exchange进行绑定 import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Bin

配置RabbitMQ默认群集模式

RabbitMQ是什么? MQ(Msaaage Queue,消息队列)是一种应用程序对应用程序的通信方式.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无须专用链接来连接它们.消息传递指的是程序之间通过在消息中发送数据进行通讯.而不是通过直接调用彼此来通信.队列的使用除去了接收和发送应用程序同时执行的要求. RabbirMQ使用场景 在项目中,将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高系统的吞吐量Rabbi

RabbitMQ发布订阅模式

这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展.功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者. 思路解读(重点理解): (1)一个生产者,多个消费者(2)每一个消费者都有自己的一个队列(3)生产者没有直接发消息到队列中,而是发送到交换机(4)每个消费者的队列都绑定到交换机上(5)消息通过交换机到达每个消费者的队列该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列以用户发

.Net下RabbitMQ发布订阅模式实践

一.概念AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全.RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.ActionScri

Rabbitmq(6) 主题模式

* 匹配1个 # 匹配所有 发送者: package com.aynu.bootamqp.service; import com.aynu.bootamqp.commons.utils.Amqp; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; pu

RabbitMQ之消息模式

目的: 消息如何保证100%的投递 幂等性概念 Confirm确认消息 Return返回消息 自定义消费者 前言: 想必知道消息中间件RabbitMQ的小伙伴,对于引入中间件的好处可以起到抗高并发,削峰,业务解耦的作用并不陌生. 康康简单流程图了解一下.详情了解RabbitMQ可移步:https://www.cnblogs.com/huangting/p/11989597.html  注意:一般MQ中间件为了提高系统的吞吐量会把消息保存在内存中,如果不作其他处理,MQ服务器一旦宕机,消息将全部丢