11.RabbitMQ单机集群

RabbitMQ集群设计用于完成两个目标:允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行,以及通过添加更多的节点来扩展消息通信的吞吐量。

RabbitMQ会始终记录以下四种类型的内部元数据:

1.         队列元数据-队列的名称和它们的属性(是否持久化,是否自动删除)

2.         交换器元数据-交换器类型、名称和属性(可持久化等)

3.         绑定元数据-一张简单的表格展示了如何将消息路由到队列

4.         vhost元数据-为vhost内的队列、交换器和绑定提供命名空间和安全属性

在单一节点内,RabbitMQ会将所有这些信息存储在内存中,同时将那些标记为可持久化的队列和交换器(以及它们的绑定)存储到硬盘上。当你引入集群时,RabbitMQ需要追踪新的元数据类型:集群节点位置,以及节点与已记录的其他类型元数据的关系。集群提供了选择:将元数据存储到磁盘上,或者存储在内存中。

Erlang Cookie

Erlang Cookie是保证不同节点可以相互通信的密钥,要保证集群中的不同节点相互通信必须共享相同的Erlang Cookie。具体的目录存放在/var/lib/rabbitmq/.erlang.cookie。

说明: 这就要从rabbitmqctl命令的工作原理说起,RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证。

镜像队列

功能和原理

RabbitMQ的Cluster集群模式一般分为两种,普通模式和镜像模式。

  • 普通模式:默认的集群模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。
  • 镜像模式:将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。

内存节点和磁盘节点

每个RabbitMQ节点,要么是内存节点(ram
node),要么是磁盘节点(disk
node)。内存节点将所有的队列、交换器、绑定、用户、权限和vhost的元数据定义都仅存在内存中。而磁盘节点则将元数据存储在磁盘中。

内存节点的效率更高,内存节点唯一存储到磁盘上的是磁盘节点的地址。

RabbitMQ要求集群中至少有一个磁盘节点。当节点加入或者离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,而且不凑巧的是它又崩溃了,那么集群可以继续路由消息,但是不能做以下操作了:

1.        
创建队列

2.

创建交换器

3.

创建绑定

4.

添加用户

5.

更改权限

单机环境搭建多节点群集

1、禁用管理后台插件rabbitmq-plugins disable
rabbitmq_management

2、创建三个Shell文件

rabbitmq1.sh

#!/bin/bash

export
RABBITMQ_NODE_PORT=5672

export
RABBITMQ_NODENAME=rabbit

rabbitmq-server

rabbitmq2.sh

#!/bin/bash

export
RABBITMQ_NODE_PORT=5673

export
RABBITMQ_NODENAME=rabbit2

rabbitmq-server

rabbitmq3.sh

#!/bin/bash

export
RABBITMQ_NODE_PORT=5674

export
RABBITMQ_NODENAME=rabbit3

rabbitmq-server

3、停止在Erlang节点上运行的节点2和节点3
RabbitMQ Server 并清空(重置)它们的元数据

rabbitmqctl -n [email protected]
stop_app

rabbitmqctl -n [email protected]
stop_app

rabbitmqctl -n [email protected]
reset

rabbitmqctl -n [email protected]
reset

4、将节点2作为磁盘节点加入集群并启动应用

rabbitmqctl -n [email protected]
join_cluster [email protected]

rabbitmqctl -n [email protected]
start_app

5、将节点3作为内存节点加入集群并启动应用

rabbitmqctl -n [email protected]
join_cluster --ram [email protected]

rabbitmqctl -n [email protected]
start_app

6、运行命令rabbitmqctl cluster_status查看集群状态

Cluster status of node
[email protected] ...

[{nodes,[{disc,[[email protected],[email protected]]},

{ram,[[email protected]]}]},

{running_nodes,[[email protected],[email protected],[email protected]]},

{cluster_name,<<"[email protected]">>},

{partitions,[]},

{alarms,[{[email protected],[]},

{[email protected],[]},

{[email protected],[]}]}]

集群安装成功,这时候java客户端可以连接任何一个RabbitMQ
Server的端口来访问集群了。

7、镜像队列

在声明队列时,可以通过参数"x-ha-policy"设置为"all"来把消息发送到集群的所有节点上。

Map
arg =
new
HashMap();

arg.put("x-ha-policy",

"all");

channel.queueDeclare(queueName,

false
,
false
,
false
,
arg);

客户端发送代码

package
com.test.cluster;

import
com.rabbitmq.client.*;

import
java.io.IOException;

import
java.lang.String;

import
java.lang.System;

import
java.util.HashMap;

import
java.util.Map;

import
java.util.Scanner;

public class
Producer {

public static void main(String[] args) throws
Exception {

//使用默认端口连接MQ

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("admin");

factory.setPassword("admin");

factory.setHost("192.168.169.142"); //使用默认端口5672

Connection
conn = factory.newConnection(); //声明一个连接

Channel
channel = conn.createChannel(); //声明消息通道

String
exchangeName = "TestEXG";//交换机名称

String
routingKey = "RouteKey1";//RoutingKey关键字

channel.exchangeDeclare(exchangeName, "direct",
true);//定义声明交换机

String
queueName = "ClusterQueue";//队列名称

Map arg =
new HashMap();

arg.put("x-ha-policy", "all");

channel.queueDeclare(queueName, false, false, false,
arg);

channel.queueBind(queueName, exchangeName,
routingKey);//定义声明对象

byte[]
messageBodyBytes = "Hello, world!".getBytes();//消息内容

channel.basicPublish(exchangeName, routingKey, null,
messageBodyBytes);//发布消息

//关闭通道和连接

channel.close();

conn.close();

}

}

消费者代码

package
com.test.cluster;

import
com.rabbitmq.client.Channel;

import
com.rabbitmq.client.Connection;

import
com.rabbitmq.client.ConnectionFactory;

import
com.rabbitmq.client.QueueingConsumer;

import
java.io.IOException;

import
java.util.HashMap;

import
java.util.Map;

//通过channel.basicAck向服务器发送回执,删除服务上的消息

public class
Consumer {

public static void main(String[] args) throws
IOException, InterruptedException {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("admin");

factory.setPassword("admin");

factory.setHost("192.168.169.142"); //使用默认端口5672

factory.setPort(5672);

Connection
conn = factory.newConnection(); //声明一个连接

Channel
channel = conn.createChannel(); //声明消息通道

String
exchangeName = "TestEXG";//交换机名称

String
queueName = "ClusterQueue";//队列名称

channel.exchangeDeclare(exchangeName, "direct",
true);//定义声明交换机

channel.queueBind(queueName, exchangeName, "RouteKey1");

channel.basicQos(1); //server push消息时的队列长度

//用来缓存服务器推送过来的消息

QueueingConsumer consumer = new
QueueingConsumer(channel);

channel.basicConsume(queueName, false, consumer);

while
(true) {

QueueingConsumer.Delivery
delivery = consumer.nextDelivery();

System.out.println("Received
" + new String(delivery.getBody()));

//回复ack包,如果不回复,消息不会在服务器删除

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);

}

}

}

原文地址:https://www.cnblogs.com/zzpblogs/p/8168821.html

时间: 2024-08-30 16:26:33

11.RabbitMQ单机集群的相关文章

RabbitMQ入门教程(十四):RabbitMQ单机集群搭建

原文:RabbitMQ入门教程(十四):RabbitMQ单机集群搭建 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78723467 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 集群简介 理解集群先理解一下元数据 队列元数据:队列的名称和声明队列时设置的属性(是否持久化.是否自动删除.队列所属的节点)

Rabbitmq 相关介绍之单机集群配置

一.说明: 说到集群,大家应该都不陌生,为了提高性能需要配置集群,而在有的时候,我们需要在测试环境先测试然后灰度上线,所以这里介绍在一台服务器上配置rabbitmq集群 二.rabbitmq集群模式 1.普通模式:rabbitmq默认的集群模式 RabbitMQ集群中节点包括内存节点.磁盘节点.内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘上.如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘.那么内存节点的性能只能体现在资源管理上,比如增加或删除队列(qu

阿里云构建Kafka单机集群环境

简介 在一台ECS阿里云服务器上构建Kafa单个集群环境需要如下的几个步骤: 服务器环境 JDK的安装 ZooKeeper的安装 Kafka的安装 1. 服务器环境 CPU: 1核 内存: 2048 MB (I/O优化) 1Mbps 操作系统 ubuntu14.04 64位 感觉服务器性能还是很好的,当然不是给阿里打广告,汗. 随便向kafka里面发了点数据,性能图如下所示:  2. 安装JDK 想要跑Java程序,就必须安装JDK.JDK版本,本人用的是JDK1.7. 基本操作如下: 从JDK

RabbitMQ分布式集群架构

RabbitMQ分布式集群架构和高可用性(HA) https://blog.csdn.net/woogeyu/article/details/51119101 (一) 功能和原理 设计集群的目的 允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行 通过增加更多的节点来扩展消息通信的吞吐量 1 集群配置方式 RabbitMQ可以通过三种方法来部署分布式集群系统,分别是:cluster,federation,shovel cluster: 不支持跨网段,用于同一个网段内的局域网 可以随意的

SpringBoot集成redisson(单机,集群,哨兵)

1.springBoot集成redisson(单机,集群,哨兵) redisson版本使用3.8.2 <dependency>??????<groupId>org.redisson</groupId>??????<artifactId>redisson</artifactId>??????<version>3.8.2</version></dependency> 2.配置文件 application.prope

Redis基本概念、基本使用与单机集群部署

1. Redis基础 1.1 Redis概述 Redis是一个开源.先进的key-value存储,并用于构建高性能.可扩展的应用程序的完美解决方案. Redis从它的许多竞争继承了三个主要特点:   ①Redis数据库完全在内存中,使用磁盘仅用于持久性:   ②相比许多键值对数据存储,Redis拥有一套较为丰富的数据类型:   ③Redis可以将数据复制到任意数据量的从服务器: 1.2 Redis优势 异常快速:Redis的速度非常快,每秒能执行约11万次set操作,每秒约81000次get操作

ZooKeeper环境搭建(单机/集群)(转)

前提: 配置文件主要是在$ZOOKEEPER_HOME/conf/zoo.cfg,刚解压时为zoo_sample.cfg,重命名zoo.cfg即可. 配置文件常用项参考:http://www.cnblogs.com/EasonJim/p/7483880.html 环境搭建: 一.ZooKeeper的搭建方式 ZooKeeper安装方式有三种,单机模式和集群模式以及伪集群模式. 单机模式:ZooKeeper只运行在一台服务器上,适合测试环境: 伪集群模式:就是在一台物理机上运行多个ZooKeepe

一张图看懂单机/集群/热备/磁盘阵列(RAID)

单机部署(Standalone) 只有一个饮水机提供服务器,服务只部署一份 集群部署(Cluster) 多个饮水机同时提供服务,服务冗余部署,每个冗余的服务都对外提供服务,一个服务挂掉时依然可用 热备部署(Hot-swap) 只有一个桶提供服务,另一个桶stand-by,在水用完时自动热替换,服务冗余部署.只有一个主服务对外提供服务,影子服务在主服务挂掉时顶上 磁盘阵列RAID(Redundant Arrays of indepent Disks) RAID 0: 存储性能高的磁盘阵列,原理:将

Windows &amp; RabbitMQ:集群(clustering) &amp; 高可用(HA)

描述:我们需要配置三台服务器:ServerA, ServerB, ServerC 注意事项: 所有的服务器的Erlang版本,RabbitMQ版本必须一样 服务器名大小写敏感 Step 1:安装RabbitMQ 1. ServerA:Windows & RabbitMQ:安装,Step 1, 2, 3, 4, 5 2.ServerB, ServerC:Windows & RabbitMQ:安装,Step 1, 2 Step 2:修改ServerA, ServerB, ServerC的Hos