1,优缺点
优点 |
缺点 |
支持集群,速度和性能可能会高于其他MQ产品 |
只支持点对点的消费方式 |
不会丢失消息,稳定性要好 |
|
消息的消费者可以时间无关 |
2, 模式
|
普通模式 |
镜像模式 |
单一模式 |
集群 |
默认的集群模式 |
把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案 |
非集群模式 |
说明 |
对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构 |
消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取 |
|
细节 |
当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连A或B,出口总在A,会产生瓶颈 |
||
问题 |
A节点故障后,B节点无法取到A节点中还未消费的消息实体 |
除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉 |
|
问题的方案 |
消息持久化,那么得等A节点恢复,然后才可被消费 |
3, 集群
(1) 基本概念:
(2) RabbitMQ的集群节点:
a) 内存节点:只保存状态到内存(一个例外的情况是:持久的queue的持久内容将被保存到disk)
b) 磁盘节点:保存状态到内存和磁盘。
(3) 如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘
(4) 内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了
集群的安装
(1) mq202,mq203两台服务器做为RabbitMQ集群节点,分别安装RabbitMq-Server ,安装后分别启动RabbitMq-server
(2) (2)在安装好的三台节点服务器中,分别修改/etc/hosts文件,指定mq202,mq203的hosts,如:172.17.0.202 mq202 172.17.0.203 mq203
(3) 设置cookie:Rabbitmq的集群是依赖于erlang的集群来工作的,所以必须先构建起erlang的集群环境。Erlang的集群中各节点是通过一个magic cookie来实现的,这个cookie存放在C:\Users\guozhaoxia\.erlang.cookie和c:/window/.erlang.cookie中。所以必须保证各节点cookie保持一致,否则节点之间就无法通信
(4) 停止所有节点RabbitMq服务,然后使用detached参数独立运行,这步很关键,尤其增加节点停止节点后再次启动遇到无法启动都可以参照这个顺序
(5) cd C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.4\sbin
a) rabbitmqctl stop
b) rabbitmq-server –detached
c) rabbitmqctl cluster_status(查看集群状态)
d) rabbitmqctl stop_app
(6) 将mq202,mq203连接起来执行如下命令:
a) rabbitmqctl stop_app
b) rabbitmqctl join_cluster --ram [email protected](注意MQ要大写,-ram是指内存节点,不如果设置为磁盘节点时,去掉-ram)
c) rabbitmqctl start_app
镜像的配置
(1) 使用Rabbit镜像功能,需要基于rabbitmq策略来实现,政策是用来控制和修改群集范围的某个vhost队列行为和Exchange行为
(2) 在cluster中任意节点启用策略,策略会自动同步到集群节点
(3) 添加策略的语法:
(4) set_policy [-p vhostpath] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
(5) eg:rabbitmqctl set_policy -p hrsystem ha-allqueue "^message" ‘{"ha-mode":"all"}‘
(6) 这行命令在vhost名称为hrsystem创建了一个策略,策略名称为ha-allqueue,策略模式为 all 即复制到所有节点,包含新增节点,
(7) 策略正则表达式为 “^” 表示所有匹配所有队列名
(注意:"^message" 这个规则要根据自己修改,这个是指同步"message"开头的队列名称,我们配置时使用的应用于所有队列,所以表达式为"^")
4, RabbitMQ支持的方式
1, direct(直接投递)
2, Fanout(广播投递)
3, Topic可以按照一个topic名字的模式进行匹配routing_key
5, RabbitMQ命令
应用和集群管理
1.停止RabbitMQ应用,关闭节点
# rabbitmqctl stop
2.停止RabbitMQ应用
# rabbitmqctl stop_app
3.启动RabbitMQ应用
# rabbitmqctl start_app
4.显示RabbitMQ中间件各种信息
# rabbitmqctl status
5.重置RabbitMQ节点
# rabbitmqctl reset
# rabbitmqctl force_reset
从它属于的任何集群中移除,从管理数据库中移除所有数据,例如配置过的用户和虚拟宿主, 删除所有持久化的消息。
force_reset命令和reset的区别是无条件重置节点,不管当前管理数据库状态以及集群的配置。如果数据库或者集群配置发生错误才使用这个最后 的手段。
注意:只有在停止RabbitMQ应用后,reset和force_reset才能成功。
6.循环日志文件
# rabbitmqctl rotate_logs[suffix]
7.集群管理 www.2cto.com
# rabbitmqctl cluster clusternode…
用户管理
1.添加用户
# rabbitmqctl add_user username password
2.删除用户
# rabbitmqctl delete_user username
3.修改密码
# rabbitmqctl change_password username newpassword
4.列出所有用户
# rabbitmqctl list_users
权限控制
1.创建虚拟主机
# rabbitmqctl add_vhost vhostpath
2.删除虚拟主机 www.2cto.com
# rabbitmqctl delete_vhost vhostpath
3.列出所有虚拟主机
# rabbitmqctl list_vhosts
4.设置用户权限
# rabbitmqctl set_permissions [-p vhostpath] username regexp regexp regexp
5.清除用户权限
# rabbitmqctl clear_permissions [-p vhostpath] username
6.列出虚拟主机上的所有权限
# rabbitmqctl list_permissions [-p vhostpath]
7.列出用户权限
# rabbitmqctl list_user_permissions username
6, RabbitMQ的控制台
(1) cd C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.4\sbin
(2) rabbitmq-plugins.bat enable rabbitmq_management
7, 常用RabbitMQ的API的介绍
Channel对象的方法
queueDeclare
AMQP.Queue.DeclareOk
|
queueDeclare ()
Actively declare a server-named exclusive, autodelete, non-durable queue. |
AMQP.Queue.DeclareOk
|
queueDeclare (java.lang.String queue, boolean durable, boolean exclusive, boolean autoDelete, java.util.Map<java.lang.String,java.lang.Object> arguments)
Declare a queue |
8,消息队列模式
HelloWorld(简单模式)
RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃。
channel.queueDeclare:第一个参数:队列名字,第二个参数:队列是否可持久化即重启后该队列是否依然存在,第三个参数:该队列是否是独占的即客户端连接上来时它就占用整个连接,第四个参数:是否自动销毁即当这个队列不再被使用的时候即没有消费者对接上来时自动删除,第五个参数:其他参数如TTL(队列存活时间)等。
channel.basicConsume:第一个参数:队列名字,第二个参数:是否自动应答,如果为真,消息一旦被消费者收到,服务端就知道该消息已经投递,从而从队列中将消息剔除,否则,需要在消费者端手工调用channel.basicAck()方法通知服务端,如果没有调用,消息将会进入unacknowledged状态,并且当消费者连接断开后变成ready状态重新进入队列,第三个参数,具体消费者类
Work queues(工作队列)
现在我们来发送一条代表复杂任务的字符串。我们这里没有一个真实存在的任务,例如修改图片大小和渲染pdf文件这类的任务,这里我们模拟一个任务繁忙的场景(使用Thread.sleep()函数)。这里我们使用字符串类的点号个数来代表任务的复杂性,每一个点号都占用一秒钟的处理时间。例如,一个用”Hello…”来描述的伪造的任务将会占用三秒时间。
创建工作队列用来把一些比较耗时的任务分配给多个worker。
工作队列的主要思想就是避开立刻处理某个资源消耗交大的任务并且需要等待它执行完成。取而代之的是我们可以将它加入计划列表,并在后边执行这些任务。我们将任务分装成一个消息,并发送到队列中。后台的工作程序在接收到消息后将会立刻执行任务。当运行多个执行器时,任务将会在他们之间共享。
Publish/Subscribe(发布/订阅)
(1) 定义:一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅)
(2) RabbitMQ的消息发送模型核心思想是生产者不直接把消息发送到消息队列中。事实上,生产者不知道自己的消息将会被缓存到哪个队列中。
其实生产者者可以把消息发送到exchange(消息交换机)上。exchange是一个很简单的事情,它一边接收生产者的消息,另一边再把消息推送到消息队列中。Exchange必须知道在它接收到一条消息时应该怎么去处理。应该把这条消息推送到指定的消息队列中?还是把消息推送到所有的队列中?或是把消息丢掉?这些规则都可以用exchange类型来定义。
(3) 有一些可用的exchange类型:direct, topic, headers和fanout
(4) 总结:
a) 在生产者和消费者的信道中声明exchange名字以及类型
b) 在生产者的信道中指定发送目标的exchange
c) 在消费者端的信道中声明一个随机的消息队列,并拿到这个队列名称;然后在信道上绑定该消息队列和消息路由
Routing(路由)
Direct类型的exchange
多重绑定(Multiple bindings)
Topics
(1) 消息发送到topic类型的exchange上时不能随意指定routing_key(一定是指由一系列由点号连接单词的字符串,单词可以是任意的,但一般都会与消息或多或少的有些关联)。Routing key的长度不能超过255个字节。
(2) Binding key也一定要是同样的方式。Topic类型的exchange就像一个直接的交换:一个由生产者指定了确定routing key的消息将会被推送给所有Binding key能与之匹配的消费者。然而这种绑定有两种特殊的情况:
a) *(星号):可以替代一个单词(一串连续的字母串)
b) #(井号):可以匹配一个或个字符
(3) Topic类型的exchange:
a) Topic类型的exchange是很强大的,也可以实现其它类型的exchange。
b) 当一个队列被绑定为binding key为”#”时,它将会接收所有的消息,此时和fanout类型的exchange很像。
c) 当binding key不包含”*”和”#”时,这时候就很像direct类型的exchange。