柯南君:看大数据时代下的IT架构(8)消息队列之RabbitMQ--案例(topic起航)

二、Topic(主题) (using the Java client)

上一篇文章中,我们进步改良了我们的日志系统。我们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发。
为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。

三、Topic exchange(主题转换)

主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。

标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:

"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。

绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。

  • * 可以匹配一个标识符。
  • # 可以匹配0个或多个标识符。

下面一个简单的图片实例:

我们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。

第一个标识符描述动物的速度,

第二个标识符描述动物的颜色,

第三个标识符描述动物的物种:<speed>.<color>.<species>。

我们创建3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。:

另一方面,lazy.orange.male.rabbit,虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。 

注:主题类型的转发器非常强大,可以实现其他类型的转发器。

当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。 当绑定键中不包含任何#与*时,类似direct类型转发器

四、Putting it all together(全部代码)

我们要在我们的日志系统,用一个主题转换。我们先假设工作日志的路由键将两个词:<facility>.<severity>。

发送 EmitLogTopic.java:

[java] view plaincopyprint?

  1. public class EmitLogTopic {
  2. private static final String EXCHANGE_NAME = "topic_logs";
  3. public static void main(String[] argv)
  4. throws Exception {
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("localhost");
  7. Connection connection = factory.newConnection();
  8. Channel channel = connection.createChannel();
  9. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  10. String routingKey = getRouting(argv);
  11. String message = getMessage(argv);
  12. channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
  13. System.out.println(" [x] Sent ‘" + routingKey + "‘:‘" + message + "‘");
  14. connection.close();
  15. }
  16. //...
  17. }

接收端:ReceiveLogsTopic.java:

[java] view plaincopyprint?

  1. public class ReceiveLogsTopic {
  2. private static final String EXCHANGE_NAME = "topic_logs";
  3. public static void main(String[] argv)
  4. throws Exception {
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("localhost");
  7. Connection connection = factory.newConnection();
  8. Channel channel = connection.createChannel();
  9. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  10. String queueName = channel.queueDeclare().getQueue();
  11. if (argv.length < 1){
  12. System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
  13. System.exit(1);
  14. }
  15. for(String bindingKey : argv){
  16. channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
  17. }
  18. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  19. QueueingConsumer consumer = new QueueingConsumer(channel);
  20. channel.basicConsume(queueName, true, consumer);
  21. while (true) {
  22. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  23. String message = new String(delivery.getBody());
  24. String routingKey = delivery.getEnvelope().getRoutingKey();
  25. System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + message + "‘");
  26. }
  27. }
  28. }

 

Run the following examples, including the classpath as in Tutorial 1 - on Windows, use %CP%.

To receive all the logs:

$ java -cp $CP ReceiveLogsTopic "#"

To receive all logs from the facility "kern":

$ java -cp $CP ReceiveLogsTopic "kern.*"

Or if you want to hear only about "critical" logs:

$ java -cp $CP ReceiveLogsTopic "*.critical"

You can create multiple bindings:

$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

And to emit a log with a routing key "kern.critical" type:

$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

Have fun playing with these programs. Note that the code doesn‘t make any assumption about the routing or binding keys, you may want to play with more than two routing key parameters.

Some teasers:

  • Will "*" binding catch a message sent with an empty routing key?
  • Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key?
  • How different is "a.*.#" from "a.#"?

五、注释版的程序实例(全部代码)

  • import com.rabbitmq.client.Channel;
  • import com.rabbitmq.client.Connection;
  • import com.rabbitmq.client.ConnectionFactory;
  • public class EmitLogTopic {
  • private static final String EXCHANGE_NAME = "topic_logs";
  • public static void main(String[] argv) throws Exception {
  • ConnectionFactory factory = new ConnectionFactory();
  • factory.setHost("localhost");
  • Connection connection = factory.newConnection();
  • Channel channel = connection.createChannel();
  • channel.exchangeDeclare(EXCHANGE_NAME, "topic");//声明topic类型的Exchange
  • String routingKeyOne = "logs.error.one";// 定义一个路由名为“error”
  • for (int i = 0; i <= 1; i++) {
  • String messageOne = "this is one error logs:" + i;
  • channel.basicPublish(EXCHANGE_NAME, routingKeyOne, null, messageOne
  • .getBytes());
  • System.out.println(" [x] Sent ‘" + routingKeyOne + "‘:‘"
  • + messageOne + "‘");
  • }
  • System.out.println("################################");
  • String routingKeyTwo = "logs.error.two";
  • for (int i = 0; i <= 2; i++) {
  • String messageTwo = "this is two error logs:" + i;
  • channel.basicPublish(EXCHANGE_NAME, routingKeyTwo, null, messageTwo
  • .getBytes());
  • System.out.println(" [x] Sent ‘" + routingKeyTwo + "‘:‘"
  • + messageTwo + "‘");
  • }
  • System.out.println("################################");
  • String routingKeyThree = "logs.info.one";
  • for (int i = 0; i <= 3; i++) {
  • String messageThree = "this is one info logs:" + i;
  • channel.basicPublish(EXCHANGE_NAME, routingKeyThree, null,
  • messageThree.getBytes());
  • System.out.println(" [x] Sent ‘" + routingKeyThree + "‘:‘"
  • + messageThree + "‘");
  • }
  • channel.close();
  • connection.close();
  • }
  • }

运行结果可能如下:‘logs.error.one‘:‘this is one error logs:0‘

  • [x] Sent ‘logs.error.one‘:‘this is one error logs:1‘
  • ################################
  • [x] Sent ‘logs.error.two‘:‘this is two error logs:0‘
  • [x] Sent ‘logs.error.two‘:‘this is two error logs:1‘
  • [x] Sent ‘logs.error.two‘:‘this is two error logs:2‘
  • ################################
  • [x] Sent ‘logs.info.one‘:‘this is one info logs:0‘
  • [x] Sent ‘logs.info.one‘:‘this is one info logs:1‘
  • [x] Sent ‘logs.info.one‘:‘this is one info logs:2‘
  • [x] Sent ‘logs.info.one‘:‘this is one info logs:3‘
第一个C端的代码如下:
  • package com.abin.rabbitmq;  
     
    import com.rabbitmq.client.Channel;  
    import com.rabbitmq.client.Connection;  
    import com.rabbitmq.client.ConnectionFactory;  
    import com.rabbitmq.client.QueueingConsumer;  
     
    public class ReceiveLogsTopic {  
        private static final String EXCHANGE_NAME = "topic_logs";// 定义Exchange名称  
     
        public static void main(String[] argv) throws Exception {  
     
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            Connection connection = factory.newConnection();  
            Channel channel = connection.createChannel();  
     
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 声明topic类型的Exchange  
     
            String queueName = "queue_topic_logs1";// 定义队列名为“queue_topic_logs1”的Queue  
            channel.queueDeclare(queueName, false, false, false, null);  
    //      String routingKeyOne = "*.error.two";// "error"路由规则  
    //      channel.queueBind(queueName, EXCHANGE_NAME, routingKeyOne);// 把Queue、Exchange及路由绑定  
            String routingKeyTwo = "logs.*.one";//通配所有logs下第三词(最后一个)词为one的消息  
            channel.queueBind(queueName, EXCHANGE_NAME, routingKeyTwo);  
     
            System.out.println(" [*] Waiting for messages.");  
     
            QueueingConsumer consumer = new QueueingConsumer(channel);  
            channel.basicConsume(queueName, true, consumer);  
     
            while (true) {  
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
                String message = new String(delivery.getBody());  
                String routingKey = delivery.getEnvelope().getRoutingKey();  
     
                System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + message  
                        + "‘");  
            }  
        }  
    }
  • 第二个C端的运行结果如下:
  • [*] Waiting for messages.  
    [x] Received ‘logs.error.one‘:‘this is one error logs:0‘  
    [x] Received ‘logs.error.one‘:‘this is one error logs:1‘  
    [x] Received ‘logs.error.two‘:‘this is two error logs:0‘  
    [x] Received ‘logs.error.two‘:‘this is two error logs:1‘  
    [x] Received ‘logs.error.two‘:‘this is two error logs:2‘  
    [x] Received ‘logs.info.one‘:‘this is one info logs:0‘  
    [x] Received ‘logs.info.one‘:‘this is one info logs:1‘  
    [x] Received ‘logs.info.one‘:‘this is one info logs:2‘  
    [x] Received ‘logs.info.one‘:‘this is one info logs:3‘

				
时间: 2024-08-04 23:29:12

柯南君:看大数据时代下的IT架构(8)消息队列之RabbitMQ--案例(topic起航)的相关文章

柯南君:看大数据时代下的IT架构(5)消息队列之RabbitMQ--案例(Work Queues起航)

一.回顾 让我们回顾一下,在上几章里都讲了什么?总结如下: <柯南君:看大数据时代下的IT架构(1)业界消息队列对比> <柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍> <柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装.配置与监控> <柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)> 二.Work Queues(using the Java Cl

柯南君:看大数据时代下的IT架构(6)消息队列之RabbitMQ--案例(Publish/Subscribe起航)

一.回顾 让我们回顾一下,在上几章里都讲了什么?总结如下: <柯南君:看大数据时代下的IT架构(1)业界消息队列对比> <柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍> <柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装.配置与监控> <柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)> <柯南君:看大数据时代下的IT架构(5)消息队列之Rab

柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)

一.回顾 让我们回顾一下,在上几章里都讲了什么?总结如下: <柯南君:看大数据时代下的IT架构(1)业界消息队列对比> <柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍> <柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装.配置与监控> 二.起航 本章节,柯南君将从几个层面,用官网例子讲解一下RabbitMQ的实操经典程序案例,让大家重新回到经典"Hello world!"(The simpl

柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装、配置与监控

柯南君上一章<看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍>中,粗略的讲了一下,目前消息队列的几种常见产品的优劣对比,接下来的几章节会分别详细阐述,本章介绍RabbitMQ,好吧,废话少说,正式开始: 一.安装 1.安装Erlang 1)系统编译环境(这里采用linux/unix 环境) ① 安装环境 虚拟机:VMware? Workstation 10.0.1 build Linux系统:CentOS6.5 rabbitMQ官网下载:http://www.rab

柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍

柯南君上一章<柯南君:看大数据时代下的IT架构(1)业界消息队列对比 >中,粗略的讲了一下,目前消息队列的几种常见产品的优劣对比,接下来的几章节会分别详细阐述,本章介绍RabbitMQ,好吧,废话少说,正式开始: 一.基础概念详细介绍 1.引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题. 消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问

柯南君:看大数据时代下的IT架构(1)业界消息队列对比

一.MQ(Message Queue) 即消息队列,一般用于应用系统解耦.消息异步分发,能够提高系统吞吐量.MQ的产品有很多,有开源的,也有闭源,比如ZeroMQ.RabbitMQ.ActiveMQ.Kafka/Jafka.Kestrel.Beanstalkd.HornetQ.Apache Qpid.Sparrow.Starling.Amazon SQS.MSMQ等,甚至Redis也可以用来构造消息队列.至于如何取舍,取决于你的需求. 由于工作需要和兴趣爱好,曾经写过关于RabbitMQ的系列博

看大数据时代下的IT架构(1)业界消息队列对比

一.MQ(Message Queue) 即消息队列,一般用于应用系统解耦.消息异步分发,能够提高系统吞吐量.MQ的产品有很多,有开源的,也有闭源,比如ZeroMQ.RabbitMQ.ActiveMQ.Kafka/Jafka.Kestrel.Beanstalkd.HornetQ.Apache Qpid.Sparrow.Starling.Amazon SQS.MSMQ等,甚至Redis也可以用来构造消息队列.至于如何取舍,取决于你的需求. 由于工作需要和兴趣爱好,曾经写过关于RabbitMQ的系列博

看大数据时代下的IT架构(1)图片服务器之演进史

        柯南君的公司最近产品即将上线,由于产品业务对图片的需求与日俱增,花样百出,与此同时,在大数据时代,大流量的冲击下,对图片服务器的压力可想而知,那么今天,柯南君结合互联网的相关热文,加上自己的一点实践经验,与君探讨,与君共勉! 一.图片服务器的重要性 当前,不管哪一家网站(包括 电商行业.O2O行业.互联网行业等),不管哪一种渠道 (包括 web端,APP端甚至一些SNS应用),在大数据时代下,在内容为王的前提下,对图片的需求量越来越大,柯南君的公司是一家O2O公司,也不例外,图片

柯南君:看大数据时代下的IT架构(9)消息队列之RabbitMQ--案例(RPC起航)

二.Remote procedure call (RPC)(using the Java client) 三.Client interface(客户端接口) 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞,直到收到RPC运算的结果.代码如下: fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print "f

柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)

二.Routing(路由) (using the Java client) 在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制. 三.Bindings(绑定) 在前面的学习中已经创建了绑定(bindings),代码如下: channel