RabbitMQ Work Queues(工作队列)

  • RabbitMQ Work Queues(工作队列)

    • 工作队列模式为一个生产者对应多个消费者,但是只有一个消费者获得消息,即一个队列被多个消费者监听,但一条消息只能被其中的一个消费者获取
    • 代码如下:

      生产者代码:

public class WorkSend {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //声明通道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicPublish("", "hello", null, "First.".getBytes());
        channel.basicPublish("", "hello", null, "Secode..".getBytes());
        channel.basicPublish("", "hello", null, "Third....".getBytes());
        channel.basicPublish("", "hello", null, "Fourth....".getBytes());
        channel.basicPublish("", "hello", null, "Fifth.....".getBytes());
        //6、关闭通道和连接
        channel.close();
        connection.close();
    }
}

? 消费者代码

public class WorkRecv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");

        //声明通道
        Channel channel = connection.createChannel();

        //声明队列队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(" [x] Done");
                //channel.basicAck();
            }
        };
        boolean autoAck = true; // acknowledgment is covered below
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });

//        DeliverCallback deliverCallback = new DeliverCallback(){
//            @Override
//            public void handle(String consumerTag, Delivery delivery) throws IOException {
//                String message = new String(delivery.getBody(), "UTF-8");
//                System.out.println(" [x] Received '" + message + "'");
//            }
//        };
//
//        channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
//            @Override
//            public void handle(String consumerTag) throws IOException {
//
//            }
//        });

    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

1 生产者将消息交个交换机
2 交换机交给绑定的队列
3 队列由多个消费者同时监听,只有其中一个能够获取这一条消息,形成了资源的争抢,谁的资源空闲大,争抢到的可能越大;

  • Round-robin dispatching(轮询分发)

    使用任务队列的优点之一是能够轻松并行工作。如果我们这里积压了很多的消息,我们可以增加work的并行度,这样就可以轻松扩展。

    默认情况下,RabbitMQ将每个消息依次发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。

  • Message acknowledgment(消息确认)

    使用我们当前的代码,RabbitMQ一旦向消费者发送了一条消息,便立即将其标记为删除。在这种情况下,如果我们kill掉一个worker,我们将丢失正在处理的消息。并且还将丢失所有发送给该特定worker但尚未处理的消息。
    为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(告知),告知RabbitMQ特定的消息已被接收并处理,并且RabbitMQ可以自由删除它。
    如果消费者死了(其通道已关闭,连接已关闭或TCP连接丢失)而没有发送确认,RabbitMQ将了解消息未完全处理,并将重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使worker偶尔死亡也不会丢失任何消息。
    没有任何消息超时;消费者死亡时,RabbitMQ将重新传递消息。即使处理一条消息花费非常非常长的时间也没关系。
    默认情况下,手动消息确认处于打开状态。我们通过autoAck = false 标志显式关闭了它们。在消息完成投递的时候,手动确认消息投递成功。

    消费者代码修改如下:

public class ManWorkRecv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");

        //声明通道
        Channel channel = connection.createChannel();

        //声明队列队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(" [x] Done");
                //手动确认消息已经成功投递
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false; //设置消息手动确认
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });

    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

? 上述代码可以确保我们在kill掉一个消费者的情况下,消息不会丢失。

  • Message durability(消息持久化)

    前面已经讲了即使一个消费者退出,消息不会丢失,但是如果RabbitMQ服务退出时,队列和消息仍然会丢失,这是因为 默认队列和消息都是放在内存中的 。为保证消息和队列不丢失,需要把队列和消息设置为持久化
    确保RabbitMQ永远不会丢失我们的队列。声明持久化代码如下:

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);

    由于RabbitMQ已经存在一个hello队列,并且RabbitMQ不支持对已经存在的队列进行参数修改,所以需要我们删除之前创建的队列或者重新创建一个队列。

    其次,我们需要保证消息的持久化,消息持久化设置如下:

    channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "Fifth.....".getBytes());

    这样,我们就可以保证RabbitMQ服务宕机的情况下,消息和队列都不会丢失。

  • Fair dispatching(公平分发)

    ? RabbitMQ在进行消息分发的时候,可以设置一次分发给某一个消费者多少条消息,如在消费者端设置prefetchCount=1;如下代码

    int prefetchCount = 1 ;
    channel.basicQos(prefetchCount);

    该设置表示,RabbitMQ一次分发给消费者一条消息,在消费者处理并确认上一条消息之前,不会再给这个消费者发送一条新消息,而会将其分发给其他的消费者

  • SpringBoot实现:
    @SpringBootApplication
    @EnableScheduling
    public class RabbitAmqpTutorialsApplication {
    
        public static void main(String[] args) throws Exception {
            SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
        }
    
    }
     @Configuration
      public class Tut2Config {
    
          @Bean
          public Queue hello() {
              return new Queue("hello");
          }
    
          /**
           * 这里是启动两个消费者
           */
          private static class ReceiverConfig{
              @Bean
              public Tut2Receiver receiver1(){
                  return new Tut2Receiver(1);
              }
    
              @Bean
              public Tut2Receiver receiver2(){
                  return new Tut2Receiver(2);
              }
          }
    
          @Bean
          public Tut2Sender sender() {
              return new Tut2Sender();
          }
    
      }
    
    
    //监听队列
    @RabbitListener(queues = "hello")
    public class Tut2Receiver {
    
        private final int instance;
    
        public Tut2Receiver(int i) {
            this.instance = i;
        }
    
        @RabbitHandler
        public void receive(String in) throws InterruptedException {
            StopWatch watch = new StopWatch();
            watch.start();
            System.out.println("instance " + this.instance +
                    " [x] Received '" + in + "'");
            doWork(in);
            watch.stop();
            System.out.println("instance " + this.instance +
                    " [x] Done in " + watch.getTotalTimeSeconds() + "s");
        }
    
        private void doWork(String in) throws InterruptedException {
            for (char ch : in.toCharArray()) {
                if (ch == '.') {
                    Thread.sleep(1000);
                }
            }
        }
    
    }
    public class Tut2Sender {
        @Autowired
        private RabbitTemplate template;
    
        //队列
        @Autowired
        private Queue queue;
    
        AtomicInteger dots = new AtomicInteger(0);
    
        AtomicInteger count = new AtomicInteger(0);
    
        /**
         * 定时向队列hello发送消息
         */
        @Scheduled(fixedDelay = 1000, initialDelay = 500)
        public void send() {
            StringBuilder builder = new StringBuilder("Hello");
            if (dots.incrementAndGet() == 3) {
                dots.set(1);
            }
            for (int i = 0; i < dots.get(); i++) {
                builder.append('.');
            }
            builder.append(count.incrementAndGet());
            String message = builder.toString();
            //向队列中发送消息
            template.convertAndSend(queue.getName(), message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    
    }

    相关代码链接: https://github.com/albert-liu435/springmq

原文地址:https://www.cnblogs.com/haizhilangzi/p/12301722.html

时间: 2024-11-07 15:28:56

RabbitMQ Work Queues(工作队列)的相关文章

RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)

一.概述 工作队列(Work queues) (使用Java客户端) 在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现. 工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享. 它在web应用中是非常有用的,因为在很短的时间内http请求窗口

RabbitMQ --- Work Queues(工作队列)

目录 RabbitMQ --- Hello Mr.Tua 前言 Work Queues 即工作队列,它表示一个 Producer 对应多个 Consumer,包括两种分发模式:轮循分发(Round-robin)和公平分发(Fair dispatch).旨在为了避免立即执行任务时出现占用很多资源和时间却又必须等待完成的现象. 原理分析: Producer 把工作任务转化为消息发送给队列,当后台有一个 Consumer 进程在运行时,它会不间断地从队列中取出消息来执行:当后台有多个 Consumer

2. RabbitMQ 之Work Queues (工作队列)

在上一篇揭开RabbitMQ的神秘面纱一文中,我们编写了程序来发送和接收来自命名队列的消息. 本篇我们将会创建一个 Work Queue(工作队列) 来使用分发任务在多个任务中. 前提:本教程假定RabbitMQ 已在标准端口(15672)上的localhost上安装并运行.如果您使用不同的主机,端口或凭据,则需要调整连接设置. 1. Work Queue 工作队列 工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成. 相反,我们安排任务稍后完成.我们将任务

我的RabbitMQ学习2(工作队列)

创建一个工作队列 1.建立一个生成者  //初始化一个连接 生产者 -> (消费者) var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //对应的队列 channel.QueueDeclare(que

RabbitMQ学习第二记:工作队列的两种分发方式,轮询分发(Round-robin)和 公平分发(Fair dispatch)

1.什么是RabbitMQ工作队列 我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据.因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里.这时,就得使用工作队列了.一个队列有多个消费者同时消费数据. 下图取自于官方网站(RabbitMQ)的工作队列的图例 P:消息的生产者 C1:消息的消费者1 C2:消息的消费者2 红色:队列 生产者将消息发送到队列,多个消费者同时从队列中获

RabbitMQ入门(二)工作队列

??在文章RabbitMQ入门(一)之Hello World,我们编写程序通过指定的队列来发送和接受消息.在本文中,我们将会创建工作队列(Work Queue),通过多个workers来分配耗时任务. ??工作队列(Work Queue,也被成为Task Queue,任务队列)的中心思想是,避免立即执行一个资源消耗巨大且必须等待其完成的任务.相反地,我们调度好队列可以安排该任务稍后执行.我们将一个任务(task)封装成一个消息,将它发送至队列.一个在后台运行的work进程将会抛出该任务,并最终执

RabbitMQ学习三

Work Queues 在上一篇文章中,send.py程序向名为hello的队列发送消息,receive.py程序向名为hello的队列接收消息.这一节中,我们将创建一个Work Queue用于将那些比较耗时的任务分布到多个worker上. Work Queues工作队列或者叫做Task Queues任务队列的主要概念就是为了避免立刻执行一个耗费资源的任务并且不得不等待它执行完成.取而代之的是,我们将这个任务调度到以后去执行. 我们封装一个任务为一个消息并发送这个消息到队列.一个work pro

十一天 python操作rabbitmq、redis

1.启动rabbimq.mysql 在""运行""里输入services.msc,找到rabbimq.mysql启动即可 2.启动redis 管理员进入cmd,进入redis所在目录,执行redis-server.exe redis.windows.conf --maxmemory 200M  启动redis  server 执行redis-cli.exe启动客户端 一.python系列之 RabbitMQ - work queues 本节我们创建一个工作队列( w

RabbitMQ --- Routing(路由)

目录 RabbitMQ --- Hello Mr.Tua RabbitMQ --- Work Queues(工作队列) RabbitMQ --- Publish/Subscribe(发布/订阅) 前言 在上一章中介绍了 Publish/Subscribe(发布/订阅),它是把每个消息发送给多个 Consumer,也就是说每个 Consumer 都是接收所有的消息,辣么问题来了,如果 Consumer 只接收它想要的某一部分消息,那该怎么办呢?可以通过 Routing(路由)的机制来实现. Dir