RabbitMQ之Topics(多规则路由)

Exchange中基于direct类型无法基于多种规则进行路由。

例如分析syslog日志,不仅需要基于severity(info/warning/critical/error)进行路由,还需要基于auth、cron或者kernal模式进行路由。

Topic exchange可以满足这种需求。

Topic exchange

基于topic类型交换器的routing key不是唯一的,而是一系列词,基于点区分。

例如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"

binding key也是类型方式。

*表示只匹配一个关键字

#可以匹配0或者多个关键字

Q1队列匹配对所有orange的关注

Q2队列可以监听rabbits,以及所有lazy开头的

quick.orange.fox只会进入Q1,lazy.brown.fox只会进入Q2

lazy.pink.rabbit尽管和两个topic都匹配上,但是只会进入队列以此

quick.brown.fox不会和任何队列进行绑定,因此会被丢弃

如果发送orange或者quick.orange.male.rabbit不会和任何队列进行绑定,也都会被丢弃。

Topic exchange功能很强大,类似其他exchange。当一个队列设置binding key为#,则它可以接收所有消息,不管routing key如何设置,类似fanout exchange。

当不设置*和#的话,topic exchange就类似direct exchange。

例子

发送消息

#!/usr/bin/env python
#-*- coding:utf8 -*-
import sys
import pika
import logging

logging.basicConfig(format=‘%(levelname)s:$(message)s‘,level=logging.CRITICAL)

def emit_log():

    pika.connection.Parameters.DEFAULT_HOST = ‘localhost‘
    pika.connection.Parameters.DEFAULT_PORT = 5672
    pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = ‘/‘
    pika.connection.Parameters.DEFAULT_USERNAME = ‘guosong‘
    pika.connection.Parameters.DEFAULT_PASSWORD = ‘guosong‘

    para = pika.connection.Parameters()

    connection = pika.BlockingConnection(para)

    channel = connection.channel()
    #声明一个topic_logs交换器,类型为topic
    channel.exchange_declare(exchange=‘topic_logs‘,type=‘topic‘)

    #指定路由key
    routing_key = sys.argv[1] if len(sys.argv) >1 else ‘anonymous.info‘

    message = ‘ ‘.join(sys.argv[2:]) or "info:Hello World!"                                         

    #发送的时候指定routing_key为空,没有绑定队列到交换器上,消息将会丢失
    #对于日志类消息,如果没有消费者监听的话,这些消息就会忽略
    channel.basic_publish(exchange=‘topic_logs‘,routing_key=routing_key,body=message)

    #%r也是string类型
    print "[x] Sent %r" % (message,)

    connection.close()

if __name__ == ‘__main__‘:
    emit_log()

接收消息

def callback(ch, method, properties, body):
    print " [x] %r " % (body,)

def receive_logs():

    pika.connection.Parameters.DEFAULT_HOST = ‘localhost‘
    pika.connection.Parameters.DEFAULT_PORT = 5672
    pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = ‘/‘
    pika.connection.Parameters.DEFAULT_USERNAME = ‘guosong‘
    pika.connection.Parameters.DEFAULT_PASSWORD = ‘guosong‘

    para = pika.connection.Parameters()

    connection = pika.BlockingConnection(para)

    channel = connection.channel()

    #声明一个topic_logs交换器,类型为topic
    channel.exchange_declare(exchange=‘topic_logs‘,type=‘topic‘)

    #声明一个随机队列,设置exclusive=True,在该consumer退出的时候,对应的队列被删除
    result = channel.queue_declare(exclusive=True)
    #获取随机队列的名称
    queue_name = result.method.queue

    binding_keys = sys.argv[1:]

    if not binding_keys:
        print >> sys.stderr, "Usage: %s [binding_key]" %  sys.argv[0]
        sys.exit(1)

    for binding_key in binding_keys:
         #绑定交换器和队列
        channel.queue_bind(exchange=‘topic_logs‘,queue=queue_name,routing_key=binding_key)
    print ‘[*] Wating for logs.To exit press CTRL+C‘

    #开始消费消息
    channel.basic_consume(callback,queue=queue_name,no_ack=True)

    channel.start_consuming()

接收所有消息

python receive_logs_topic.py "#"

接收kern开头,包含一个关键字

python receive_logs_topic.py "kern.*"

多个绑定

python receive_logs_topic.py "kern.*" "*.critical"
[email protected]:~/code/rabbitmq/ch5$ python emit_log.py "kern.critical" "A critical kernel error"[x] Sent ‘A critical kernel error‘

[email protected]:~/code/rabbitmq/ch5$ ./receive_logs.py "kern.*"
[*] Wating for logs.To exit press CTRL+C
 [x] ‘A critical kernel error‘

参考链接

http://www.rabbitmq.com/tutorials/tutorial-five-python.html

RabbitMQ之Topics(多规则路由)

时间: 2024-09-27 05:49:50

RabbitMQ之Topics(多规则路由)的相关文章

thinkphp 规则路由

规则路由是一种比较容易理解的路由定义方式,采用ThinkPHP设计的规则表达式来定义. 规则表达式 规则表达式通常包含静态地址和动态地址,或者两种地址的结合,例如下面都属于有效的规则表达式: 'my' => 'Member/myinfo', // 静态地址路由 'blog/:id' => 'Blog/read', // 静态地址和动态地址结合 'new/:year/:month/:day'=>'News/read', // 静态地址和动态地址结合 ':user/:blog_id' =&g

【译】RabbitMQ:Topics

在前面的教程中,我们对日志系统进行了功能强化.我们使用direct类型的交换器并且为之提供了可以选择接收日志的能力,替换了只能傻乎乎的广播消息的fanout类型的交换器.尽管使用direct类型的交换器强化了系统,但是它依然有一些限制,不能基于条件的进行路由. 在日志系统中,我们或许希望不仅能根据严重等级,还能基于日志的发送源来订阅日志日志.你可能已经从Unix的syslog工具中知道了这个概念,该工具路由日志的时候既基于严重等级(info/warn/crit...)又基于设备(auth/cro

RabbitMQ erlang "topics"

原文链接:http://www.rabbitmq.com/tutorials/tutorial-five-python.html 在前面的例子中我们改进了我们的日志系统.使用 fanout 类型的exchage 只能广播消息.我们使用 direct 来代替,获得了选择性接收消息的可能. 虽然使用direct类型的exchange改善了我们的系统,但它仍然有缺陷,它不能基于多种条件 进行routing. 在我们的日志系统中,我们可能想要订阅日志不仅基于严重性程度,而且基于发布日志的源码.你可能知道

rabbitMQ学习笔记(五) 消息路由

生产者会生产出很多消息 , 但是不同的消费者可能会有不同的需求,只需要接收指定的消息,其他的消息需要被过滤掉. 这时候就可以对消息进行过滤了. 在消费者端设置好需要接收的消息类型. 如果不使用默认的Exchange发送消息,而是使用我们自定定义的Exchange发送消息,那么下面这个方法的第二个参数就不是QueueName了,而是消息的类型. channel.basicPublish( exchangeName , messageType , null , msg.getBytes()); 示例

Zuul路由转发规则

定制的路由规则的主要功能: 1.路由表中包含源路径,微服务名称,目标路径 2.Endpoint粒度配置支持 3.路由支持1对1精确路由 4.源路径可以前缀/**格式来模糊路由 5.目标路径可以使用前缀/**格式来装配目标路径 6.保留默认动态路由规则:服务名称/** --> 是否截去前缀 --> 目标路径 7.保留默认动态路由规则是否支持截去前缀的配置参数stripPrefix特性 8.路由规则可以在不重启服务动态更新,这个功能通过外化配置来支持 9.匹配股则采取谁先匹配路由谁,也就是说在路由

thinkphp学习笔记10—看不懂的路由规则

路由这部分貌似在实际工作中没有怎么设计过,只是在用默认的设置,在手册里面看到部分,艰涩难懂. 1.路由定义 要使用路由功能需要支持PATH_INFO,PATH_INFO是什么呢?手册中提到“要使用路由功能,前提是你的URL支持PATH_INFO(或者兼容URL模式也可以,采用普通URL模式的情况下不支持路由功能),” , url支持path_info,不是apache要支持path_info么,度娘讲的还算清楚一点,见下文: pathinfo(PHP 4 >= 4.0.3, PHP 5)path

Go的http包中默认路由匹配规则

# 一.执行流程 首先我们构建一个简单http server: ```go package main import ( "log" "net/http" ) func main() { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("hello world")) }) log.Fatal(http.Liste

【转】RabbitMQ基础——和——持久化机制

这里原来有一句话,触犯啦天条,被阉割!!!! 首先不去讨论我的日志组件怎么样.因为有些日志需要走网络,有的又不需要走网路,也是有性能与业务场景的多般变化在其中,就把他抛开,我们只谈消息RabbitMQ. 那么什么是RabbitMQ,它是用来解决什么问题的,性能如何,又怎么用?我会在下面一一阐述,如有错误,不到之处,还望大家不吝赐教. RabbitMQ简介 必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能.健壮以及可伸缩性出名的Erlang写成(

.Net使用RabbitMQ详解

RabbitMQ简介 必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能.健壮以及可伸缩性出名的Erlang写成(因此也是继承了这些优点). 百度百科对RabbitMQ阐述也非常明确,建议去看下,还有amqp协议. RabbitMQ官网:http://www.rabbitmq.com/ 如果你要下载安装,那么必须先把Erlang语言装上. RabbitMQ的.net客户端,可以在nuget中输入rabbitmq轻松获得. RabbitMQ与其他消