2. RabbitMQ 之Work Queues (工作队列)

在上一篇揭开RabbitMQ的神秘面纱一文中,我们编写了程序来发送和接收来自命名队列的消息。

本篇我们将会创建一个 Work Queue(工作队列) 来使用分发任务在多个任务中。

前提:本教程假定RabbitMQ 在标准端口(15672)上的localhost上安装并运行。如果您使用不同的主机,端口或凭据,则需要调整连接设置。

1. Work Queue 工作队列

工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。

相反,我们安排任务稍后完成。我们将任务封装 为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作程序时,它们之间将共享任务。

这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。

如何理解上面这段话呢?

我们可以举个例子,假设用户有多个文件上传请求,然而Web应用对文件上传进行处理往往是一件比较耗时的操作,是无法立刻马上响应返回给客户端结果,这时候我们就需要一个工作队列来处理。

再比如生活中的买票,检票,大家都知道,当我们买票检票,大多需要排队一个一个处理,两者类似。

在上篇中我们发送了一个Hello World 信息,现在我们将发送复杂任务的字符串。

但是我们没有实际的应用场景,所以我们这里暂时使用 Thread.sleep() 函数来模拟PDF 文件上传实现延迟效果。

我们创建一个生产者(产生消息,发送消息的一方),文件名称叫做NewTask.java:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/***
 * 生产者
 * ********/
public class NewTask {
    private static final String TASK_QUEUE_NAME = "task_queue";

      public static void main(String[] argv) throws Exception {

        //创建和消息队列的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //第二个参数为true 确保关闭RabbitMQ服务器时执行持久化
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        //从命令行发送任意消息
        String message = getMessage(argv);

        //将消息标记为持久性 - 通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。
        channel.basicPublish("",
                TASK_QUEUE_NAME,//指定消息队列的名称
                MessageProperties.PERSISTENT_TEXT_PLAIN,//指定消息持久化
            message.getBytes("UTF-8"));//指定消息的字符编码
        //打印生产者发送成功的消息
        System.out.println(" [x] Sent ‘" + message + "‘");

        //关闭资源
        channel.close();
        connection.close();
      }

      /***
       * 一些帮助从命令行参数获取消息
       * @param strings 从命令行发送任意消息字符串
       * */
      private static String getMessage(String[] strings) {
        if (strings.length < 1)
          return "Hello World!";
        return joinStrings(strings," ");
      }

      /**
       * 字符串数组
       * @param delimiter 分隔符
       * */
      private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
          words.append(delimiter).append(strings[i]);
        }
        return words.toString();
      }
}

接下来我们创建我们的消费者,它需要为消息体中的每个点伪造一秒钟的工作。它将处理传递的消息并执行任务。

Worker.java

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        //和消息队列创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        //指定消息队列的名称
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人
        int prefetchCount = 1 ;
        channel.basicQos(prefetchCount);
        //channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println(" [x] Received ‘" + message + "‘");
                try {
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //boolean autoAck = false;
        //channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }

    /*****
     * 我们的假任务是模拟执行时间
     * */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == ‘.‘) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

2. RabbitMQ 之循环调度

使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作积压,我们可以添加更多工人,这样就可以轻松扩展。

首先,让我们尝试同时运行两个worker实例。他们都会从队列中获取消息,但究竟如何呢?让我们来看看。

我们选中Work.java 右键,Run as -----> Java Application ,执行三次,启动三个实例。

这样一来就相当于有了三个消费者,

然后我们同理开始尝试多次运行NewTask.java,这样将产生多个任务

然后我们可以清楚在控制台看到这样的情况,

第一个Work.java

第二个Work.java

第三个work.java

Tips: 上面是work.java 运行了三次,newTask 运行了四次。

也就是说任务有四个,按照循环调度算法,第一个循环到第二个循环,所以有了两个消息,而其他消息有了一个消息。

第一个work.java 的控制台收到了两条消息后继续等待收消息

第二个work.java 的控制台收到了一条消息后也继续等待接受消息

第三个work.java 的控制台也收到了 一条消息后继续等待接受消息

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。

平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法。

Tips: 一个圆圈待表收到的一个消息,一个矩形代表一个Work 实例

3. Message acknowledgment 消息确认

虽然执行任务可能需要几秒钟,但是可能我们会好奇想知道如果其中一个消费者开始执行长任务并且仅在部分完成时死亡会发生什么。

使用我们当前的代码,一旦RabbitMQ向客户发送消息,它立即将其标记为删除。

在这种情况下,如果你直接关闭一个worker 实例,我们将丢失它刚刚处理的消息。我们还将丢失分发给这个特定工作者但尚未处理的所有消息。

但我们不想失去任何任务。如果一个worker实例死亡,我们希望将任务交付给另一名Worker。

为了确保消息永不丢失,RabbitMQ支持 message acknowledgments. (消息确认)。

消费者发回一个 ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它。

如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失)而不发送确认,RabbitMQ将理解消息未完全处理并将重新排队。

如果其他消费者同时在线,则会迅速将其重新发送给其他消费者。这样你就可以确保没有消息丢失,即使Worker偶尔会死亡。

没有任何消息超时; 当消费者死亡时,RabbitMQ将重新发送消息。即使处理消息需要非常长的时间,也没关系。

默认情况下, Manual message acknowledgments  手动消息已打开。

在前面的示例中,第二个参数我们通过autoAck = true 标志明确地将它们关闭。

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

Tips: true 即autoAck 的值

一旦我们完成任务,第二个参数就应该将此标志设置为false并从工作人员发送适当的确认。

channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

使用此代码,我们可以确定即使您在处理消息时使用CTRL + C杀死一名Worker 实例,也不会丢失任何内容。

因为Worker死后不久,所有未经确认的消息将被重新传递。

确认必须在收到的交付的同一信道上发送。尝试使用不同的通道进行确认将导致通道级协议异常. 详情看 doc guide on confirmations 了解更多。

Forgotten acknowledgment 被遗忘的通知

错过basicAck是一个常见的错误。这是一个简单的错误,但后果是严重的。(也就是这句话如果忘了写,后果很严重,该消息将一直发送不出去)

channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。

为了调试这种错误,您可以使用rabbitmqctl 来打印messages_unacknowledged字段

Linux 下:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windows 下:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

4.Message durability 消息持久性

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非你告诉它不要。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久。

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的

boolean durable = true ;
channel.queueDeclare(“hello”,durable,false,false,null);
虽然此命令本身是正确的,但它在我们当前的设置中不起作用。那是因为我们已经定义了一个名为hello的队列 ,这个队列不耐用。

RabbitMQ不允许您使用不同的参数重新定义现有队列,并将向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如task_queue:
boolean durable = true ;
channel.queueDeclare(“task_queue”,durable,false,false,null);

此queueDeclare更改需要应用于生产者和消费者代码。

此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。

现在我们需要将消息标记为持久性 - 通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。

Note on message persistence

将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且尚未保存消息时,仍然有一个短时间窗口。此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用 发布者确认

5. Fair dispatch 公平派遣

您可能已经注意到调度仍然无法完全按照我们的意愿运行。

例如,在有两个工人的情况下,当所有奇怪的消息都很重,甚至消息很轻时,一个工人将经常忙碌而另一个工作人员几乎不会做任何工作。

好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。

发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。

它不会查看消费者未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。

为了打败我们可以使用basicQos方法和 prefetchCount = 1设置。这告诉RabbitMQ不要一次向一个worker发送一条消息。或者,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人。

        int prefetchCount = 1 ;
        channel.basicQos(prefetchCount);

关于队列大小的说明

如果所有工作人员都很忙,您的队列就会填满。您将需要密切关注这一点,并可能添加更多工作人员,或者采取其他策略。

使用消息确认和prefetchCount,您可以设置工作队列。即使RabbitMQ重新启动,持久性选项也可以使任务生效。

本篇完~



原文地址:https://www.cnblogs.com/xingyunblog/p/9670218.html

时间: 2024-10-01 03:32:11

2. RabbitMQ 之Work Queues (工作队列)的相关文章

RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)

一.概述 工作队列(Work queues) (使用Java客户端) 在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现. 工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享. 它在web应用中是非常有用的,因为在很短的时间内http请求窗口

RabbitMQ Work Queues(工作队列)

RabbitMQ Work Queues(工作队列) 工作队列模式为一个生产者对应多个消费者,但是只有一个消费者获得消息,即一个队列被多个消费者监听,但一条消息只能被其中的一个消费者获取 代码如下: 生产者代码: public class WorkSend { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //获取连接

RabbitMQ(二) ——工作队列

RabbitMQ(二) --工作队列 (转载请附上本文链接--linhxx) 一.概述 工作队列模式(work queue),是有多个消费者的情况下,可以共同消费队列内的内容,加快消息处理速度.这是RabbitMQ的基本工作模式. 二.使用方式 和上一篇中的生产和消费消息的方式一样,就是需要多在cli进程中打开一个消费者的php文件.即需要打开3个php,一个是生产者的php文件,两个消费者的php文件(或多个php文件). 三.工作机制 3.1 轮询(Round-robin dispatchi

RabbitMQ官网教程---工作队列

(使用python的pika 0.9.8客户端) 在第一个教程中,我们写了一个从命名的队列中发送和接收消息的程序.在这一个里面,我们将创建一个Work Queue来用于在多个工作者之间分类耗时任务. Work Queues后面的主要思想是避免理解做一些资源密集的任务并且需要等待它完成.我们用计划任务在后面完成它.我们把一个任务封装为一个消息发送给队列.一个在后台执行的工作者队列将弹出任务并且完全的执行这个工作.当你运行许多工作者的时候,这个任务将在它们之间共享. 这种概念在web应用程序中是特别

RabbitMQ (二)工作队列 -摘自网络

这篇中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务.工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成.相反地,我们进行任务调度:我们把任务封装为消息发送给队列.工作进行在后台运行并不断的从队列中取出任务然后执行.当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行. 这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务. 1. 准备 我们使用Thread.sleep来模拟耗时的任务.我们在发送到队列的消息的末尾添加

RabbitMQ :VHost,Exchanges, Queues,Bindings and Channels

和RabbitMQ这个项目的缘分好奇怪,很长一段时间内是只关注源代码,真的是Erlang开源项目中的典范;现在要在项目中应用RabbitMQ,从新的视角切入,全新的感觉.仿佛旧情人换了新衣,虽是熟稔却有不曾领略的风情. RabbitMQ提供了一整套机制来处理消息的发送,接收,容错,管理,上一篇文章中我提到了一篇Rabbits and warrens的文章,是一篇非常棒的入门文章,但是里面忽略了不少细节,我沿着RabbitMQ in Action重新梳理了一遍,笔记于此,备忘. Exchanges

rabbitmq消息队列学习——&quot;工作队列&quot;

二."工作队列" 在第一节中我们发送接收消息直接从队列中进行.这节中我们会创建一个工作队列来分发处理多个工作者中的耗时性任务. 工作队列主要是为了避免进行一些必须同步等待的资源密集型的任务.实际上我们将这些任务时序话稍后分发完成.我们将某个任务封装成消息然后发送至队列,后台运行的工作进程将这些消息取出然后执行这些任务.当你运行多个工作进程的时候,这些任务也会在它们之间共享. 前期准备 上一节的练习中我们发送的是简单包含"Hello World!"的消息,这节我们还发

RabbitMQ指南(C#)(二)工作队列

上一节我们实现了向指定的队列发送和接收消息.这一节,我们主要讲工作队列,用于在多个消费者之间分配置实时任务. 工作队列方式主要是为了防止在执行一个耗费资源的任务时,要等待其结束才能处理其它事情.我们将任务的执行延迟,将其封装成一个消息,然后发送给一个列队.后台再运行一个程序从队列里取出消息,然后执行任务.如果有多个消费者,还可以分享任务. 对于Web应用程序来说,这样就可以使用Http的短请求来处理复杂的业务. 准备 我们发送一个字符串来代表复杂的任务,然后用Thread.Sleep()来模拟耗

RabbitMQ使用场景_002_工作队列

工作队列 利用轮循分配来消费任务信息(竞争消费者模式) 背后的主要思想工作队列(又名:任务队列)是为了避免立即做一个资源密集型任务,不得不等待它完成.相反,我们安排以后的任务要做.我们封装任务作为消息并将其发送到一个队列.一个工作进程在后台运行将流行的任务和最终执行这项工作.当您运行许多消费者的任务将在他们之间共享. 循环调度与公平的分配 使用一个任务队列的优点之一是能够轻易并行化"parallelise"工作.如果建立一个任务队列并添加多个消费者,,RabbitMQ将发送每个消息到下