rabbitMQ第三篇:采用不同的交换机规则

在上一篇我们都是采用发送信息到队列然后队列把信息在发送到消费者,其实实际情况并非如此,rabbitMQ其实真正的思想是生产者不发送任何信息到队列,甚至不知道信息将发送到哪个队列。相反生产者只能发送信息到交换机,交换机接收到生产者的信息,然后按照规则把它推送到对列中,交换机是如何做处理他接收到的信息,并怎么样发送到特定的队列,那么这一篇主要是讲解交换机的规则。

一:发布/订阅

在上一篇说到的队列都指定了名称,但是现在我们不需要这么做,我们需要所有的日志信息,而不只是其中的一个。如果要做这样的队列,我们需要2件事,一个就是获取一个新的空的队列,这样我就需要创建一个随机名称的队列,最好让服务器帮我们做出选择,第一个就是我们断开用户的队列,应该自动进行删除。ok下面是一副工作图。

信息发送端代码

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//fanout表示分发,所有的消费者得到同样的队列信息
        //分发信息
        for (int i=0;i<5;i++){
            String message="Hello World"+i;
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("EmitLog Sent ‘" + message + "‘");
        }
        channel.close();
        connection.close();
    }

消费者代码

public class ReceiveLogs1 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //产生一个随机的队列名称
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定

        System.out.println("ReceiveLogs1 Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogs1 Received ‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);//队列会自动删除
    }
}

上面就完成了一个发布/订阅模式的消息队列 看看结果

二:Routing

上面我用采用了广播的模式进行消息的发送,现在我们采用路由的方式对不同的消息进行过滤

发送端代码

public class RoutingSendDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");//注意是direct
        //发送信息
        for (String routingKey:routingKeys){
            String message = "RoutingSendDirect Send the message level:" + routingKey;
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
            System.out.println("RoutingSendDirect Send"+routingKey +"‘:‘" + message);
        }
        channel.close();
        connection.close();
    }
}
ReceiveLogsDirect1 消费者代码
public class ReceiveLogsDirect1 {
    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]{"info" ,"warning"};

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //获取匿名队列名称
        String queueName=channel.queueDeclare().getQueue();

        //根据路由关键字进行绑定
        for (String routingKey:routingKeys){
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+"," +
                    " queue:"+queueName+", BindRoutingKey:" + routingKey);
        }
        System.out.println("ReceiveLogsDirect1  Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsDirect1 Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
ReceiveLogsDirect2消费者代码
public class ReceiveLogsDirect2 {
    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]{"error"};

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //获取匿名队列名称
        String queueName = channel.queueDeclare().getQueue();
        //根据路由关键字进行多重绑定
        for (String severity : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
            System.out.println("ReceiveLogsDirect2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);
        }
        System.out.println("ReceiveLogsDirect2 Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsDirect2 Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

上面代码可以看出这里是通过路由来找个这个对列的。我们看下结果

三:Topics

这种应该属于模糊匹配

* :可以替代一个词

#:可以替代0或者更多的词

现在我们继续看看代码来理解

发送端

public class TopicSend {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try{
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("localhost");
            connection=factory.newConnection();
            channel=connection.createChannel();

            //声明一个匹配模式的交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
            //待发送的消息
            String[] routingKeys=new String[]{
                    "quick.orange.rabbit",
                    "lazy.orange.elephant",
                    "quick.orange.fox",
                    "lazy.brown.fox",
                    "quick.brown.fox",
                    "quick.orange.male.rabbit",
                    "lazy.orange.male.rabbit"
            };
            //发送消息
            for(String severity :routingKeys){
                String message = "From "+severity+" routingKey‘ s message!";
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
                System.out.println("TopicSend Sent ‘" + severity + "‘:‘" + message + "‘");
            }
        }catch (Exception e){
            e.printStackTrace();
            if (connection!=null){
                channel.close();
                connection.close();
            }
        }finally {
            if (connection!=null){
                channel.close();
                connection.close();
            }
        }
    }
}

消费者1:

public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个匹配模式的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        //路由关键字
        String[] routingKeys = new String[]{"*.orange.*"};
        //绑定路由
        for (String routingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
        }
        System.out.println("ReceiveLogsTopic1 Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsTopic1 Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

消费者2:

ublic class ReceiveLogsTopic2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
//      声明一个匹配模式的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        // 路由关键字
        String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
//      绑定路由关键字
        for (String bindingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
        }

        System.out.println("ReceiveLogsTopic2 Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException  {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsTopic2 Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

运行后结果

时间: 2024-10-06 09:35:39

rabbitMQ第三篇:采用不同的交换机规则的相关文章

RabbitMQ(三) ——发布订阅

RabbitMQ(三) --发布订阅 (转载请附上本文链接--linhxx) 一.概述 RabbitMQ的发布订阅(Publish/Subscribe),其将生产者和消费者进一步解耦,生产者生产消息后,交付给交换机,消费者上线后,主动主动去队列中取数据进行处理.该模式也符合上一节工作队列中的ack.预取等规则. 发布订阅模式如下图所示: 二.交换机(exchange) 生产者生产完消息之后,都是将消息通过channel交给交换机,即生产者并不直接和队列联系.在没有定义交换机的时候,RabbitM

LoadRunner用户行为模拟器 《第三篇》

用户行为模拟器简称VU,VU通过运行VU脚本模拟了用户对软件的操作行为.VU是基于网络协议的.很明显,被测服务器是通过各种各样的网络协议与客户端打交道的.VU要"骗过"被测服务器,当然就要遵守这些协议,按规矩.按步骤来执行动作,否则就会吃"闭门羹". 基于网络协议的脚本的一个好处是,我们可以使用相对少的硬件资源,来生成大量的虚拟用户负载.相比之下,WinRunner和QTP脚本时基于界面事件的,它在一台主机上同时只能运行一个虚拟用户的脚本,因为一个虚拟用户会占用整个

第三篇:用SOUI能做什么?

SOUI-DEMO界面预览 在回答SOUI能做什么之前,先看看SVN中demo工程的界面截图: 使用SOUI实现上面的界面主要的工作全在配置几个XML文件,基本不需要写C++代码.(如何配置XML布局将在后续文章中讲解) 从零开始生成一个使用SOUI的应用程序 以SOUI的demo为例,我们看在SOUI中如何一步一步实现一个应用程序. 首先使用Win32应用程序向导生成一个空项目. 新建一个如demo.cpp文件,定义一个_tWinMain函数. int WINAPI _tWinMain(HIN

SQL Server索引的维护 - 索引碎片、填充因子 &lt;第三篇&gt;

实际上,索引的维护主要包括以下两个方面: 页拆分 碎片 这两个问题都和页密度有关,虽然两者的表现形式在本质上有所区别,但是故障排除工具是一样的,因为处理是相同的. 对于非常小的表(比64KB小得多),一个区中的页面可能属于多余一个的索引或表---这被称为混合区.如果数据库中有太多的小表,混合区帮助SQL Server节约磁盘空间. 随着表(或索引)增长并且请求超过8个页面,SQL Server创建专用于该表(或索引)的区并且从该区中分配页面.这样一个区被称为统一区,它可以为多达8个相同表或索引的

第三篇——第二部分——第二文 计划搭建SQL Server镜像

原文:第三篇--第二部分--第二文 计划搭建SQL Server镜像 本文紧跟上一章:SQL Server镜像简介 本文出处:http://blog.csdn.net/dba_huangzj/article/details/27203053 俗话说:工欲善其事必先利其器.计划好如何部署和使用镜像,可以减少很多不必要的风险.本文将按照三步骤的形式展示,但是要注意这不是唯一的标准,具体情况具体分析. 第一步:了解环境 在搭建SQL Server镜像时,必须先了解你所要部署的环境,才能决定镜像的配置项

C语言中容易被忽略的细节(第三篇)

前言:本文的目的是记录C语言中那些容易被忽略的细节.我打算每天抽出一点时间看书整理,坚持下去,今天是第一篇,也许下个月的今天是第二篇,明年的今天又是第几篇呢?--我坚信,好记性不如烂笔头.第三篇了,fight~... 第一篇链接:C语言中容易被忽略的细节(第一篇) 第二篇链接:C语言中容易被忽略的细节(第二篇) 1.__attribute__((noreturn)) __attribute__可设置函数属性.变量属性和类型属性.__attribute__((noreturn))设置了函数属性,n

深入理解javascript函数系列第三篇

前面的话 函数是javascript中特殊的对象,可以拥有属性和方法,就像普通的对象拥有属性和方法一样.甚至可以用Function()构造函数来创建新的函数对象.本文是深入理解javascript函数系列第三篇--属性和方法 属性 [length属性] 函数系列第二篇中介绍过,arguments对象的length属性表示实参个数,而函数的length属性则表示形参个数 function add(x,y){ console.log(arguments.length)//3 console.log(

并行计算复习————第三篇 并行计算理论基础:并行数值算法

第三篇 并行计算理论基础:并行数值算法 注:此篇较水,=.= Ch9 稠密矩阵运算 9.1 矩阵的划分 矩阵的划分一般分为带状划分和棋盘划分,在此基础上又有循环划分的变体: 带状划分:把矩阵的若干行或若干列连续地划分给一个处理器 循环带状划分:把矩阵的若干行或若干列间断且等间隔地划分给一个处理器 棋盘划分:把方阵连续地划分成若干子方阵,每个处理器指派一个子方阵 循环棋盘划分:把方阵间断且等间隔地划分成若干子方阵,每个处理器指派一个子方阵 一般情况下,棋盘划分的划分方法能够开发出更高并行度的算法

javascript面向对象系列第三篇——实现继承的3种形式

前面的话 学习如何创建对象是理解面向对象编程的第一步,第二步是理解继承.开宗明义,继承是指在原有对象的基础上,略作修改,得到一个新的对象.javascript主要包括类式继承.原型继承和拷贝继承这三种继承方式.本文是javascript面向对象系列第三篇——实现继承的3种形式 类式继承 大多数面向对象的编程语言都支持类和类继承的特性,而JS却不支持这些特性,只能通过其他方法定义并关联多个相似的对象,如new和instanceof.不过在后来的ES6中新增了一些元素,比如class关键字,但这并不