RabbitMQ学习(二)工作队列

1.工作队列(Work Queue)又叫任务队列(Task Queue)指将任务分发个多个消费者。

2.实际操作:

这里使用一个生产者产生多条数据提供给3个消费者

生产者代码:


public class Producter {

//队列名称

private final
static
String QUEUE_NAME = "Work_Queue";

public static void main(String[]
args) throws
IOException,
TimeoutException {

//配置rabbitmq服务器地址

ConnectionFactory
factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setPort(5672);

factory.setUsername("starktan");

factory.setPassword("starktan");

factory.setVirtualHost("/");

//建立连接和通道

Connection
connection = factory.newConnection();

Channel channel =
connection.createChannel();

//声明队列,可以手动在mq中创建

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//写入10条数据(直接循环写入)

for (int i = 0; i < 10; i++) {

System.out.println("发送第" + i + "信息!");

String message = "WorkQueue
Message number "
+ i + " " + System.currentTimeMillis();

channel.basicPublish("", QUEUE_NAME, true, null,
message.getBytes());

}

channel.close();

connection.close();

}

}

消费者代码

public class Consumer {

//队列名称

private final static String QUEUE_NAME = "Work_Queue";

public static void main(String[] args) throws IOException,
InterruptedException, TimeoutException {

//创建连接和通道

ConnectionFactory factory =
new
ConnectionFactory();

factory.setHost("localhost");

final Connection connection = factory.newConnection();

ExecutorService service =
Executors.newFixedThreadPool(10);

for(int i=0;i<3;i++){

final int cur = i;

service.submit(new Runnable() {

Channel channel = connection.createChannel();

public void run() {

//创建队列消费者

QueueingConsumer consumer =
new
QueueingConsumer(channel);

//指定消费队列

try {

channel.basicConsume(QUEUE_NAME, true, consumer);

while (true)

{

//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)

QueueingConsumer.Delivery
delivery = consumer.nextDelivery();

String
message = new
String(delivery.getBody());

System.out.println("线程 "+cur+" 获取到消息 " + message + "开始处理");

Thread.sleep(1000*(cur+5)*2);

System.out.println("线程 "+cur+" "+message + "处理完成");

}

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

service.shutdown();

}

}

运行效果:(消费者循环调度)

3.消息确认:

在处理一个比较耗时的任务的时候,如果消费者在中途崩溃掉,则对应的这条数据就丢失了,为了避免消息丢失的情况,RabbitMQ提供了消息确认

使用两个消费者进行演示,调用方法

public void getConsum() throws IOException, TimeoutException, InterruptedException {
    //创建连接和通道
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //创建队列消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //指定消费队列,且改为手动确认
    channel.basicConsume(QUEUE_NAME, false, consumer);
    while (true)
    {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" 获取到消息 " + message + "开始处理");
        try{
            Thread.sleep(10000);
        }catch (InterruptedException e){}
        finally {
         channel.basicAck(delivery.getEnvelope().getDeliveryTag()
                    , false);
        }
        System.out.println( message + "处理完成");
    }
}

手动关掉一个消费者

消息被另一个消费者继续进行处理;

4.公平调度:

        channel.basicQos(1);//保证一次只分发一个

5.持久化: 保证当RabbitMQ服务器崩溃关机也不会造成消息丢失

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

第二个参数改为true

时间: 2024-10-11 16:09:53

RabbitMQ学习(二)工作队列的相关文章

我的RabbitMQ学习2(工作队列)

创建一个工作队列 1.建立一个生成者  //初始化一个连接 生产者 -> (消费者) var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //对应的队列 channel.QueueDeclare(que

RabbitMQ学习二

RabbitMQ 是一个消息broker.它的主要概念就是接受和转发消息.可以把它当作一个邮局:当向邮箱投递一封邮件时,你确信邮差最终会将这封邮件投递到收件人.使用这个比喻,RabbitMQ就是邮箱,邮局和邮差. RabbitMQ和邮局最大的区别就是它不处理纸质信件而是处理二进制数据--消息 RabbitMQ和其他消息系统通常都有以下几个术语: 生产者   发送消息 队列 消费者 使用python驱动发送"Hello World!" pip install pika 参考资料: htt

RabbitMQ学习(二)_AMQP简介

AMQP(高级消息队列协议)是一个网络协议.它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信. 消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers). 由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以存在于不同的设备上. AMQP 0-9

RabbitMQ(二) ——工作队列

RabbitMQ(二) --工作队列 (转载请附上本文链接--linhxx) 一.概述 工作队列模式(work queue),是有多个消费者的情况下,可以共同消费队列内的内容,加快消息处理速度.这是RabbitMQ的基本工作模式. 二.使用方式 和上一篇中的生产和消费消息的方式一样,就是需要多在cli进程中打开一个消费者的php文件.即需要打开3个php,一个是生产者的php文件,两个消费者的php文件(或多个php文件). 三.工作机制 3.1 轮询(Round-robin dispatchi

RabbitMQ学习第二记:工作队列的两种分发方式,轮询分发(Round-robin)和 公平分发(Fair dispatch)

1.什么是RabbitMQ工作队列 我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据.因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里.这时,就得使用工作队列了.一个队列有多个消费者同时消费数据. 下图取自于官方网站(RabbitMQ)的工作队列的图例 P:消息的生产者 C1:消息的消费者1 C2:消息的消费者2 红色:队列 生产者将消息发送到队列,多个消费者同时从队列中获

RabbitMQ学习系列(四): 几种Exchange 模式

上一篇,讲了RabbitMQ的具体用法,可以看看这篇文章:RabbitMQ学习系列(三): C# 如何使用 RabbitMQ.今天说些理论的东西,Exchange 的几种模式. AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储.同理,消费者也是如此.Exchange 就类似于一个交换机,转发各个消息分发到相

RabbitMQ学习系列(三): C# 如何使用 RabbitMQ

上一篇已经讲了Rabbitmq如何在Windows平台安装,还不了解如何安装的朋友,请看我前面几篇文章:RabbitMQ学习系列一:windows下安装RabbitMQ服务 , 今天就来聊聊 C# 实际开发的过程中,怎么调用 用RabbitMQ. 一.客户端 RabbitMQ 有很多客户端API,都非常的好用.我们在一边,一直用的都是 EasyNetQ,所以这里的 demo 只介绍 EasyNetQ 客户端实现.其他的客户端,大家自己去研究吧. EasyNetQ 是一个易于使用的RabbitMQ

RabbitMQ学习和使用

RabbitMQ学习和使用 RabbitMQ介绍 MQ全称Message Queue 消息队列,RabbitMQ是基于AMQP(高级消息队列协议)实现的.消息队列通常用以应用之间相互通信,解决同步问题.MQ是典型的生产者消费者模型,RabbitMQ最常用的三种模式是点对点模式.发布订阅模式.广播模式. RabbitMQ is a message-queueing software called a message broker or queue manager. Simply said; It

[Python 学习] 二、在Linux平台上使用Python

这一节,主要介绍在Linux平台上如何使用Python 1. Python安装. 现在大部分的发行版本都是自带Python的,所以可以不用安装.如果要安装的话,可以使用对应的系统安装指令. Fedora系统:先以root登入,运行 yum install python Ubuntu系统:在root组的用户, 运行 sudo apt-get install python 2. 使用的Python的脚本 Linux是一个以文件为单位的系统,那么我们使用的Python是哪一个文件呢? 这个可以通过指令

OpenCV for Python 学习 (二 事件与回调函数)

今天主要看了OpenCV中的事件以及回调函数,这么说可能不准确,主要是下面这两个函数(OpenCV中还有很多这些函数,可以在 http://docs.opencv.org/trunk/modules/highgui/doc/user_interface.html 找到,就不一一列举了),然后自己做了一个简单的绘图程序 函数如下: cv2.setMouseCallback(windowName, onMouse[, param]) cv2.createTrackbar(trackbarName,