RabbitMQ 消息中间件的部署

一、RabbitMQ安装

安装步骤:

1、  安装Erlang

2、  安装RabbitMQ server

3、  配置环境

4、  开启服务

安装Erlang:

安装最新版本Erlang

配置环境变量:ERLANG_HOME=D:\RabbitMQ\erl5.9.2,Erlang的安装目录。

安装RabbitMQ服务器:

下载rabbitmq-server-windows-2.8.7.zip,解压到D:\RabbitMQ\rabbitmq_server-2.8.7。

sbin目录下存放RabbitMQ server的控制命令,RabbitMQ server可以以服务和应用程序两种方式运行。

rabbitmq-server.bat 是应用程序方式运行。

rabbitmq-service.bat 是服务方式运行。

rabbitmqctl.bat 管理插件,管理日志、输出、通过命令访问等。

配置系统路径:RABBITMQ_SERVER = D:\RabbitMQ\rabbitmq_server-2.8.7。

在path中加入“;% RABBITMQ_SERVER%\sbin”。

RabbitMQ日志等数据在“C:\Documents and Settings\gzcheng\Application Data\RabbitMQ”目录下。

运行RabbitMQ:

在Dos窗口中输入rabbitmq-server.bat运行RabbitMQ。

关闭RabbitMQ应用程序,可以直接关闭RabbitMQ,也可以通过rabbitmqctl进行命令管理。在另外一个DOS窗口运行:rabbitmqctl stop。用rabbitmqctl status,可以查看RabbitMQ状态。

RabbitMQ默认端口是:5672。

二、编译RabbitMQ-C

1、安装python,要使用2.*版python,如:python-2.7.3.msi。

2、安装cmake,cmake-2.8.10.1-win32-x86.zip。

3、下载rabbitmq-c-master.zip,解压rabbitmq-c-master.zip。

4、下载rabbitmq-codegen-master.zip,解压rabbitmq-codegen-master.zip,将解压内容拷贝到rabbitmq-c-master 中的 codegen 目录下。

5、运行cmake-gui.exe,设置source code路径设置rabbitmq-c-master目录,输出设置自定义的目录。

6、单击配置,选择编译器类型,VS2005或者VS2008。

三、RabbitMQ-C的API说明

1、amqp_connection_state_t amqp_new_connection(void)

接口说明:声明一个新的amqp connection

2、int amqp_open_socket(char const *hostname, int portnumber)

接口说明:获取socket

参数说明:hostname RabbitMQ server所在主机;portnumber RabbitMQ server监听端口

3、void amqp_set_sockfd(amqp_connection_state_t state,int sockfd)

接口说明:将amqp connection和sockfd进行绑定

4、amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,int channel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method, ...)

接口说明:用于登录RabbitMQ server,主要目的为了进行权限管理;

参数说明:state    amqp connection

vhost   rabbit-mq的虚机主机,是rabbit-mq进行权限管理的最小单位

channel_max  最大链接数,此处设成0即可

frame_max  和客户端通信时所允许的最大的frame size.默认值为131072,增大这个值有助于提高吞吐,降低这个值有利于降低时延

heartbeat 含义未知,默认值填0

sasl_method  用于SSL鉴权,默认值参考后文demo

5、amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel)

接口说明:用于关联conn和channel

6、amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments)

接口说明:声明declare

参数说明:state

channel

exchange

type  "fanout"  "direct" "topic"三选一

passive

curable

arguments

7、amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments)

接口说明:声明queue

参数说明:state   amqp connection

channel

queue  queue name

passive

durable  队列是否持久化

exclusive  当前连接不在时,队列是否自动删除

aoto_delete 没有consumer时,队列是否自动删除

arguments 用于拓展参数,比如x-ha-policy用于mirrored queue

8、amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_tab le_t arguments)

接口说明:声明binding

9、amqp_basic_qos_ok_t *amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global)

接口说明:qos是 quality of service,我们这里使用主要用于控制预取消息数,避免消息按条数均匀分配,需要和no_ack配合使用

参数说明:state

channel

prefetch_size 以bytes为单位,0为unlimited

prefetch_count 预取的消息条数

global

10、amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments)

接口说明:开始一个queue consumer

参数说明:state

channel

queue

consumer_tag

no_local

no_ack    是否需要确认消息后再从队列中删除消息

exclusive

arguments

11、int amqp_basic_publish(amqp_connection_state_t state,amqp_channel_t channel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_t mandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const *properties,amqp_bytes_t body)

接口说明:发布消息

参数说明:state

channel

exchange

routing_key  当exchange为默认“”时,此处填写queue_name,当exchange为direct,此处为binding_key

mandatory

immediate

properties 更多属性,如何设置消息持久化

body 消息体

12、amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,int code)

接口说明:关闭channel

13、amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,int code)

接口说明:关闭connection

14、int amqp_destroy_connection(amqp_connection_state_t state)

接口说明:销毁connection

四、RabbitMQ服务客户端的的业务逻辑

1,打开 socket:

amqp_new_connection();

amqp_open_socket(hostname, port);

2,用户登陆:

amqp_set_sockfd(conn, sockfd);

amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, userName, password);

3,打开channel

amqp_channel_open(conn, 1);

amqp_get_rpc_reply(conn);

4,声明 Exchange

amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table);

5,声明Queue

amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue), 0, 0, 0, 1,amqp_empty_table);

6,将Queue和Exchange进行binding

amqp_queue_bind(conn, 1,

queuename,

amqp_cstring_bytes(exchange),

amqp_cstring_bytes(bindingkey),

amqp_empty_table);

注意:binding动作在程序中只能执行一次,如果第二次再执行,程序会crash!

7,操作,包括 发送(publish),接收(consume)等。

发送:

amqp_basic_publish(conn,

1,

amqp_cstring_bytes(exchange),

amqp_cstring_bytes(routingkey),

0,

0,

NULL,

amqp_cstring_bytes(messagebody));

接收:

amqp_basic_consume(conn, 1,queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

amqp_maybe_release_buffers(conn);

此处应该注意:接受的过程是一个阻塞的异步过程,所以必须在子线程中进行操作,这样就不会影响主线程中的UI操作,所以在以前使用的block编程就用了很大的用武之地,一方面,通过block多核编程提高程序的运行效率,第二方面,异步的dispatch能够完美的解决阻塞的问题,并且可以使处理后返回的数据直接在OC类中直接使用,从而规避了在C函数中传递OC的指针来对OC的对象进行的操作,真是一劳永逸的好方法,推荐大家使用。

8,进行unbinding

amqp_queue_unbind(conn, 1,

queuename,

amqp_cstring_bytes(exchange),

amqp_cstring_bytes(bindingkey),

amqp_empty_table);

9,关闭channel

amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);

10,关闭 connection连接。

// Closing connection

amqp_connection_close(conn, AMQP_REPLY_SUCCESS);

//Ending connection

amqp_destroy_connection(conn);

五、RabbitMQ虚拟主机

虚拟主机virtual host只起一个命名空间的作用,多个用户user可以共用一个虚拟主机。系统默认虚拟主机是‘/’。要注意不同命名空间之间的资源是不能访问的,比如exchange、queue、binging等。

可以通过rabbitmqctl创建自己虚拟主机。

1、创建用户:rabbitmqctl add_user test 123456;新建的用户名是test,密码是123456,通过rabbitmqctl list_users查看有多少用户。

2、新建虚拟主机:rabbitmqctl add_vhosts test_host;新建一个test_host虚拟主机,通过rabbitmqctl list_vhosts查看有几个虚拟主机。

3、设置访问权限:rabbitmqctl set_permissions –p test_host test “.*” “.*” “.*”;test用户就有读写test_host虚拟主机的权限。

4、删除用户:rabbitmqctl delete_user guest。

六、RabbitMQ主要概念

1、Exchange(交换机)

有三个类型:“Direct”点对点直接传输,Publisher和Consumer直接连接,不处理路由键;“Fanout”不处理路由键,消息发送到与Exchange绑定的所有队列上;“Topic”主题订阅,Consumer可以使用统配符来定义routingKey,使用相同的topic类型,并且符合该统配符的Publisher发布的信息,Consumer都可以收到。

程序指定类型字符串:“direct”、“topic”、“fanout”、“headers”。要修改系统默认类型,需要用amqp_exchange_declare声明,声明后才能用。

交换机可以是持久的、临时的和自动删除的。持久的就是一直存在于服务器;临时的会工作到RabbitMQ被关闭为止;自动删除是指当没程序使用就自动删除。

2、Queue(消息队列)

消息队列可以是持久的、临时的和自动删除的。消息队列是用来替消费者保存消息,采用先进先出规则。消息队列会跟踪消息的获取情况,消息要出队就必须被获取(acquire和consume是两个动作,先执行acquire,相当于对消息加锁),这阻止多个客户端同时获取和消费同一条消息,也可以被用于做单队列多消费者之间的负载均衡。

3、持久化

Queue、消息和Exchange都要持久。

4、绑定

七、数据发送和接收规则

1、一个Connection只能对应一个接收线程,不能用多线程结果一个Connection。

2、通过一个connection上可以创建多个通道,从不同队列上接收数据。

八、RabbitMQ管理

1、  开启Management Plugin

在DOS窗口中输入:rabbitmq-plugins enable rabbitmq_management

管理页面地址:http://server-name:15672/,早期端口是:55672。

Management Plugin Web地址: http://www.rabbitmq.com/management.html

2、  RabbitMQ配置

一般情况下,RabbitMQ的默认配置就足够了,如果希望特殊设置的话,有两个途径:

一个是环境变量的配置文件rabbitmq-env.conf,只适用非Windows环境,Windows环境需要通过Start > Settings > Control Panel > System > Advanced > Environment Variables来设置环境变量。设置好后,还要重新安装RabbitMQ Service。

一个是配置信息的配置文件rabbitmq.config,这个文件内容必须符合erlang配置文件的标准。它既有默认的目录,也可以通过RABBITMQ_CONFIG_FILE环境变量进行设置。

详细说明见http://www.rabbitmq.com/configure.html

环境变量配置(配置端口,文件路径,名称等)

RABBITMQ_NODE_IP_ADDRESS 默认为空 为空时表示允许所有IP连接,如果你添加约束配置这一个选项

RABBITMQ_NODE_PORT 默认为5672 端口 端口

HOSTNAME 默认为HOST名称 当前机器HOST名称

COMPUTERNAME windows默认:localhost 当前机器名称

RABBITMQ_BASE Windows默认: %APPDATA%\RabbitMQ 安装目录

RABBITMQ_NODENAME Unix*:[email protected]$HOSTNAME Windows: [email protected]%COMPUTERNAME% 节点名称

RABBITMQ_CONFIG_FILE Unix*:/etc/rabbitmq/rabbitmq Windows: %RABBITMQ_BASE%\rabbitmq 配置文件

RABBITMQ_MNESIA_BASE Unix*:/var/lib/rabbitmq/mnesia Windows: %RABBITMQ_BASE%\db mq数据库目录

RABBITMQ_LOG_BASE Unix*:/var/log/rabbitmq Windows: %RABBITMQ_BASE%\log LOG文件目录

RABBITMQ_PLUGINS_DIR 默认为空 插件目录

RABBITMQ_ENABLED_PLUGINS_FILE Unix*:/etc/rabbitmq/enabled_plugins Windows: %RABBITMQ_BASE%\enabled_plugins 记录激活的插件

ERLANG_SERVICE_MANAGER_PATH WindowsService: %ERLANG_HOME%\erts-x.x.x\bin erlang安装目录

RABBITMQ_SERVICENAME Windows Service: RabbitMQ rabbitmq服务名称

RABBITMQ_CONSOLE_LOG Windows Service: 控制台日志

时间: 2024-08-13 08:33:01

RabbitMQ 消息中间件的部署的相关文章

Rabbitmq简介及部署群集

博文大纲:一.MQ简介二.什么是RabbitMQ?三.安装RabbitMQ四.部署Rabbitmq集群1)部署环境2)安装rabbitmq服务3)配置host文件,并将上述两个节点加入集群4)rabbitmq01配置群集并将rabbitmq02.03加入01群集5)访问web界面6)web页面添加vhost五.单台节点加入或退出群集1)单节点加入集群2)单节点退出集群 一.MQ简介 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队

Rabbitmq集群部署手册

一.Rabbitmq集群部署手册 1.环境介绍 系统环境:Red HatEnterprise Linux Server release 6.2 (Santiago) 内核版本:Linux zxt-02.com2.6.32-220.el6.x86_64 #1 SMP Wed Nov 9 08:03:13 EST 2011 x86_64 x86_64 x86_64GNU/Linux 软件版本:otp_src_17.3:rabbitmq-server-3.2.4:Python 2.6.6:simple

Rabbitmq集群部署Haproxy

安装前的说明: 1.为了配合做rabbitmq的负载均衡,本次的安装包是安装的低版本1.5.8版本,下载包的地址:https://src.fedoraproject.org/repo/pkgs/haproxy/2.按照上两篇博客,先把rabbitmq集群装好:单机版Rabbitmq部署:http://blog.51cto.com/10950710/2135676Rabbitmq集群部署:http://blog.51cto.com/10950710/2135717 安装步骤: 1.将haproxy

分布式rabbitmq集群部署

分布式rabbitmq集群部署本次部署使用的三台centos服务器安装的erlang 版本为 21.0.4安装的rabbitmq版本为3.7.8-rc.2在三台服务器中分别安装rabbitmq-serverrabbitmq通信是建立在erlang环境中 所以需要先安装erlang环境 因为国外的erlang官方地址中中国不能直接访问,所以我们使用第三方包https://github.com/rabbitmq/erlang-rpm 安装说明地址 修改系统repo包文件修改repo# In /etc

RabbitMQ的安装部署

RabbitMQ安装部署 一.软件准备 p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; line-height: 19.0px; font: 13.0px "Helvetica Neue"; color: #00a2ff } p.p2 { margin: 0.0px 0.0px 0.0px 0.0px; line-height: 19.0px; font: 13.0px "Helvetica Neue" } span.s1 { col

CentOS6.8搭建rabbitmq消息中间件

参考资料:http://blog.csdn.net/yunfeng482/article/details/72853983 一.rabbitmq简介 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除

python中使用rabbitmq消息中间件

上周一直在研究zeromq,并且也实现了了zeromq在python和ruby之间的通信,但是如果是一个大型的企业级应用,对消息中间件的要求比较高,比如消息的持久化机制以及系统崩溃恢复等等需求,这个时候zeromq就显得有点鸡肋了,特别是消息持久化是他的的硬伤,那么怎么找一个比较适合的中间件呢? 目前市场上主流的中间件除了zeromq,还有rabbitmq,activemq等等,这两周都比较有名,一个是基于erlang,一个是基于jms,rabbitmq是AMQP(高级消息队列协议)的标准实现,

RabbitMQ集群部署

环境: ip z主机名 c操作系统 10.0.0.1 test1 Centos7.2 10.0.0.2 test2 Centos7.2 10.0.0.3 test3 Centos7.2 一.安装依赖环境. yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget 二.防火墙.主机与ip间的映射. 1. 关闭或设置防火墙 systemctl stop firewalld.service syste

RabbitMQ消息中间件介绍

一.基础介绍 随着分布式应用的发展消息队列中间件成为C/S架构中解耦的一个重要环节,传统的消息传输模型中,C端发出消息,S端必须在线,否则将无法继续进行,而在拥有消息中间件的模型下消息产生者(C端)发出的消息由中间件来接受,即使此时消息消费者(S端)即便不在线也有可能不产生中断.RabbitMQ作为消息中间件的一种其组成部分如下图所示: 他的核心组成部分为: 交换器(Exchange):起作用主要是将收到的消息交换至对应的队列 队列(Message):用于存放供订阅者(Consumer)读取消息