原文链接:http://www.rabbitmq.com/tutorials/tutorial-five-python.html
在前面的例子中我们改进了我们的日志系统。使用 fanout 类型的exchage 只能广播消息。我们使用 direct 来代替,获得了选择性接收消息的可能。
虽然使用direct类型的exchange改善了我们的系统,但它仍然有缺陷,它不能基于多种条件 进行routing。
在我们的日志系统中,我们可能想要订阅日志不仅基于严重性程度,而且基于发布日志的源码。你可能知道 syslog unix tool 中的概念, 那个基于 严格的(info/warning/crit...)和灵巧的(auth/cron/kern)。
这将给我们许多灵活性——我们可能想听仅来自于 ‘corn‘日志中的 严重的错误和kern 里面的所有日志。
在我们的日志系统为了实现那个,我们需要学习更复杂的 topic exchange.
Topic exchange
发送给topic exchange的消息不能随便定义一个routing key——routing key 必须是一个由”.”号分隔的单词列表。单词可以是任意的,但是一般情况下它应该代表消息的一些特征。比如下面一些routing key :
“stock.usd.nyse” “nyse.vmw” “quick.orange.rabbit”
Routing key 的最大长度限制为255字节。
队列的binding key 也必须是同样的格式。Topic exchange的逻辑跟direct exchange差不多——带有指定routing key的消息将会被发送到有相同binding key 的消息队列中。
Binding key 有两特例:
- *(star) 代表一个单词
- #(hash) 代表0个或者多个单词
如下面的例子
在这个例子中我们将发送描述动物的消息。消息的routing key 由三个单词组成,第一个单词代表动物的速度,第二个单词代表动物的颜色,第三个单词代表动物的种类。”<celerity>.<colour>.<species>”
我们创建三个绑定:Q1使用binding key “*.orange.*”, Q2使用binding key “*.*.rabbit” 和”lazy.#”。 也就是说Q1对颜色为orange的动物感兴趣,而Q2对所有的rabbit和速度为lazy的动物感兴趣。
routing key 为‘qucik.orange.rabbit‘的消息将会被发送Q1和Q2队列, "lazy.orange.elephant"消息也会发送到Q1和Q2。 “quick.orange.fox”仅仅去Q1,“lazy.brown.fox”仅去Q2. "lazy.pink.rabbit"将会被发送给Q2一次,虽然它匹配两个bindings. “quick.brown.fox”不匹配任意binding ,所以它被丢弃。
如果我们打破规则,发送一个单词或者四个单词的消息将会发生什么,如: “orange”或"quick.orange.male.rabbit"?好吧,这个消息将不匹配任何一个bindings,将会被丢弃。
另一种情况,“lazy.orange.male.rabbit”, 虽然他也是四个单词,但是它将会匹配Q2 binding 将会被发送到Q2。因为匹配“lazy.#”
tipic exchange 是强大的,你能表现的像其他的exchanges.
当一个队列的 binding key 是"#"(hash) ——它将收到所有的消息,无视 routing key :向 fanout exchange.
当 bindings里没有 使用"*"和"#"特殊字符时, topic echange 和 direct exchange 行为将会一样。
Putting it all together
We‘re going to use a topic exchange in our logging system. We‘ll start off with a working assumption that the routing keys of logs will have two words: "<facility>.<severity>".
emit_log_topic.erl
-module(emit_log_topic). -compile([export_all]). -include_lib("amqp_client/include/amqp_client.hrl"). main(Argv) -> {ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}), {ok, Channel} = amqp_connection:open_channel(Connection), amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"topic_logs">>, type = <<"topic">>}), {RoutingKey, Message} = case Argv of [] -> {<<"anonymous.info">>, <<"Hello World!">>}; [R] -> {list_to_binary(R), <<"Hello World!">>}; [R | Msg] -> {list_to_binary(R), list_to_binary(string:join(Msg, " "))} end, amqp_channel:cast(Channel, #‘basic.publish‘{ exchange = <<"topic_logs">>, routing_key = RoutingKey}, #amqp_msg{payload = Message}), io:format(" [x] Sent ~p:~p~n", [RoutingKey, Message]), ok = amqp_channel:close(Channel), ok = amqp_connection:close(Connection), ok.
receiver_logs_topic.erl
-module(receive_logs_topic). -compile([export_all]). -include_lib("amqp_client/include/amqp_client.hrl"). main(Argv) -> {ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}), {ok, Channel} = amqp_connection:open_channel(Connection), amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"topic_logs">>, type = <<"topic">>}), #‘queue.declare_ok‘{queue = Queue} = amqp_channel:call(Channel, #‘queue.declare‘{exclusive = true}), [amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"topic_logs">>, routing_key = list_to_binary(BindingKey), queue = Queue}) || BindingKey <- Argv], io:format(" [*] Waiting for logs. To exit press CTRL+C~n"), amqp_channel:subscribe(Channel, #‘basic.consume‘{queue = Queue, no_ack = true}, self()), receive #‘basic.consume_ok‘{} -> ok end, loop(Channel). loop(Channel) -> receive {#‘basic.deliver‘{routing_key = RoutingKey}, #amqp_msg{payload = Body}} -> io:format(" [x] ~p:~p~n", [RoutingKey, Body]), loop(Channel) end.
接收所有的日志
接收所有来自 facility是 "kern"的日志
接收“critical”日志:
创建多个bindings:
发送一个 “kern.critical”类型 routing key 的日志: