Rabbitmq 简单介绍,安装和go客户端使用

Rabbitmq 简单介绍,安装和go客户端使用

1,消息队列介绍

1.1 什么是消息队列?

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。
消息队列,一般我们会简称他为MQ(Message Queue),消息队列可以简单的理解为:把要传输的数据放在队列中

说明:

  • Producer:消息生产者,负责产生和发送消息到 Broker;
  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;

1.2, 为什么要用消息队列?

1.2.1 应用结偶

比如在我们现在公司的业务常见中:
1,给客户打完电话之后我们需要根据通话录音进行打标签;
2,给客户打完电话之后我们需要给他发送短信
3,给客户打完电话之后我们需要发送他的通话给机器人,让机器人自学习
简单架构图如下:

如果没有消息队列,在A服务里面要写上3个API接口分别对应后面三个服务,突然有一天这个客户说我不需要发短信功能了,如果按照上面这种方式,我们就需要联系开发开始删代码,然后升级,刚升级好没几天,客户说我有要这个功能,那开发又要升级代码,这个时候开发集体离职了,(这每天干的完全是无用功)
但是如果有消息队列那就完全不一样了,就会变成下面这个样子:

A只需要写一个接口对接MQ了,后面不管是添加和删除都对A没有影响了,删除直接取消去消息就行了,大大减少了开发人员的工作量

1.2.2 异步处理

还拿上面那个场景来简述这个:A是公司的主要业务,打电话业务,BCD为非主要业务。
假设A调用BCD 接口需要50ms,那等A把所有接口调用完成之后需要150ms,对主业务消耗特别大,如果我们不用搭理BCD的处理,A直接把他交给消息队列,有消息队列去处理BCD,A只要把数据给消息队列就行了,那么A的压力就会很小,也不会影响主要业务流程,提高用户的

1.2.3 流量削峰

打个比方,我们目前有A B两个服务,A服务的OPS峰值为100W,但是B服务的OPS峰值只有10w,这个时候来了90w个请求,A服务能处理过来没问题,但是这个时候B服务直接就崩溃了

如果这个时候我们在A和B之间加一个rabbitmq,我们让B每次去取9w,这样B服务就不会挂了,

1.3,消费者怎么得到消息队列的数据?

  • 生产者将数据放到消息队列中,消息队列有数据了,主动叫消费者去拿(俗称push)
  • 消费者不断去轮训消息队列,看看有没有新的数据,如果有就消费(俗称pull)

2, RabbitMQ消息队列

2.1 RabbitMQ的优点

2.1.1 可靠性

RabbitMQ提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。

2.1.2 灵活的路由

消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用。

2.1.3 集群

在相同局域网内的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用

2.1.4 联合

对于服务器来说,他比集群需要更多的松散和非可靠链接,为此RabbitMQ提供了联合模型

2.1.5 高可用的队列

在同一个集群里,队列可以被镜像到多个机器中,以确保当前某些硬件出现故障后,你的消息仍然可以被使用

2.1.6 多协议

RabbitMQ支持多种消息协议的消息传递

2.1.7 广泛的客户端

只要是你能想到的编程语言几乎都有与其相适配的RabbitMQ客户端。

2.1.8 可视化管理工具

RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。

2.1.9 追踪

如果你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你能够发现问题所在。

2.1.10 插件系统

RabbitMQ附带了各种各样的插件来对自己进行扩展。你甚至也可以写自己的插件来使用。

2.2 RabbitMQ 的概念模型

所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

2.3 RabbitMQ基本概念

RabbitMQ基本流程图:

2.3.1 Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2.3.2 Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

2.3.3 Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

2.3.4 Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

2.3.5 Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

2.3.6 Connection

网络连接,比如一个TCP连接。

2.3.7 Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

2.3.8 Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

2.3.9 Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

2.3.10 Broker

表示消息队列服务器实体。

2.4 安装RabbitMQ

2.4.1 Docker 安装RabbitMQ

注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面。
我们访问docker镜像仓库然后查找RabbitMQ的镜像
找到我们合适的镜像版本

我们使用命令拉去

# docker pull rabbitmq:3.8-rc-management
# docker images
REPOSITORY                                                       TAG                 IMAGE ID            CREATED             SIZE
rabbitmq                                                         3.8-rc-management   90cce17c1af8        2 days ago          179MB

启动RabbitMQ:

docker run -d --hostname my-rabbit --name some-rabbit --net host -e RABBITMQ_DEFAULT_USER=zsf -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8-rc-management

参数解释:

--hostname 指定docker容器内部的名称
--name。    指定docker容器的名称
--net host。容器内部使用主机的网络
-e RABBITMQ_DEFAULT_USER 设置rabbitmq管理界面的用户
-e RABBITMQ_DEFAULT_PASS 设置rabbitmq管理界面用户的密码

查看日志是否启动成功:

# docker logs -f some-rabbit
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] drop_unroutable_metric
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] empty_basic_get_metric
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.743 [info] <0.8.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.743 [info] <0.8.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.757 [info] <0.261.0> ra: meta data store initialised. 0 record(s) recovered
2019-09-12 01:25:40.757 [info] <0.266.0> WAL: recovering []
2019-09-12 01:25:40.758 [info] <0.270.0>
 Starting RabbitMQ 3.8.0-rc.1 on Erlang 22.0.7
 Copyright (C) 2007-2019 Pivotal Software, Inc.
 Licensed under the MPL.  See https://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.8.0-rc.1. Copyright (C) 2007-2019 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...
2019-09-12 01:25:40.758 [info] <0.270.0>
 node           : [email protected]
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : mhZwmGv/TzyC2kVekZ7Yvg==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/[email protected]
2019-09-12 01:25:40.768 [info] <0.270.0> Running boot step pre_boot defined by app rabbit
2019-09-12 01:25:40.768 [info] <0.270.0> Running boot step rabbit_core_metrics defined by app rabbit
2019-09-12 01:25:40.769 [info] <0.270.0> Running boot step rabbit_alarm defined by app rabbit
2019-09-12 01:25:40.771 [info] <0.276.0> Memory high watermark set to 11998 MiB (12581668454 bytes) of 29997 MiB (31454171136 bytes) total
2019-09-12 01:25:40.774 [info] <0.278.0> Enabling free disk space monitoring
2019-09-12 01:25:40.774 [info] <0.278.0> Disk free limit set to 50MB
2019-09-12 01:25:40.776 [info] <0.270.0> Running boot step code_server_cache defined by app rabbit
2019-09-12 01:25:40.776 [info] <0.270.0> Running boot step file_handle_cache defined by app rabbit
2019-09-12 01:25:40.776 [info] <0.281.0> Limiting to approx 1048476 file handles (943626 sockets)
2019-09-12 01:25:40.776 [info] <0.282.0> FHC read buffering:  OFF
2019-09-12 01:25:40.776 [info] <0.282.0> FHC write buffering: ON
2019-09-12 01:25:40.777 [info] <0.270.0> Running boot step worker_pool defined by app rabbit
2019-09-12 01:25:40.777 [info] <0.271.0> Will use 4 processes for default worker pool
2019-09-12 01:25:40.777 [info] <0.271.0> Starting worker pool ‘worker_pool‘ with 4 processes in it
2019-09-12 01:25:40.777 [info] <0.270.0> Running boot step database defined by app rabbit
2019-09-12 01:25:40.777 [info] <0.270.0> Node database directory at /var/lib/rabbitmq/mnesia/[email protected] is empty. Assuming we need to join an existing cluster or initialise from scratch...
2019-09-12 01:25:40.777 [info] <0.270.0> Configured peer discovery backend: rabbit_peer_discovery_classic_config
2019-09-12 01:25:40.777 [info] <0.270.0> Will try to lock with peer discovery backend rabbit_peer_discovery_classic_config
2019-09-12 01:25:40.777 [info] <0.270.0> Peer discovery backend does not support locking, falling back to randomized delay
2019-09-12 01:25:40.777 [info] <0.270.0> Peer discovery backend rabbit_peer_discovery_classic_config does not support registration, skipping randomized startup delay.
2019-09-12 01:25:40.777 [info] <0.270.0> All discovered existing cluster peers:
2019-09-12 01:25:40.777 [info] <0.270.0> Discovered no peer nodes to cluster with
2019-09-12 01:25:40.779 [info] <0.43.0> Application mnesia exited with reason: stopped
2019-09-12 01:25:40.807 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.825 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.825 [info] <0.270.0> Feature flag `drop_unroutable_metric`: supported, attempt to enable...
2019-09-12 01:25:40.825 [info] <0.270.0> Feature flag `drop_unroutable_metric`: mark as enabled=state_changing
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [~] drop_unroutable_metric
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] empty_basic_get_metric
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.841 [info] <0.270.0> Feature flag `drop_unroutable_metric`: mark as enabled=true
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] empty_basic_get_metric
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.857 [info] <0.270.0> Feature flag `empty_basic_get_metric`: supported, attempt to enable...
2019-09-12 01:25:40.857 [info] <0.270.0> Feature flag `empty_basic_get_metric`: mark as enabled=state_changing
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [~] empty_basic_get_metric
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.873 [info] <0.270.0> Feature flag `empty_basic_get_metric`: mark as enabled=true
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.890 [info] <0.270.0> Feature flag `implicit_default_bindings`: supported, attempt to enable...
2019-09-12 01:25:40.890 [info] <0.270.0> Feature flag `implicit_default_bindings`: mark as enabled=state_changing
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [~] implicit_default_bindings
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.906 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 0 retries left
2019-09-12 01:25:40.906 [info] <0.270.0> Feature flag `implicit_default_bindings`: mark as enabled=true
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.922 [info] <0.270.0> Feature flag `quorum_queue`: supported, attempt to enable...
2019-09-12 01:25:40.922 [info] <0.270.0> Feature flag `quorum_queue`: mark as enabled=state_changing
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [~] quorum_queue
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.932 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.938 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.939 [info] <0.270.0> Feature flag `quorum_queue`:   migrating Mnesia table rabbit_queue...
2019-09-12 01:25:40.943 [info] <0.270.0> Feature flag `quorum_queue`:   migrating Mnesia table rabbit_durable_queue...
2019-09-12 01:25:40.947 [info] <0.270.0> Feature flag `quorum_queue`:   Mnesia tables migration done
2019-09-12 01:25:40.947 [info] <0.270.0> Feature flag `quorum_queue`: mark as enabled=true
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] quorum_queue
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.963 [info] <0.270.0> Feature flag `virtual_host_metadata`: supported, attempt to enable...
2019-09-12 01:25:40.963 [info] <0.270.0> Feature flag `virtual_host_metadata`: mark as enabled=state_changing
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] quorum_queue
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [~] virtual_host_metadata
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.979 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.983 [info] <0.270.0> Feature flag `virtual_host_metadata`: mark as enabled=true
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] quorum_queue
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] virtual_host_metadata
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.999 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:41.018 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:41.018 [info] <0.270.0> Peer discovery backend rabbit_peer_discovery_classic_config does not support registration, skipping registration.
2019-09-12 01:25:41.018 [info] <0.270.0> Running boot step database_sync defined by app rabbit
2019-09-12 01:25:41.018 [info] <0.270.0> Running boot step feature_flags defined by app rabbit
2019-09-12 01:25:41.018 [info] <0.270.0> Running boot step codec_correctness_check defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step external_infrastructure defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_registry defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_auth_mechanism_cr_demo defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_queue_location_random defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_event defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_auth_mechanism_amqplain defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_auth_mechanism_plain defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_direct defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_fanout defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_headers defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_topic defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_mirror_queue_mode_all defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_mirror_queue_mode_exactly defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_mirror_queue_mode_nodes defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_priority_queue defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Priority queues enabled, real BQ is rabbit_variable_queue
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_queue_location_client_local defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_queue_location_min_masters defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step kernel_ready defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_sysmon_minder defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_epmd_monitor defined by app rabbit
2019-09-12 01:25:41.047 [info] <0.270.0> Running boot step guid_generator defined by app rabbit
2019-09-12 01:25:41.194 [info] <0.270.0> Running boot step rabbit_node_monitor defined by app rabbit
2019-09-12 01:25:41.194 [info] <0.518.0> Starting rabbit_node_monitor
2019-09-12 01:25:41.194 [info] <0.270.0> Running boot step delegate_sup defined by app rabbit
2019-09-12 01:25:41.195 [info] <0.270.0> Running boot step rabbit_memory_monitor defined by app rabbit
2019-09-12 01:25:41.195 [info] <0.270.0> Running boot step core_initialized defined by app rabbit
2019-09-12 01:25:41.195 [info] <0.270.0> Running boot step upgrade_queues defined by app rabbit
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: 1 to apply
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: Applying rabbit_variable_queue:move_messages_to_vhost_store
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: No durable queues found. Skipping message store migration
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: Removing the old message store data
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: All upgrades applied successfully
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_connection_tracking defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_connection_tracking_handler defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_exchange_parameters defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_mirror_queue_misc defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_policies defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_policy defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_queue_location_validator defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_quorum_memory_manager defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_vhost_limit defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_mgmt_reset_handler defined by app rabbitmq_management
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_mgmt_db_handler defined by app rabbitmq_management_agent
2019-09-12 01:25:41.232 [info] <0.270.0> Management plugin: using rates mode ‘basic‘
2019-09-12 01:25:41.233 [info] <0.270.0> Running boot step recovery defined by app rabbit
2019-09-12 01:25:41.233 [info] <0.270.0> Running boot step load_definitions defined by app rabbitmq_management
2019-09-12 01:25:41.233 [info] <0.270.0> Running boot step empty_db_check defined by app rabbit
2019-09-12 01:25:41.233 [info] <0.270.0> Adding vhost ‘/‘ (description: ‘Default virtual host‘)
2019-09-12 01:25:41.238 [info] <0.559.0> Making sure data directory ‘/var/lib/rabbitmq/mnesia/[email protected]/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L‘ for vhost ‘/‘ exists
2019-09-12 01:25:41.241 [info] <0.559.0> Starting message stores for vhost ‘/‘
2019-09-12 01:25:41.241 [info] <0.563.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_transient": using rabbit_msg_store_ets_index to provide index
2019-09-12 01:25:41.241 [info] <0.559.0> Started message store of type transient for vhost ‘/‘
2019-09-12 01:25:41.242 [info] <0.566.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent": using rabbit_msg_store_ets_index to provide index
2019-09-12 01:25:41.242 [warning] <0.566.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent": rebuilding indices from scratch
2019-09-12 01:25:41.242 [info] <0.559.0> Started message store of type persistent for vhost ‘/‘
2019-09-12 01:25:41.243 [info] <0.270.0> Creating user ‘zsf‘
2019-09-12 01:25:41.244 [info] <0.270.0> Setting user tags for user ‘zsf‘ to [administrator]
2019-09-12 01:25:41.244 [info] <0.270.0> Setting permissions for ‘zsf‘ in ‘/‘ to ‘.*‘, ‘.*‘, ‘.*‘
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step rabbit_looking_glass defined by app rabbit
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step rabbit_core_metrics_gc defined by app rabbit
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step background_gc defined by app rabbit
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step connection_tracking defined by app rabbit
2019-09-12 01:25:41.246 [info] <0.270.0> Setting up a table for connection tracking on this node: ‘[email protected]‘
2019-09-12 01:25:41.247 [info] <0.270.0> Setting up a table for per-vhost connection counting on this node: ‘[email protected]‘
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step routing_ready defined by app rabbit
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step pre_flight defined by app rabbit
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step notify_cluster defined by app rabbit
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step networking defined by app rabbit
2019-09-12 01:25:41.295 [info] <0.612.0> started TCP listener on [::]:5672
2019-09-12 01:25:41.318 [info] <0.270.0> Running boot step cluster_name defined by app rabbit
2019-09-12 01:25:41.318 [info] <0.270.0> Running boot step direct_client defined by app rabbit
2019-09-12 01:25:41.359 [info] <0.662.0> Management plugin: HTTP (non-TLS) listener started on port 15672
2019-09-12 01:25:41.359 [info] <0.768.0> Statistics database started.
2019-09-12 01:25:41.359 [info] <0.767.0> Starting worker pool ‘management_worker_pool‘ with 3 processes in it
 completed with 3 plugins.
2019-09-12 01:25:41.439 [info] <0.8.0> Server startup complete; 3 plugins started.
 * rabbitmq_management
 * rabbitmq_management_agent
 * rabbitmq_web_dispatch

查看端口是否启动完全:

# netstat -ntalp | grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      6423/beam.smp
tcp        0      0 0.0.0.0:15672           0.0.0.0:*               LISTEN      6423/beam.smp
tcp6       0      0 :::5672                 :::*                    LISTEN      6423/beam.smp
//15672 web界面管理接口
//5672  业务端口,客户端操作连接的端口

浏览器访问管理界面

2.5 RabbitMQ使用中的一些概念

在上面的RabbitMQ的基本流程图里面我们可以看到,RabbitMQ的整体工作流程是,生产者产生数据交给RabbitMQ,然后RabbitMQ通过Exchange更具规则来选择绑定到那个队列(Queues)中,然后消费者在到对应的队列里面去取数据,我们下面就来讲解下Exchange的四种类型

2.5.1 Exchange的四种类型

2.5.1.1 direct精准匹配

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

2.5.1.2 fanout 广播

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

2.5.1.3 topic 正则匹配


topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。

2.5.1.4 headers 根据头部匹配(几乎不用)

2.6 使用go客户端操作rabbitMQ

2.6.1 生产者代码

package send

import (
    "log"
    "rabbitmq/src/github.com/streadway/amqp" //根据实际情况来定
    "strconv"
    "time"
)

//创建一个返回错误打印日志的函数
func FailOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
func Send() {
    //打开一个连接
    conn, err := amqp.Dial("amqp://zsf:[email protected]:5672")
    FailOnError(err, "failed to connect to RabbitMQ")
    defer conn.Close()

    //打开一个通道
    ch, err := conn.Channel()
    FailOnError(err, "failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test number",
        false,
        false,
        false,
        false,
        nil,
    )
    FailOnError(err, "Failed to declare a queue")

    for i := 0; i < 60; i++ {
        body := "hello, ZhangShouFu  " + strconv.Itoa(i)
        err = ch.Publish(
            "",
            q.Name,
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(body),
            })
        FailOnError(err, "Failed to publish a message")
        time.Sleep(1 * time.Second)
    }

}

2.6.2 消费者代码

package receive

import (
    "log"
    "rabbitmq/src/github.com/streadway/amqp"
    "rabbitmq/src/send"
    "time"
)

//创建一个返回错误打印日志的函数

func Receive() {
    conn, err := amqp.Dial("amqp://zsf:[email protected]:5672/")
    send.FailOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    send.FailOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test number", // name
        false,         // durable
        false,         // delete when usused
        false,         // exclusive
        false,         // no-wait
        nil,           // arguments
    )
    send.FailOnError(err, "Failed to declare a queue")
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    send.FailOnError(err, "Failed to register a consumer")

    forever := make(chan bool)
    for i := 0; i < 60; i++ {
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
            }
        }()
        time.Sleep(5 * time.Second)
    }
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

2.6.3 主函数

package main

import (
    "rabbitmq/src/receive"
    "rabbitmq/src/send"
)

func main() {
    send.Send()
    receive.Receive()
}

运行主函数之后,到rabbitmq管理界面能看到我们刚才创建的队列

参考:
https://www.rabbitmq.com/
https://rabbitmq.mr-ping.com/installation/Installing_on_Debian_Ubuntu.html
https://www.jianshu.com/p/79ca08116d57

原文地址:https://blog.51cto.com/13447608/2437855

时间: 2025-01-02 01:11:32

Rabbitmq 简单介绍,安装和go客户端使用的相关文章

RabbitMQ简单介绍及安装使用

一.RabbitMQ简单介绍 二.安装配置1.安装环境 CentOS7 server1 190.168.3.250安装包依赖[[email protected] ~]# yum -y install gcc gcc-c++ m4 ncurses-devel openssl-devel2.安装RabbitMQ 按顺序安装:3.配置[[email protected] ~]# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.14/ebin/rabbit.a

rabbitmq简单介绍

引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题.消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC).本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一. RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层

RabbitMQ简单介绍+Windows环境安装

文章目录 1.RabbitMQ简介2.RabbitMQ与其他MQ有什么不同3.RabbitMQ环境安装3.1 安装erlang3.2 安装rabbitmq-server4. RabbitMQ管理平台介绍 1.RabbitMQ简介 RabbitMQ 是一个由 erlang 开发的 AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在

运维神器Chef简单介绍和安装笔记

首先大概解释一下Chef Chef有三个重要的概念:(如上图所示) 它们的合作关系大致是这样的, Workstation把资源或者说是一些要被运行的命令上传到Chef-Server上, Nodes自动通过Chef-Server拿到属于自己的执行任务到本地执行,这样可达到一个将军指挥千军万马的效果:smirk:. Chef Server 存放所有通过Workstation上传的资源,和用户等公共数据(用PostgreSQL). 可以干脆叫它为资源服务器,大家都可以与它通讯(用RabbitMQ ),

Linux服务器性能监控工具Glances 安装过程与简单介绍

一.Glances: Glances 是一款非常不错的跨平台的性能监控工具,提供了CPU.CPU队列.内存.虚拟内存.网络.I/O和最占用服务器的资源的进程列表等,应该就这些了吧,提供了这些 指标的监控信息,并且在运行时会根据资源的占用情况适用不同的颜色标注其重要程度,非常直观,下面是使用中的截图: 二.Glances安装:   Glances的安装还是比较简单方便的,由于Glances是由python编写的,最好适用python的库管理工具pip来自动安装.并且使用工具 也将安装过程简单的.

web服务的简单介绍及apache服务的安装

一,web服务的作用:  是指驻留于因特网上某种类型计算机的程序,可以向浏览器等Web客户端提供文档.可以放置网站文件,让全世界浏览:   可以放置数据让全世界下载.目前最主流的三个Web服务器是Apache.Nginx.IIS 二,当前互联网主流Web服务软件: Apache:中小型web服务的主流,web服务使用目前排名第一 Nginx:大型网站web服务的主流,目前使用使用增势迅猛,社区活跃,发布更新版本比较快. Nginx的分支Tengine(淘宝网正在使用),目前也在飞速发展. Lig

TensorFlow简单介绍和在centos上的安装

##tensorflow简单介绍: TensorFlow? is an open source software library for numerical computation using data flow graphs.https://www.tensorflow.org/TensorFlow是谷歌基于DistBelief进行研发的第二代人工智能学习系统,其命名来源于本身的运行原理.Tensor(张量)意味着N维数组,Flow(流)意味着基于数据流图的计算,TensorFlow为张量从图

Android NDK 简单介绍、工具安装、环境配置

NDK全称:Native Development Kit. 1.NDK是一系列工具的集合. * NDK提供了一系列的工具,帮助开发人员高速开发C(或C++)的动态库,并能自己主动将so和java应用一起打包成apk.这些工具对开发人员的帮助是巨大的. * NDK集成了交叉编译器,并提供了对应的mk文件隔离平台.CPU.API等差异,开发者仅仅须要简单改动mk文件(指出"哪些文件须要编译"."编译特性要求"等),就能够创建出so. * NDK能够自己主动地将so和Ja

MongoDB(1)--简单介绍以及安装

前段时间接触了NoSql类型的数据库redis,当时是作为缓存server使用的.那么从这篇博客開始学习还有一个非常出名的NoSql数据库:MongoDb.只是眼下还没有在开发其中使用.一步一步来吧. 简单介绍 MongoDB是一个开源的,基于分布式的,面向文档存储的非关系型数据库. 是非关系型数据库其中功能最丰富.最像关系数据库的. MongoDB由C++编写,其名字来源于"humongous"这个单词,其宗旨在于处理大量数据. MongoDB能够执行在Windows.unix.OS