RabbitMQ 入门【精+转】

rabbitmq可以用一本书取讲,这里只是介绍一些使用过程中,常用到的基本的知识点。
官方文档覆盖的内容,非常全面:http://www.rabbitmq.com/documentation.html 。

1. 介绍

RabbitMQ,即消息队列系统,它是一款开源消息队列中间件,采用Erlang语言开发,RabbitMQ是AMQP(Advanced Message Queueing Protocol)的标准实现。

AMQP是一个公开发布的异步消息的规范,是提供统一消息服务的应用层标准高级消息队列协议,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

https://www.rabbitmq.com/tutorials/amqp-concepts.html

相对于JMS(Java Message Service)规范来说,JMS使用的是特定语言的APIs,而消息格式可自由定义,而AMQP对消息的格式和传输是有要求的,但实现不会受操作系统、开发语言以及平台等的限制。

JMS和AMQP还有一个较大的区别:JMS有队列(Queues)和主题(Topics)两种消息传递模型,发送到 JMS队列 的消息最多只能被一个Client消费,发送到 JMS主题 的消息可能会被多个Clients消费;AMQP只有队列(Queues),队列的消息只能被单个接受者消费,发送者并不直接把消息发送到队列中,而是发送到Exchange中,该Exchage会与一个或多个队列绑定,能够实现与JMS队列和主题同样的功能。

另外还有一种 MQTT协议,意为消息队列遥测传输,是IBM开发的一个即时通讯协议。由于其维护一个长连接以轻量级低消耗著称,所以常用于移动端消息推送服务开发。MQTT是基于TCP的应用层协议封装,实现了异步Pub/Sub,在物联网(IoT)应用广泛。

RabbitMQ可通过库、插件的形式,支持JMS和MQTT协议。参考:http://geek.csdn.net/news/detail/71894

1.1 主要概念

  1. Broker
    接收和分发消息的应用,RabbitMQ Server就是Message Broker
  2. Exchange
    message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct, topic, fanout。
    如果没有队列绑定到exchange上,那么该exchange上的消息都会被丢弃,因为它不存储消息又不知道该怎么处理消息。
  3. Queue
    消息队列载体,每个消息都会被投入到一个或多个队列
  4. Binding
    在exchange和queue之间建立关系就叫Binding,消费者声明队列的时候一般会指定routing_key,也可以叫binding_key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
  5. Routing Key
    这里区分一下binding和routing: binding是一个将exchange和queue关联起来的动作,routing_key可以理解成队列的一个属性,表示这个队列接受符合该routing_key的消息,routing_key需要在发送消息的时候指定。
  6. Vhost
    于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等
  7. Producer
    消息生产者,就是投递消息的程序。只负责把消息发送exchange,附带一些消息属性。
  8. Consumer
    消息消费者,就是接受消息的程序。
  9. Channel
    如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。
    Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

1.2 对比

rabbitmq
activemq
rocketmq
kafka
zeromq
redis

celery
待续

2. 安装配置

CentOS 6.7,安装3.6.14最新稳定版本:

1234
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpmrpm -Uvh erlang-solutions-1.0-1.noarch.rpmrpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.ascyum install -y socat

如果机器上有epel源,先把它禁用掉:enabled=0,否则会默认从这个源按照低版本rabbitmq 。
如果已安装老版本,可能需要卸载 rpm -qa|grep erlang|awk ‘{print "yum remove -y "$1}‘|sh 。
继续

1234
wget http://packages.erlang-solutions.com/rpm/centos/6/x86_64/erlang-20.1-1.el6.x86_64.rpmwget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.14/rabbitmq-server-3.6.14-1.el6.noarch.rpm

yum localinstall -y erlang-20.1-1.el6.x86_64.rpm rabbitmq-server-3.6.14-1.el6.noarch.rpm

确保本地主机名能够正常解析出自己的ip,或 127.0.0.1. (ping rabbitmq-01)

12345678910111213
ulimit -S -n 4096ulimit -n 65534

# limits.confcat /etc/security/limits.conf* soft nofile 65535* hard nofile 65535

# 从配置文件模板创建配置文件sudo cp -a /usr/share/doc/rabbitmq-server-3.6.14/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

# 启动/etc/init.d/rabbitmq-server restart

默认用户名密码 guest/guest, 具有vhost / 的所有权限,只能在本地访问。
队列元数据及内容信息,默认在目录 /var/lib/rabbitmq/mnesia 下。

2.1 配置

12345678910111213141516
# 启用管理插件rabbitmq-plugins enable rabbitmq_management

# /etc/rabbitmq/rabbitmq.config 配置[ {rabbit,  [%%   {tcp_listeners, [5672]},   {vm_memory_high_watermark, 0.6},   %% {vm_memory_high_watermark_paging_ratio, 0.5},   {hipe_compile, true}  ]}, {rabbitmq_management,  [%% Preload schema definitions from a previously exported definitions file. See  ]}].

%%是Erlang的注释符号。

  • vm_memory_high_watermark
    RabbitMQ在使用当前机器的40%以上内存时候,会发出内存警告,并阻止RabbitMQ所有连接(producer连接)。这个阈值便由 vm_memory_high_watermark 控制
  • vm_memory_high_watermark_paging_ratio
    当内存中的数据达到一定数量后,他需要被page out出来。比如默认这个ratio=0.5,机器内存8G,于是 memory watermark=0.4 8G几即 3.2G。3.2G paging_raio = 1.6G,当消息挤压的量达到1.6G后,开始paging到磁盘上。
    一搬不去改它。
  • hipe_compile
    开启Erlang HiPE编译选项(相当于Erlang的jit技术),能够提高性能20%-50%。在Erlang R17后HiPE已经相当稳定,RabbitMQ官方也建议开启此选项。
    开启之后,每次启动 rabbitmq-server,需要多花1分钟左右。

看下 rabbitmqctl status 信息,混个眼熟:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
Status of node ‘[email protected]‘[{pid,6232}, {running_applications,     [{rabbitmq_management,"RabbitMQ Management Console","3.6.14"},      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.14"},      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.14"},      {cowboy,"Small, fast, modular HTTP server.","1.0.4"},      {rabbitmq_consistent_hash_exchange,"Consistent Hash Exchange Type",          "3.6.14"},      {rabbitmq_sharding,"RabbitMQ Sharding Plugin","3.6.14"},      {rabbit,"RabbitMQ","3.6.14"},      {amqp_client,"RabbitMQ AMQP Client","3.6.14"},      {rabbit_common,          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",          "3.6.14"},      {os_mon,"CPO  CXC 138 46","2.4.3"},      {mnesia,"MNESIA  CXC 138 12","4.15.1"},      {cowlib,"Support library for manipulating Web protocols.","1.0.2"},      {compiler,"ERTS  CXC 138 10","7.1.2"},      {recon,"Diagnostic tools for production use","2.3.2"},      {syntax_tools,"Syntax tools","2.1.3"},      {crypto,"CRYPTO","4.1"},      {stdlib,"ERTS  CXC 138 10","3.4.2"},      {kernel,"ERTS  CXC 138 10","5.4"}]}, {os,{unix,linux}}, {erlang_version,     "Erlang/OTP 20 [erts-9.1] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:64] [hipe] [kernel-poll:true]\n"}, {memory,     [{connection_readers,0},      {connection_writers,0},      {connection_channels,0},      {connection_other,8864},      {queue_procs,48686248},      {queue_slave_procs,0},      {plugins,14194848},      {other_proc,12618480},      {metrics,323944},      {mgmt_db,12627800},      {mnesia,701856},      {binary,22261264},      {msg_index,634656},      {allocated_unused,364165712},      {reserved_unallocated,0},      {total,596238336}]}, {alarms,[]}, {listeners,     [{clustering,25672,"::"},{amqp,5672,"0.0.0.0"},{http,15672,"0.0.0.0"}]}, {vm_memory_calculation_strategy,rss}, {vm_memory_high_watermark,0.6}, {vm_memory_limit,4952820940}, {disk_free_limit,50000000}, {disk_free,1626125135872}, {file_descriptors,     [{total_limit,65435},      {total_used,58},      {sockets_limit,58889},      {sockets_used,0}]}, {processes,[{limit,1048576},{used,446}]}, {run_queue,0}, {uptime,1232025}, {kernel,{net_ticktime,60}}]

2.2 命令行

123456789101112131415
# 添加新的 vhostrabbitmqctl add_vhost /some0rabbitmqctl list_vhost

# 添加登录用户 admin  rabbitmqctl add_user admin adminrabbitmqctl list_users

# 设置为管理员角色rabbitmqctl set_user_tags admin administrator

# 设置权限rabbitmqctl set_permissions -p /some0 admin ‘.*‘ ‘.*‘ ‘.*‘rabbitmqctl list_permissions -p /some0rabbitmqctl list_user_permissions admin

在开始介绍概念之前,先可以从Web UI上来认识一下rabbitmq:
rabbitmq overview 首页监控面板:

rabbitmq 客户端的连接信息:

某个channel的详情:

exchanges信息:

queues信息:

策略定义:

3. Exchange类型

AMQP 0-9-1 定义了四种内置类型的exchange type: direct, fanout, topic, header。exchange除了类型以外,还可以指定一些属性:

  • Name: 交换器名字。一般以 . 号分隔以作区分
  • Durability: 持久化的exchange在broker重启之后依然存在。相对应是 transient exchange
  • Auto-delete: 如果设置了该属性,在最后一个队列unbound之后,exchange会自动删除
  • Arguments: 可以用在满足插件扩展上
    • alternate-exchange
      RabbitMQ自己扩展的功能,不是AMQP协议定义的。
      Alternate Exchange属性的作用,创建Exchange指定该 x-arguments 的alternate-exchange属性,发送消息的时候根据route key没有找到可以投递的队列,这就会将此消息路由到 Alternate Exchange 属性指定的 Exchange (就是一个普通的exchange)上了。

      比如把MySQL的binlog订阅出来,因为里面有许多表,每个表的dml行数有多有少。我们可以将变更量多的表单独放到一个队列,其它表一起放到一个队列,就可以为原始的exchange添加 alternate-exchange 属性,将其它表的数据重新投递到另一个exchange。

3.1 fanout

fanout类型的exchange是最容易理解的,它会把来自生产者的消息广播到所有绑定的queues上。这种情况一般会把消息的routing_key设置为空‘‘,甚至不关心队列的名字。如下图:

amq.gen-RQ6...amq.gen-As8...是消费者随机生成了两个队列,绑定到fanout exchange上,C1,C2会各自收到一模一样的消息。

3.2 direct

direct类型的exchange转发消息到队列里,是直接基于消息的routing key。

C1在声明队列的时候,指定routing_key=error。C2的队列上绑定了info,error,warning三个key。
于是error类型的消息会被同时发送到C1,C2(准确的说是两个队列上),而info,warning类型的消息只发送到队列amqp.gen-Agl...

如果要达到Round-Robin轮询效果,即两个Consumer依次从同一个队列里取消息,那么可以在声明队列的时候指定相同的 queue name,rabbitmq会自动均衡的发送消息给多个Consumer,可水平扩展消费者的处理能力(如果要保证处理顺序,得设置prefetch_count=1)。

3.3 topic

topic类型的exchange大大提升了消息路由的灵活性。不像fanout那样无脑的全部转发,也不像direct那样指定所有的routing_key,否则不匹配的key的消息就会被丢弃。
比如有一个收集日志的系统,模块包括auth/cron/kernel/app1/app2,日志级别包括error,info,warning。现在要把所有模块的error日志规整在一起,可以设计routing_key: \<module>.\<severity> (auth.error, auth.info, …, app1.error, app1.info…),然后设置queue的binding_key=’*.error’

topic exchange 会根据 . 划分word,有以下两种正则符号用于匹配routing_key:

  • *: 代表一个word
  • #: 代表0个或多个word

拿官网的例图来说:<敏捷度>.<颜色>.<物种>

上图创建了3个bindings:

  • 队列Q1的binding_key=*.orange.*,即对所有橙色的动物感兴趣
  • 队列Q2绑定了*.*.rabbitlazy.#,即订阅了所有和兔子相关的消息,以及反应迟钝的动物

于是:

  • routing_key为quick.orange.rabbit的消息,会被发送到两个队列
  • routing_key为lazy.orange.elephant的消息,也会被发送到两个队列
  • routing_key为quick.orange.fox的消息,只会发送到Q1
  • routing_key为lazy.brown.fox的消息,只会发送到Q2
  • routing_key为lazy.pink.rabbit的消息,只会发送到Q2。虽然匹配到了lazy.#*.*.rabbit,但只会发送一次
  • routing_key为quick.brown.fox的消息,会被丢弃,因为没有任何绑定的队列得到匹配
  • routing_key为lazy.orange.male.rabbit的消息,还是会发送到Q2,因为 lazy.#
    然而orangequick.orange.male.rabbit,也破坏了约定,但没得到匹配,消息丢弃。
  • routing_key为#,接受所有消息,相当于fanout exchange
  • routing_key没有*#时,相当于direct exchange

3.4 headers

header类型的exchange用的不多,是在routing_key不能满足使用场景的情况下(如routing_key必须是字符串),在消息的头部加入一个或多个key/value,然后在声明队列的时候也指定要绑定的header。

binding的时候有个参数x-match,指定headers所有的k/v都要匹配成功(all)还是任意一个匹配则接受(any)。

3.5 x-consistent-hash

这是个第三方插件形式存在的exchange,目前已内置于rabbitmq:https://github.com/rabbitmq/rabbitmq-consistent-hash-exchange

x-consistent-hash类型的exchange可以根据routing_key,用一致性哈希算法,将消息路由到不同的队列上。它可以尽可能的保证每个队列上的消息数量相同,也可以随时添加更多的队列来“分流”,并且能保证同一个routing_key会进入相同的queue。

要达到这样的效果,queue routing key必须是一个字符串类型的数字。比如Q1:routing_key=’10’, Q2:routing_key=’20’,那么消息就会按照1:2的比例,发送到Q1,Q2。

3.6 x-modulus-hash

第三方插件形成存在的exchange,从3.6.0版本开始,也内置到了rabbitmq发行版:https://github.com/rabbitmq/rabbitmq-sharding

x-modulus-hash类型的exchange与 x-consistent-hash 很像,也叫 sharding exchange,即将message在多个队列之间进行分区发送。它的实现方法是根据 routing_key 先获得hash,再用 Hash mod N 得到队列,N就是绑定到exchange上的队列个数。

4. Queue属性

Queue 要先于 Exchange 创建,否则生产者发布的消息,在没有绑定队列之前,会丢失。
已存在的Queue可以重复declare,但前提是属性要相同。

  • Name: 队列名称。可以在应用里面指定,或者交给broker生成
  • Durable:持久化的Queue在broker重启之后,依然存在。
    注意,这里的持久化与消息持久无关。是个 property
  • Exclusive: 为True时,表示当Consumer的Connection端口之后,队列自动删除。一般由broker生成的随机队列名,指定这个选项 。
    排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的
  • Auto-delete: 当最后一个consumer取消订阅之后,队列自动删除
  • Arguments: 设置可选的一些参数,如
    • x-message-ttl
      消息在队里里最大存活时间,超过这个ttl就会被丢弃。单位毫秒
    • x-max_length
      队列里最多容纳的消息个数,超过这个值,则会从队列头部drop掉消息
    • x-max-priority
      设置了这个参数,就表示这是一个具有优先级的队列。它的值是可定义的优先级最大值,一般10以内就够了。
      在生产商Publish消息的时候,消息Property上可设置Priority
    • x-queue-mode
      这个参数是控制是否为”延迟队列”,Lazy Queue是在3.6.0引入的,它会尽量把消息存在磁盘上,节省内存
      RabbitMQ一开始的设计初衷,是做异步、解耦,所以会把消息放在内存里面,以便快速的发送给消费者(持久化类型的消息会同时存在于磁盘和内存缓存中)。

      如果用它来暂时存放大量消息,而不消费或者消费太慢,会导致性能明显下降,因为为了释放内存,消息得swap到磁盘上 —— 会阻塞队列接收新消息。如果内存使用达到broker设置的 water-mark,也会拒绝接收新消息。
      Lazy Queue(x-queue-mode=lazy)的作用就是一接收到新消息,马上存到文件系统,完全避免了前面提到的内存占用。这会增加磁盘I/O(顺序的),与处理持久化类型的消息很相似。

    • x-dead-letter-exchange
      死信。当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有以下几种情况:
      • 消息被拒绝(basic.reject or basic.nack)并且requeue=false
      • 消息TTL过期
      • 队列达到最大长度

      DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列。
      死信被重新 requeue 时,可以改变它的routing_key,以便新的队列处理,routing_key用x-dead-letter-routing-key指定,如果不指定则继续使用消息原来的routing_key。

5. Message属性

  • routing_key
    路由关键字,exchange根据这个关键字进行消息投递
  • delivery_mode
    • 1: Non-persistent,消息不持久化到磁盘,尽快被消费掉。重启broker之后消息丢失
    • 2: Persistent,消息持久化。当然被取走的消息,也就不存在了
  • headers
    消息头信息,key/value形式,可以认为给消息打上了各种各样的标签。可用于代替 routing_key 去路由(结合headers来下的exchange),或者第三方插件使用。
  • properties
    实际上 headers 和 delivery_mode 也是properties的一部分,因为使用较多,所以单独拿出去。这里也只提几个:
    • priority
      消息优先级。数字,优先级高的消息会排在队列头部
    • correlation_id 和 reply_to
      这两个一般用于实现服务间RPC调用, 即生产者发起请求到rabbitmq队列,等待处理结果返回,消费者处理完消息后返回结果给调用方。
      reply_to 在消息里面告诉消费者,处理完的结果放到哪个队列,调用方根据 correlation_id 找到结果。详情参考 https://www.rabbitmq.com/tutorials/tutorial-six-python.html
    • expiration
      消息自身的Time-To-Live,用的较少,也叫 Per-Message TTL In Publisher.
      前面提到,队列的arguemnts可以设置 x-message-ttl ,也叫 Per-Queue Message TTL In Queues.消息是否过期以两者的最小值为准,并且消息自身过期时间到了之后,不会自动从队列删除,而是在发送给消费者的时候丢弃。
      队列自身也有个 x-expires,它指的是队列在多久没有消费者连上来,超过这个时间后队列自动删除。
  • payload: 消息正文

6. 插件

RabbitMQ支持插件式的来扩展功能。

1234567891011121314151617181920212223242526272829303132333435363738
列举server上安装的所有插件# rabbitmq-plugins list Configured: E = explicitly enabled; e = implicitly enabled | Status:   * = running on [email protected] |/[e*] amqp_client                       3.6.14[e*] cowboy                            1.0.4[e*] cowlib                            1.0.2[  ] rabbitmq_amqp1_0                  3.6.14[  ] rabbitmq_auth_backend_ldap        3.6.14[  ] rabbitmq_auth_mechanism_ssl       3.6.14[E*] rabbitmq_consistent_hash_exchange 3.6.14[  ] rabbitmq_event_exchange           3.6.14[  ] rabbitmq_federation               3.6.14[  ] rabbitmq_federation_management    3.6.14[  ] rabbitmq_jms_topic_exchange       3.6.14[E*] rabbitmq_management               3.6.14[e*] rabbitmq_management_agent         3.6.14[  ] rabbitmq_management_visualiser    3.6.14[  ] rabbitmq_mqtt                     3.6.14[  ] rabbitmq_random_exchange          3.6.14[  ] rabbitmq_recent_history_exchange  3.6.14[E*] rabbitmq_sharding                 3.6.14[  ] rabbitmq_shovel                   3.6.14[  ] rabbitmq_shovel_management        3.6.14[  ] rabbitmq_stomp                    3.6.14[  ] rabbitmq_top                      3.6.14[  ] rabbitmq_tracing                  3.6.14[  ] rabbitmq_trust_store              3.6.14[e*] rabbitmq_web_dispatch             3.6.14[  ] rabbitmq_web_mqtt                 3.6.14[  ] rabbitmq_web_mqtt_examples        3.6.14[  ] rabbitmq_web_stomp                3.6.14[  ] rabbitmq_web_stomp_examples       3.6.14[  ] sockjs                            0.3.4

启用插件# rabbitmq-plugins enable plugin-name

下面是几个常用插件:

  1. rabbitmq_management
    管理 rabbitmq server 的插件,提供给予HTTP的API和 WebUI,提供管理exchanges、管理queues、管理users、管理policies,监控,发布/接收消息。功能强大,基本是必定开启的插件。
    开启管理插件后,也可以选择不使用Web界面,从 http://localhost:15672/cli/rabbitmqadmin 下载 rabbitmqadmin 命令行工具,它用在一些脚本里面会很方便。(提示: rabbitmqctl 是不能创建exchange和queue,但rabbitmqadmin可以)
  2. rabbitmq_federation
    与MySQL Federated 存储引擎很相似,可以认为 federated exchange 是其它exchange(也叫upstream exchange)的“软连接”、“流量复制”。消息是被publish到上游exchange,然后消费者是从其它broker上的federated exchange订阅消息。
    Federated exchanges/queues 是通过 AMQP 协议的Erlang客户端从真实broker里面取数据(不会消费源数据),可以实现跨网络的消息提取,或者将不同地方的消息汇总到一处。应用场景有 broker / cluster 数据迁移,模仿真实数据的线下测试。
  3. rabbitmq_shovel
    shovel插件就是一个 消费者 + 生产者:从一个queue消费内容,发送到另一个exchange上,甚至可以对消息做些转换。你可以自己实现将消息从源broker消费,重新publish到另一个exchange,但shovel帮我们做好了。
  4. rabbitmq_mqtt
    实现了 MQTT 3.1 协议的adapter,如文章开头所述。
  5. rabbitmq_consistent_hash_exchange
    一致性hash exchange,如前文所述。

6. 策略 Policy

首先为什么rabbitmq会有策略这个东西。

前面我们讲到,queue和exchange有一些固定属性,如durableexclusiveauto-delete等,还有一些可选参数,也叫x-arguments,如x-max-lengthx-queue-mode。这些都是客户端在定义队列和交换器时指定的。

如果事后想修改 TTL 或者 queue length limit ,那么得修改应用、重新部署,甚至涉及到删除队列,重新declare。Policy就是解决这个痛点的,在服务端对匹配的 exchanges 或者 queues 设置参数,无需动应用。更多请参考 https://www.rabbitmq.com/parameters.html

一个 policy 包含以下内容:

  • name: 策略名字
  • pattern: 对哪些queues(exchanges)的应用策略,正则表达式
  • definition: 策略内容定义,key/value形式(也可以认为是JSON格式)
  • apply-to: 策略应用在什么身上,queuesexchangesall。默认是all
  • priority: 策略优先级,默认0

每个exchange/queue只能“注入”一个policy,所以如果要设置多个策略,把key/value组合成json,定义在一起。设置完成会马上生效,包括后面新创建的exchange、queues。

12345678
将exchange设置为 alternate exchange:(策略名:AE)rabbitmqctl set_policy -p /some0 AE "^maxwell.some3$" ‘{"alternate-exchange":"maxwell.AE"}‘ --apply-to exchanges

将vhost /some0 的所有队列都设置成 Lazy Queuerabbitmqctl set_policy -p /some0 Lazy "^" ‘{"queue-mode":"lazy"}‘ --apply-to queues

队列名匹配 ‘two-messages‘ 的队列,设置最大队列消息数为2,超过之后的行为是 禁止接收新消息(与之对应的是 drop-head: 删除头部老的消息)rabbitmqctl set_policy my-pol "^two-messages$" ‘{"max-length":2,"overflow":"reject-publish"}‘ --apply-to queues

7. 消息可靠性

有的系统要保证消息不允许丢失,甚至不允许重复,有的系统追求的是高性能,所以要在性能和可靠性之间权衡。rabbitmq在多个层面提供消息可靠性保证。

7.1 持久化

声明持久化的exchange: channel.exchange_delcare(exchange_name, durable)
声明持久化的队列:channel.queueDeclare(queue_name, durable, exclusive, auto_delete, arguments)
发布的持久化消息,投递模式为2: delivery_mode=2

http://www.rabbitmq.com/reliability.html
persistent

7.1 ack & confirm

持久化保证了在broker或者机器出现异常的时候,消息不会丢失,要保证发送者在pub消息、接收sub消息时出现网络异常,客户端也应该有相应的处理。

Consumer Delivery Acknowledgements

rabbitmq对Consumer处理消息提供 acknowledgements 确认机制,客户端通过basic.consume注册到broker(push),或者通过basic.get pull 消息,都可以在指定是否开启ack

delivery tags是实现 ack 的关键,RabbitMQ会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag,它是单调递增的正整数,在一个channel中唯一代表了一次投递。

确认模式包括自动确认和手动确认。
自动确认就是rabbitmq一旦把消息发送出去后,就认为成功,完成确认。此模式性能最高,只要消费者能处理的过来,但自然降低了消息到达处理的可靠性,比如一个消息还在路上,消费者的TCP连接或者channel就关闭了,那么消息也就丢失。如果消费者处理不过来,可能会导致消息在客户端挤压,内存过载,引发异常。所以自动确认一般用在消息比较平稳、客户端能处理的来的系统。

手动确认,就是客户端需要自己发送确认命令,包括:

  • basic.ack —— 确认成功,客户端成功处理
  • basic.nack —— 确认失败,客户端处理失败,但依然删掉消息
  • basic.reject —— 确认失败,客户端处理失败,消息不删除,可重新发送。

手动确认模式,可以控制消息处理的速度(流控QoS),通过 prefetch 设置该channel上最大没有确认的消息数,server会等待有空闲的配额时才继续发送给消费者。
手动确认模式如果不设置 prefetch_count,那么消费者可能会接收许多的消息但未ack,从而导致内存耗尽,所以这点需要小心。正常来说,100-300是个比较可控的范围。(当然如果是 pull 模式,就不存在QoS一说)

basic.ackbasic.nack可以设置 multiple 字段,批量确认来减少网络传输。比如说在信道 ch 上有 delivery tags 5, 6, 7, 8 没有确认,当客户端发回的确认帧是8并且 multiple=true,那么5-8的tags都被ack。

在启用手动确认时,发生网络连接断开或者消费者崩溃,而无法返回 ack/nack 命令时,(检测方法是 heartbead)rabbitmq会自动将没有确认的消息 requeue,所以客户端处理消息时,最好能满足幂等性,即能够重复处理这些消息。

Publisher Confirms

rabbitmq对Producer发布消息提供 confirm 机制:客户端可以发送一个 confirm.select 命令将channel设置成confirm工作模式。
所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(basic.ack),这就使得生产者知道消息已经正确到达目的队列了。如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。

如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息,确保消息不会再发送之前就丢失。

然后对于需要持久化的消息的确认,不能完全保证数据被刷到磁盘上,因为每个消息调用 fsync 的带来的IO代价太高,rabbitmq会每隔几百毫秒,批量将消息从文件系统缓存 fsync 刷到磁盘。(了解MySQL的话对这个应该不陌生)

7.2 事务

RabbitMQ 实现了AMQP 0-9-1协议里的事务,这样说唯一能确保消息不丢失的方式,信道可以设置成 transaction 模式:发布消息,commit/rollback消息。

但是事务在这里太重了,而且会极大的降低性能。不用。

7.3 rabbitmq分布式

待聊

5. python使用示例

https://pika.readthedocs.io/en/0.10.0/intro.html

下面的示例是使用Maxwell或者MySQL binlog增量流,json数据进入rabbitmq,然后通过 pika —— python版本的rabbitmq client,重新组装成sql,达到数据增量同步的效果。

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
def binlog_sync(self):     logger.info("connect to rabbitmq server [%s], vhost=%s", rabbitmq_conn_info.get(‘host‘), rabbitmq_conn_info.get(‘vhost‘, ‘/‘))     ## rabbitmq 用户认证信息     credentials = pika.PlainCredentials(rabbitmq_conn_info.get(‘user‘, ‘guest‘),                                         rabbitmq_conn_info.get(‘password‘, ‘guest‘)     )     ## rabbitmq tcp连接     connection = pika.BlockingConnection(         pika.ConnectionParameters(             host=rabbitmq_conn_info.get(‘host‘),             port=rabbitmq_conn_info.get(‘port‘, 5672),             virtual_host=rabbitmq_conn_info.get(‘vhost‘, ‘/‘),             credentials=credentials         )     )     ## rabbitmq 信道,避免频繁tcp断连     channel = connection.channel()

     # exchange_name = ‘maxwell.some‘ + str(self.corpmod)     # exchange_other = ‘maxwell.AE‘     logger.info("declare mq exchange [%s], type=[%s]", self.exchange_name, self.exchange_type)     ## 创建 exchange,如果已经存在相同名字,就不会重复创建,但要求属性要相同     ## 指定exchange_type,durable, arguments 。这里的alternate-exchange放到策略里从创建,因为目前maxwell作为消费者,没有支持arguemnts参数     channel.exchange_declare(exchange=self.exchange_name,                              exchange_type=self.exchange_type,                              durable=True,                              # arguments={‘alternate-exchange‘: exchange_other}     )

     """     channel.exchange_declare(exchange=exchange_other, exchange_type=‘topic‘, durable=True)  # alternative exchange     channel.queue_declare(queue=‘ae_other‘, durable=True)     channel.queue_bind(exchange=exchange_other,                        queue=‘ae_other‘,                        routing_key=‘d_ec_some.*‘)     """     logger.info("declare queue name=[%s]", self.queue_name)     ## 创建 queue,如果以经存在相同名字的队列,则不会创建,但要求属性相同,否则报错     ## 指定了 lazy queue     channel.queue_declare(queue=self.queue_name, durable=True, arguments={‘x-queue-mode‘: ‘lazy‘})

     ## 将routing_key 绑定到队列上     for key in self.queue_bind_key:         logger.info("bind routing_key [%s] to queue [%s]", key, self.queue_name)         channel.queue_bind(exchange=self.exchange_name,                            queue=self.queue_name,                            routing_key=key)

     # consume callback, internal     ## 客户端处理消息     def callback(ch, method, properties, body):         # print(" [x] Received %s" % body)         logger.debug("Received message: %s", body)         try:             data_row = json.loads(body.decode(‘utf-8‘))             self.process_data(data_row)

             if ret == -2:  # requeue             ## 处理异常,如Ctrl+C断开,重新排队                 logger.warning("message data: %s (requeue)", data_row)                 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)                 # return         except ValueError as e:             logger.error("proces Error: %s(skip)", e)             logger.error("  received data: %s", body)             ## 处理异常,但跳过             ch.basic_ack(delivery_tag=method.delivery_tag)         except Exception as e:             logger.error("proces Error: %s(skip)", e)             logger.error("  message data: %s", data_row)             ch.basic_ack(delivery_tag=method.delivery_tag)         else:         ## 发送确认成功             ch.basic_ack(delivery_tag=method.delivery_tag)

     ## 设置最多 50 个未确认     channel.basic_qos(prefetch_count=50)

     # 开始消费,拿到的消息调用callback处理     channel.basic_consume(callback, queue=self.queue_name, no_ack=False)

     # print(‘ [*] Waiting for messages. To exit press CTRL+C‘)     logger.info("start comsuming")

参考



原文连接地址:http://seanlook.com/2018/01/06/rabbitmq-introduce/

原文地址:https://www.cnblogs.com/sunsky303/p/8977138.html

时间: 2024-10-05 04:56:06

RabbitMQ 入门【精+转】的相关文章

2.RABBITMQ 入门 - WINDOWS - 生产和消费消息 一个完整案例

关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置 公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求做成win form的,这明显不合理,因为之前,服务器上我已经放置了一个  短信的winform的服务.那么到后期的话,登录服务器之后,全是 一个个的窗体挂在那儿,这明显合不合常理,但是领导要求这么玩,也没办法, 因为卧虎要负责的是消费 消息,所以重点说明 消费端 该案例的接收端,源自网上的代码片段

RabbitMQ入门与使用篇

介绍 RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现.用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面都非常的优秀.是当前最主流的消息中间件之一. RabbitMQ的官方 概念: Brocker:消息队列服务器实体. Exchange:消息交换机,指定消息按什么规则,路由到哪个队列. Queue:消息队列,每个消息都会被投入到一个或者多个队列里. Binding:绑定,它的作用是把exchange和queue按

Unity Shader入门精要读书笔记(一)序章

本系列的博文是笔者读<Unity Shader入门精要>的读书笔记,这本书的章节框架是: 第一章:着手准备. 第二章:GPU流水线. 第三章:Shader基本语法. 第四章:Shader数学基础. 第五章:利用简单的顶点/片元着色器来实现辅助技巧. 第六章:基本光照模型. 第七章:法线纹理.遮罩纹理等基础纹理. 第八章:透明度测试和透明度混合. 第九章:复杂光照实现. 第十章:高级纹理(立方体纹理等). 第十一章:纹理动画.顶点动画. 第十二章:屏幕特效. 第十三章:深度纹理. 第十四章:非真

RabbitMQ入门(二) —— direct交换器

在RabbitMQ入门(一)里我们讲到exchange有三种最主要的类型:direct.fanout和topic. 这里我们先来看看最简单的direct交换器的使用. 下面是测试代码: package com.jaeger.exchange.direct; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.junit.Test; import com.rabbitmq.clie

Unity Shader入门精要学习笔记 - 第4章 学习 Shader 所需的数学基础

摘录自 冯乐乐的<Unity Shader入门精要> 笛卡尔坐标系 1)二维笛卡尔坐标系 在游戏制作中,我们使用的数学绝大部分都是计算位置.距离.角度等变量.而这些计算大部分都是在笛卡尔坐标系下进行的. 一个二维的笛卡尔坐标系包含了两个部分的信息: 一个特殊的位置,即原点,它是整个坐标系的中心. 两条过原点的互相垂直的矢量,即X轴和Y轴.这些坐标轴也被称为是该坐标的矢量. OpenGL 和 DirectX 使用了不同的二维笛卡尔坐标系.如下图所示: 2)三维笛卡尔坐标系 在三维笛卡尔坐标系中,

Unity Shader入门精要学习笔记 - 第6章 开始 Unity 中的基础光照

转自冯乐乐的<Unity Shader入门精要> 通常来讲,我们要模拟真实的光照环境来生成一张图像,需要考虑3种物理现象. 首先,光线从光源中被发射出来. 然后,光线和场景中的一些物体相交:一些光线被物体吸收了,而另一些光线被散射到其他方向. 最后,摄像机吸收了一些光,产生了一张图像. 在光学中,我们使用辐照度来量化光.对于平行光来说,它的辐照度可通过计算在垂直于l的单位面积上单位时间内穿过的能量来得到.在计算光照模型时,我们需要知道一个物体表面的辐照度,而物体表面往往是和l不垂直的,我们可以

RabbitMQ入门:主题路由器(Topic Exchange)

上一篇博文中,我们使用direct exchange 代替了fanout exchange,这次我们来看下topic exchange. 一.Topic Exchange介绍 topic exchange和direct exchange类似,都是通过routing key和binding key进行匹配,不同的是topic exchange可以为routing key设置多重标准. direct路由器类似于sql语句中的精确查询:topic 路由器有点类似于sql语句中的模糊查询. 还记得吗?我

RabbitMQ入门教程(十一):消息属性Properties

原文:RabbitMQ入门教程(十一):消息属性Properties 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78698364 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 发送消息可以为消息指定一些参数 Delivery mode: 是否持久化,1 - Non-persistent,2 -

RabbitMQ入门教程(四):工作队列(Work Queues)

原文:RabbitMQ入门教程(四):工作队列(Work Queues) 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78596426 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 工作队列 使用工作队列实现任务分发的功能,一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们

RabbitMQ入门教程(六):路由选择Routing

原文:RabbitMQ入门教程(六):路由选择Routing 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78629168 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要演示使用直连接类型,将多个路由键绑定到同一个队列上.也可以将同一个键绑定到多个队列上(多重绑定multiple bind