RabbitMQ 安装与使用

RabbitMQ 安装与使用

前言

吃多了拉就是队列,吃饱了吐就是栈

  • 使用场景

    • 对操作的实时性要求不高,而需要执行的任务极为耗时;(发送短信,邮件提醒,更新文章阅读计数,记录用户操作日志)
    • 存在异构系统间的整合;

安装

  • 下载 Erlang

    • 安装完确定ERLANG_HOME环境变量是否添加,否则:Setx ERLANG_HOME “D:\Program Files\erl8.2″
  • 下载安装包
    • 安装完通过rabbitmqctl status确定rabbitmq状态
  • 管理服务
    • 默认安装成功会自动启动服务
    • 通过开始菜单可以启动,停止,卸载服务
  • 占用端口
    • 4369(集群、Erlang)
    • 5671,5672(应用层标准高级消息队列协议)
    • 25672(Erlang分发,CLI通信)
    • 15672(如果管理插件启用)
    • 61613,61614(如果消息文本协议STOMP已启用)
    • 1883,8883(如果erl实时通信已启用)
  • 支持的平台
    • 基于Ubuntu和Debian的Linux发行版
    • 基于Fedora,CentOS和RPM的Linux发行版
    • Mac OS X
    • Windows XP及更高版本

概念

  • Connections:客户端连接,创建该资源非常耗时,应尽量避免多次创建。
  • Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Broker:简单来说就是消息队列服务器实体。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  • producer:消息生产者,就是投递消息的程序。
  • consumer:消息消费者,就是接收消息的程序。

消息队列的发送过程大概如下:

  1. 客户端创建Connection,连接到消息队列服务器,打开一个channel。
  2. 客户端声明一个Exchange,并设置相关属性。
  3. 客户端声明一个Queue,并设置相关属性。
  4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
  5. 客户端发送消息首先到exchange
  6. exchange根据type路由到对应的队列(可以是多个队列)中.

Exchange Type

  • direct(直连)

    • routing key 与 binding key相同
  • fanout
    • 给所有绑定队列发送消息
  • topic
    • routing key:audit.irs.corporate => binding key:audit.#
    • routing key:audit.irs => binding key:audit.*
  • default
    • direct
    • binding key为queue名称

常用命令

  • 管理插件

    • rabbitmq-plugins enable rabbitmq_management // 启用
    • rabbitmq-plugins disable rabbitmq_management // 禁用
  • 管理队列
    • rabbitmqctl list_queues // 查看队列
  • 管理用户及权限
    • rabbitmqctl list_users // 查看所有用户
    • rabbitmqctl add_user user_admin passwd_admin // 添加用户
    • rabbitmqctl set_user_tags user_admin administrator // 添加权限
    • rabbitmqctl delete_user guest // 删除用户
    • rabbitmqctl change_password {username} {newpassowrd} // 修改密码
  • 管理虚拟主机vhost
    • rabbitmqctl add_vhost vhostpath // 创建虚拟主机
    • rabbitmqctl delete_vhost vhostpath // 删除虚拟主机
    • rabbitmqctl list_vhosts // 列出所有虚拟主机

使用

  • 发送消息(以持久化代码为例)
var factory = new ConnectionFactory
{
    HostName = hostName,                // rabbit server
    UserName = "admin",
    Password = "admin",
    Port = 5672,                        // Broker端口
    VirtualHost = "/"                   // 虚拟Host,需提前配置
};

using (var connection = factory.CreateConnection()) // 创建与RabbitMQ服务器的连接
{
    using (var channel = connection.CreateModel())  // 创建1个Channel(大部分API在该Channel中)
    {
        // 定义1个队列,自动会和默认的exchange 做direct类型绑定
        channel.QueueDeclare(
            queue: "hello",                     // 队列名称
            durable: true,                      // 队列是否持久化
            exclusive: false,                   // 排他队列:如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。(活动在一次连接内)
            autoDelete: false,                  // 自动删除:当最后一个消费者取消订阅时,队列自动删除。如果您需要仅由一个使用者使用的临时队列,请将自动删除与排除。当消费者断??开连接时,队列将被删除。(至少消费者能连一次)
            arguments: null);                   // 配置参数

        var randomQueue = channel.QueueDeclare();                   // 定义随机的队列 该队列为临时队列(排他队列 + 自动删除)

        // 定义Exchange(一般而言,不需要定义exchange,rabbitmq默认创建了所有类型的exchange)
        //channel.ExchangeDeclare("direct-demo", ExchangeType.Direct);    // 定义direct exchange
        //channel.ExchangeDeclare("fannout-demo", ExchangeType.Fanout);   // 定义fanout exchange
        //channel.ExchangeDeclare("topic-demo", ExchangeType.Topic);      // 定义fanout exchange

        // 定义queue exchange key 关系(在某些业务场景下,会使用该关系做路由功能)
        //channel.QueueBind(queue: "hello", exchange: "amq.direct", routingKey: "hello"); // 默认绑定的关系和该行代码效果一样
        //channel.QueueBind("hello", "amq.fanout", "hello");                              // 该类型下的routingKey 实际不需要

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        while (true)
        {
            string message = "Hello World!" + DateTime.Now;
            var body = Encoding.UTF8.GetBytes(message);

            // 发送消息到队列中
            channel.BasicPublish(
                exchange: string.Empty,         // 传递为Empty的时候,通过   `(AMQP default)`传递
                routingKey: "hello",            // routing key 与 queuebind中的binding key对应
                basicProperties: properties,    // 消息header
                body: body);                    // 消息body:发送的是bytes 可以任意编码

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}
  • 接收消息(以消息响应为例)

    var factory = new ConnectionFactory
    {
    HostName = hostName,                // rabbit server
    UserName = "admin",
    Password = "admin",
    Port = 5672,                        // Broker端口
    VirtualHost = "/"                   // 虚拟Host,需提前配置
    };
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
        var consumer = new EventingBasicConsumer(channel);  // 创建Consumer
        consumer.Received += (model, ea) =>         // 通过回调函数异步推送我们的消息
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            Thread.Sleep(1000);
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); // 消息响应
            Console.WriteLine(" [x] Received {0}", message);
        };
    
        channel.BasicQos(0, 1, false);  // 设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工作者发送多于1个的消息
        channel.BasicConsume(queue: "hello",
                                noAck: false,      // 需要消息响应(Acknowledgments)机制
                                consumer: consumer);
    
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
    }
  • 消息响应(acknowledgments)
    • 为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ才会释放并删除这条消息。
  • 持久化
    • 新队列(无法修改队列)配置为可持久化
    • 发送消息配置为持久化
    • 消息什么时候刷到磁盘?
      • 写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。
      • 固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。
      • 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

常见问题

  • RabbitMQ 管理插件启动报错

    • 确认RabbitMQ服务是否启动
    • C:\Windows目录下,将.erlang.cookie文件,拷贝到用户目录下 C:\Users{用户名},这是Erlang的Cookie文件,允许与Erlang进行交互
    • 重新安装erl 和 rabbit,尽量不要带空格的路径
  • 修改配置文件
时间: 2024-10-13 18:37:06

RabbitMQ 安装与使用的相关文章

server 2008r2 rabbitmq 安装web管理

在server 20008 r2 安装完之后打开 localhost:15672 显示无法找到网页 因为默认web管理没有启用 需要启用下,启用过程 1:打开rabbbitrq命令 (开始菜单rabbitmq文件夹下) 2:依次输入 (1)rabbitmq-plugins enable rabbitmq_management (2)rabbitmq-service stop (3)rabbitmq-service install (4)rabbitmq-service start 然后在打开就可

centos 7下rabbitmq安装

安装erlang环境 添加rabbitmq依赖的erlang yum命令repos # In /etc/yum.repos.d/rabbitmq-erlang.repo [rabbitmq-erlang] name=rabbitmq-erlang baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/20/el/7 gpgcheck=1 gpgkey=https://www.rabbitmq.com/rabbitmq-release-signing

RabbitMQ安装和配置

RabbitMQ: MQ:message queue.MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WEBSPHERE MQ

rabbitmq安装Management Plugin

运行和安装Rabbitmq Management的步骤如下: 1.进入Rabbitmq安装目录,运行rabbitmq-plugins enable rabbitmq_management 2.运行rabbitmqctl stop 3.打开浏览器访问:http://localhost:15672/#/

RabbitMQ安装配置

安装RabbitMQ windows下的安装是非常简单的,我们需要准备两个东西 erlang的环境  下载windows和与之对象的操作系统位数安装包 http://www.erlang.org/downloads RabbitMQ http://www.rabbitmq.com/download.html 下载完毕之后,先安装 erlang再安装 RabbitMQ 安装这后我们可以配置一下rabbitmq的环境变量 打开命令行运行rabbitmq-server 现在大功已经告成了,不过rabb

MQ(1)-RabbitMq安装

本人安装的rabbitmq环境相关文件,可在本人提供的百度云盘资源进行下载. 链接:https://pan.baidu.com/s/1bnofK3l 密码:whdm 一 前言 消息队列又称为MQ,应用程序间的消息通信工具,其有利于程序解耦.多语言集成.异步通信.扩展和简单负载均衡等,是生产-消费者模型的典型代表.常见MQ产品有RabbitMQ ZeroMQ Kafka等等. RabbitMQ,老牌MQ产品,基于erlang语言,实现对AMQP等协议的支持,重量级,适合企业级应用开发: Kafka

ActiveMQ与RabbitMQ安装以及实现

ActiveMQ 1.下载ActiveMQ 去官方网站下载:http://activemq.apache.org/ 2.运行ActiveMQ 解压缩apache-activemq-5.11.1-bin.zip,然后双击apache-activemq-5.11.1\bin\activemq.bat运行ActiveMQ程序. 启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue. 3.创建Eclipse项目并运行 创

RabbitMQ安装笔记

RabbitMQ安装笔记 安装Erlang 在安装RabbitMQ之前,需要先安装Erlang.可以通过以下命令安装: yum install erlang 安装时信息如下: 安装RabbitMQ 服务端 安装命令如下: wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.2/rabbitmq-server-3.4.2-1.noarch.rpm rpm -Uvh rabbitmq_server-3.4.2-1.noarch.rpm

AMQP之RabbitMQ安装与配置

刚开始接触RabbitMQ,今天尝试安装,具体流程如下,参照了一些网上同行的经验,环境如下图: rabbitmq版本:3.1.5 下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.5/rabbitmq-server-3.1.5.tar.gz 文件下载目录:/home/gao/server以下简称为当前目录 准备工作:安装依赖环境 yum install build-essential openssl openssl-devel