RabbitMQ erlang "topics"

原文链接:http://www.rabbitmq.com/tutorials/tutorial-five-python.html

在前面的例子中我们改进了我们的日志系统。使用 fanout 类型的exchage 只能广播消息。我们使用 direct 来代替,获得了选择性接收消息的可能。

虽然使用direct类型的exchange改善了我们的系统,但它仍然有缺陷,它不能基于多种条件 进行routing。

在我们的日志系统中,我们可能想要订阅日志不仅基于严重性程度,而且基于发布日志的源码。你可能知道 syslog unix tool 中的概念, 那个基于 严格的(info/warning/crit...)和灵巧的(auth/cron/kern)。
    这将给我们许多灵活性——我们可能想听仅来自于 ‘corn‘日志中的 严重的错误和kern 里面的所有日志。

在我们的日志系统为了实现那个,我们需要学习更复杂的 topic exchange.



 Topic exchange

发送给topic exchange的消息不能随便定义一个routing key——routing key 必须是一个由”.”号分隔的单词列表。单词可以是任意的,但是一般情况下它应该代表消息的一些特征。比如下面一些routing key :

“stock.usd.nyse”  “nyse.vmw”  “quick.orange.rabbit”

Routing key 的最大长度限制为255字节。

队列的binding key 也必须是同样的格式。Topic exchange的逻辑跟direct exchange差不多——带有指定routing key的消息将会被发送到有相同binding key 的消息队列中。

Binding key 有两特例:

  • *(star) 代表一个单词
  • #(hash) 代表0个或者多个单词

如下面的例子

在这个例子中我们将发送描述动物的消息。消息的routing key 由三个单词组成,第一个单词代表动物的速度,第二个单词代表动物的颜色,第三个单词代表动物的种类。”<celerity>.<colour>.<species>”

我们创建三个绑定:Q1使用binding key “*.orange.*”, Q2使用binding key “*.*.rabbit” 和”lazy.#”。 也就是说Q1对颜色为orange的动物感兴趣,而Q2对所有的rabbit和速度为lazy的动物感兴趣。

routing key 为‘qucik.orange.rabbit‘的消息将会被发送Q1和Q2队列, "lazy.orange.elephant"消息也会发送到Q1和Q2。 “quick.orange.fox”仅仅去Q1,“lazy.brown.fox”仅去Q2.  "lazy.pink.rabbit"将会被发送给Q2一次,虽然它匹配两个bindings. “quick.brown.fox”不匹配任意binding ,所以它被丢弃。

如果我们打破规则,发送一个单词或者四个单词的消息将会发生什么,如: “orange”或"quick.orange.male.rabbit"?好吧,这个消息将不匹配任何一个bindings,将会被丢弃。

另一种情况,“lazy.orange.male.rabbit”, 虽然他也是四个单词,但是它将会匹配Q2 binding 将会被发送到Q2。因为匹配“lazy.#”

tipic exchange 是强大的,你能表现的像其他的exchanges.

当一个队列的 binding key 是"#"(hash) ——它将收到所有的消息,无视 routing key :向 fanout exchange.

当 bindings里没有 使用"*"和"#"特殊字符时, topic echange 和 direct exchange 行为将会一样。



Putting it all together

We‘re going to use a topic exchange in our logging system. We‘ll start off with a working assumption that the routing keys of logs will have two words: "<facility>.<severity>".

emit_log_topic.erl

-module(emit_log_topic).
-compile([export_all]).

-include_lib("amqp_client/include/amqp_client.hrl").

main(Argv) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"topic_logs">>,
                                                   type = <<"topic">>}),

    {RoutingKey, Message} = case Argv of
                                [] ->
                                    {<<"anonymous.info">>, <<"Hello World!">>};
                                [R] ->
                                    {list_to_binary(R), <<"Hello World!">>};
                                [R | Msg] ->
                                    {list_to_binary(R), list_to_binary(string:join(Msg, " "))}
                            end,
    amqp_channel:cast(Channel,
                      #‘basic.publish‘{
                        exchange = <<"topic_logs">>,
                        routing_key = RoutingKey},
                      #amqp_msg{payload = Message}),
    io:format(" [x] Sent ~p:~p~n", [RoutingKey, Message]),
    ok = amqp_channel:close(Channel),
    ok = amqp_connection:close(Connection),
    ok.

receiver_logs_topic.erl

-module(receive_logs_topic).
-compile([export_all]).

-include_lib("amqp_client/include/amqp_client.hrl").

main(Argv) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"topic_logs">>,
                                                   type = <<"topic">>}),

    #‘queue.declare_ok‘{queue = Queue} =
        amqp_channel:call(Channel, #‘queue.declare‘{exclusive = true}),

    [amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"topic_logs">>,
                                              routing_key = list_to_binary(BindingKey),
                                              queue = Queue})
     || BindingKey <- Argv],

    io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),

    amqp_channel:subscribe(Channel, #‘basic.consume‘{queue = Queue,
                                                     no_ack = true}, self()),
    receive
        #‘basic.consume_ok‘{} -> ok
    end,
    loop(Channel).

loop(Channel) ->
    receive
        {#‘basic.deliver‘{routing_key = RoutingKey}, #amqp_msg{payload = Body}} ->
            io:format(" [x] ~p:~p~n", [RoutingKey, Body]),
            loop(Channel)
    end.

接收所有的日志

接收所有来自 facility是 "kern"的日志

接收“critical”日志:

创建多个bindings:

发送一个 “kern.critical”类型 routing key 的日志:

时间: 2024-10-02 10:55:40

RabbitMQ erlang "topics"的相关文章

RabbitMQ之Topics(多规则路由)

Exchange中基于direct类型无法基于多种规则进行路由. 例如分析syslog日志,不仅需要基于severity(info/warning/critical/error)进行路由,还需要基于auth.cron或者kernal模式进行路由. Topic exchange可以满足这种需求. Topic exchange 基于topic类型交换器的routing key不是唯一的,而是一系列词,基于点区分. 例如:"stock.usd.nyse", "nyse.vmw&qu

RabbitMQ&gt;Erlang machine stopped instantly (distribution name conflict?). The service is not restarted as OnFail is set to ignore.-报错解决方案 原来是NNND。。。

>Erlang machine stopped instantly (distribution name conflict?). The service is not restarted as OnFail is set to ignore. -报错解决方案 原来是NNND... RabbitMQ安装的盘符的名字是中文字符 切记:RabbitMQ安装路径不能出现中文字符

【译】RabbitMQ:Topics

在前面的教程中,我们对日志系统进行了功能强化.我们使用direct类型的交换器并且为之提供了可以选择接收日志的能力,替换了只能傻乎乎的广播消息的fanout类型的交换器.尽管使用direct类型的交换器强化了系统,但是它依然有一些限制,不能基于条件的进行路由. 在日志系统中,我们或许希望不仅能根据严重等级,还能基于日志的发送源来订阅日志日志.你可能已经从Unix的syslog工具中知道了这个概念,该工具路由日志的时候既基于严重等级(info/warn/crit...)又基于设备(auth/cro

RabbitMQ erlang "Routing"

官方网址:http://www.rabbitmq.com/tutorials/tutorial-four-python.html 前面的例子中,我们构建了一个简单的日志系统.我们可以广播日志消息给所有的接收者. 在这个例子中,我们准备增加一个新特性.我们将能仅仅订阅消息的一部分. 例如:我们直接仅仅把 critical error 类型的消息写入日志文件(保存到磁盘空间),然而还能够打印所有的日志消息到控制台. Bindings(绑定) 在前面的例子中,我们已经创建bindings. 如下: a

海龙-redhat6.x64位系列 yum极速安装最新版rabbitmq及erlang

redhat6.x64位系列 yum极速安装最新版rabbitmq及erlang 简易操作步骤: [注意]:如果网速比较差,可以先源码安装好erlang环境后,设置好系统变量后再使用rpm直接安装rabbitmq即可.网络可以还是建议直接全部yum安装 安装RabbitMQ # rpm -i --nodeps rabbitmq-server-3.6.6-1.e16.noarch.rpm 直接用#rpm -i rabbitmq-server-3.6.6-1.noarch.rpm,会提示缺少erla

centos 7 下 rabbitmq 3.8.0 &amp; erlang 22.1 源码编译安装

centos 7 下 rabbitmq 3.8.0 & erlang 22.1 源码编译安装 安装前请检查好erlang和rabbitmq版本是否相匹配参考:RabbitMQ Erlang Version Requirements 1. erlang 22.1 源码编译安装 1.1 安装Erlang编译安装必要依赖 sudo yum install -y gcc gcc-c++ glibc-devel make ncurses-devel openssl-devel autoconf java-

RabbitMQ入门与使用篇

介绍 RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现.用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面都非常的优秀.是当前最主流的消息中间件之一. RabbitMQ的官方 概念: Brocker:消息队列服务器实体. Exchange:消息交换机,指定消息按什么规则,路由到哪个队列. Queue:消息队列,每个消息都会被投入到一个或者多个队列里. Binding:绑定,它的作用是把exchange和queue按

Kafka与RabbitMQ、ActiveMQ协议区别

对于Kafka与RabbitMQ.ActiveMQ协议,它们具体的区别如下: activemq:        activemq支持主从复制.集群.但是集群功能看起来很弱,只有failover功能,即我连一个失败了,可以切换到其他的broker上.这一点貌似不太科学.假设有三个broker,其中一个上面没有consumer,但另外两个挂了,消息会转到这个上面来,堆积起来.看样子activemq还在升级中.        activemq工作模型比较简单.只有两种模式 queue,topics r

rabbitmq集群搭建(centos6.5)

一:rabbitmq的安装: 参考:http://www.blogjava.net/hellxoul/archive/2014/06/25/415135.html http://blog.haohtml.com/archives/15249 说明:修改机器名字后再安装(为后面集群做准备) vi /etc/sysconfig/network 修改名字 vi /etc/hosts 修改地址映射表,如192.168.1.112   rabbitmq-node1.com rabbitmq-node1 #