RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ优先级队列注意点:

1、只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效

2、RabbitMQ3.5以后才支持优先级队列

代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下:

消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器)

<!-- ========================================RabbitMQ========================================= -->
    <!-- 连接工厂 -->
    <rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="guest" password="guest" />
    <!-- 监听器 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- queues是队列名称,可填多个,用逗号隔开, method是ref指定的Bean调用Invoke方法执行的方法名称 -->
        <rabbit:listener queues="red" method="onMessage" ref="redQueueListener" />
        <rabbit:listener queues="blue" method="onMessage" ref="blueQueueListener" />
        <rabbit:listener queues="queue" method="queueList" ref="queueListener" />
    </rabbit:listener-container>
    <!-- 队列声明 -->
    <rabbit:queue name="red" durable="true" />
    <rabbit:queue name="blue" durable="true" />
    <rabbit:queue name="queue" durable="true" />
    <!-- 红色监听处理器 -->
    <bean id="redQueueListener" class="com.aitongyi.customer.RedQueueListener" />
    <!-- 颜色监听处理器 -->
    <bean id="blueQueueListener" class="com.aitongyi.customer.BlueQueueListener" />
    <!-- 优先级队列监听处理器 -->
    <bean id="queueListener" class="com.aitongyi.customer.QueueListener" />

生产者增加一个主方法:

public static void main(String[] args)
    {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // 必须要设置

        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        for (final int i : priority)
        {
            template.convertAndSend("queue", (Object) ("queue" + i), new MessagePostProcessor() {

                @Override
                public Message postProcessMessage(Message arg0) throws AmqpException
                {
                    arg0.getMessageProperties().setPriority(i);
                    return arg0;
                }
            });
        }
    }

当然,还需要在客户端创建一个优先级队列:

注意,x-max-length = 9999,这个值最后不要写太大了,否则你电脑的内存会一直处于100%的使用状态(至于出现这种状况怎么解决,请点击:RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)),并且这个取值范围在0~255之间,超过了可能会出现问题,我测试了设置9999时,部分有问题,后面会把有问题的地方贴上来。

好了,所有的事情准备完毕了,我们先启动消费者,然后再运行主方法,此时的队列优先级设置为,private static final int[] priority = { 1, 5, 1, 2, 3, 4, 5, 5, 0, 3, 6, 10, 4, 100, 99, 98 };执行结果如下:

2017-05-16 09:51:48 399 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:51:48 417 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:51:48 435 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:51:48 493 [INFO] c.a.c.QueueListener - queueList Receved:queue2
2017-05-16 09:51:48 514 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:51:48 596 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:51:48 950 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:51:48 975 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:51:49 015 [INFO] c.a.c.QueueListener - queueList Receved:queue0
2017-05-16 09:51:49 039 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:51:49 058 [INFO] c.a.c.QueueListener - queueList Receved:queue6
2017-05-16 09:51:49 084 [INFO] c.a.c.QueueListener - queueList Receved:queue10
2017-05-16 09:51:49 102 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:51:49 140 [INFO] c.a.c.QueueListener - queueList Receved:queue100
2017-05-16 09:51:49 561 [INFO] c.a.c.QueueListener - queueList Receved:queue99
2017-05-16 09:51:49 595 [INFO] c.a.c.QueueListener - queueList Receved:queue98

很奇怪,没有按照优先级执行?文章前面已经提到,只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。所以,我们需要先将所有的消息队列发送至服务器,然后再启动消费者处理消息,这样,就可以看到效果了。为了达到这个目的,则需要先运行主方法,然后再启动消费者,队列优先级设置和上方一样,执行结果如下:

2017-05-16 09:56:23 296 [INFO] c.a.c.QueueListener - queueList Receved:queue100
2017-05-16 09:56:23 359 [INFO] c.a.c.QueueListener - queueList Receved:queue99
2017-05-16 09:56:23 438 [INFO] c.a.c.QueueListener - queueList Receved:queue98
2017-05-16 09:56:23 500 [INFO] c.a.c.QueueListener - queueList Receved:queue10
2017-05-16 09:56:23 594 [INFO] c.a.c.QueueListener - queueList Receved:queue6
2017-05-16 09:56:23 656 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:56:23 765 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:56:23 874 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:56:24 077 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:56:24 202 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:56:24 262 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:56:24 311 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:56:24 389 [INFO] c.a.c.QueueListener - queueList Receved:queue2
2017-05-16 09:56:24 422 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:56:24 500 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:56:24 547 [INFO] c.a.c.QueueListener - queueList Receved:queue0

这样,优先级队列就实现了,至于相同优先级的队列,执行顺序应该是随机的吧,我也没有测试,有兴趣的同学可以自己研究。

附:队列优先级设置为:private static final int[] priority = { 1, 5, 1, 2, 9998, 3, 4, 5, 999, 5, 0, 3, 6, 10, 4, 1000, 9999, 100, 99, 98, 899 };运行结果为:

2017-05-16 09:59:29 839 [INFO] c.a.c.QueueListener - queueList Receved:queue1000
2017-05-16 09:59:29 917 [INFO] c.a.c.QueueListener - queueList Receved:queue999
2017-05-16 09:59:29 995 [INFO] c.a.c.QueueListener - queueList Receved:queue899
2017-05-16 09:59:30 073 [INFO] c.a.c.QueueListener - queueList Receved:queue100
2017-05-16 09:59:30 182 [INFO] c.a.c.QueueListener - queueList Receved:queue99
2017-05-16 09:59:30 275 [INFO] c.a.c.QueueListener - queueList Receved:queue98
2017-05-16 09:59:30 353 [INFO] c.a.c.QueueListener - queueList Receved:queue9999
2017-05-16 09:59:30 431 [INFO] c.a.c.QueueListener - queueList Receved:queue9998
2017-05-16 09:59:30 509 [INFO] c.a.c.QueueListener - queueList Receved:queue10
2017-05-16 09:59:30 619 [INFO] c.a.c.QueueListener - queueList Receved:queue6
2017-05-16 09:59:30 670 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:59:30 733 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:59:30 826 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:59:31 138 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:59:31 282 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:59:31 316 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:59:31 379 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:59:31 410 [INFO] c.a.c.QueueListener - queueList Receved:queue2
2017-05-16 09:59:31 472 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:59:31 712 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:59:31 745 [INFO] c.a.c.QueueListener - queueList Receved:queue0

恩,排序结果是有点乱。。。

时间: 2024-12-29 04:36:05

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列的相关文章

rabbitMQ学习笔记(五) 消息路由

生产者会生产出很多消息 , 但是不同的消费者可能会有不同的需求,只需要接收指定的消息,其他的消息需要被过滤掉. 这时候就可以对消息进行过滤了. 在消费者端设置好需要接收的消息类型. 如果不使用默认的Exchange发送消息,而是使用我们自定定义的Exchange发送消息,那么下面这个方法的第二个参数就不是QueueName了,而是消息的类型. channel.basicPublish( exchangeName , messageType , null , msg.getBytes()); 示例

Android学习笔记五之Service

Android学习笔记五之Service 1.什么是Service? 什么是Service?Service是Android系统的四大组件之一,官方文档是这样描述Service的: A Service is an application component that can perform long-running operations in the background and does not provide a user interface. Another application comp

jQuery源码学习笔记五 六 七 八 转

jQuery源码学习笔记五 六 七 八 转 Js代码   <p>在正式深入jQuery的核心功能选择器之前,还有一些方法,基本都是数组方法,用于遴选更具体的需求,如获得某个元素的所有祖选元素啦,等等.接着是其缓存机制data.</p> <pre class="brush:javascript;gutter:false;toolbar:false"> //@author  司徒正美|なさみ|cheng http://www.cnblogs.com/ru

Caliburn.Micro学习笔记(五)----协同IResult

Caliburn.Micro学习笔记(五)----协同IResult 今天说一下协同IResult 看一下IResult接口 /// <summary> /// Allows custom code to execute after the return of a action. /// </summary> public interface IResult { /// <summary> /// Executes the result using the specif

angular学习笔记(五)-阶乘计算实例(1)

<!DOCTYPE html> <html ng-app> <head> <title>2.3.2计算阶乘实例1</title> <meta charset="utf-8"> <script src="../angular.js"></script> <script src="script.js"></script> </

NLTK学习笔记(五):分类和标注词汇

[TOC] 词性标注器 之后的很多工作都需要标注完的词汇.nltk自带英文标注器pos_tag import nltk text = nltk.word_tokenize("And now for something compleyely difference") print(text) print(nltk.pos_tag(text)) 标注语料库 表示已经标注的标识符:nltk.tag.str2tuple('word/类型') text = "The/AT grand/J

Linux System Programming 学习笔记(五) 进程管理

1. 进程是unix系统中两个最重要的基础抽象之一(另一个是文件) A process is a running program A thread is the unit of activity inside of a process the virtualization of memory is associated with the process, the threads all share the same memory address space 2. pid The idle pro

java之jvm学习笔记五(实践写自己的类装载器)

java之jvm学习笔记五(实践写自己的类装载器) 课程源码:http://download.csdn.net/detail/yfqnihao/4866501 前面第三和第四节我们一直在强调一句话,类装载器和安全管理器是可以被动态扩展的,或者说,他们是可以由用户自己定制的,今天我们就是动手试试,怎么做这部分的实践,当然,在阅读本篇之前,至少要阅读过笔记三. 下面我们先来动态扩展一个类装载器,当然这只是一个比较小的demo,旨在让大家有个比较形象的概念. 第一步,首先定义自己的类装载器,从Clas

WEB前端学习笔记 五

接web前端学习笔记第四篇,此篇为web学习笔记 五,在此感谢您的采集和转发,但请注明文章出自网知博学. 2.0.3  html标签的属性格式 现在我们知道了两个双标签分别是,标题标签:<h1> - <h6>.和段落标签:<p></p>还知道了一个换行的单标签:<br />,现在我们给<p></p>标签添加一个属性,来改变段落是右对齐,还是左对齐,还是居中. 如上图,<p>标签中的 align(中文就是排列的意