rabbitmq+java入门(三)exchange的使用

参考:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

先决条件

本教程假定RabbitMQ 在标准端口(5672)上的本地主机上安装并运行。如果您使用不同的主机,端口或证书,则连接设置需要进行调整。

在之前的教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只被传递给一个消费者。在这一部分,我们将做一些完全不同的事情 - 我们会向多个消费者传递信息。这种模式被称为“发布/订阅”。

为了说明这种模式,我们将建立一个简单的日志系统。它将包含两个程序 - 第一个将发射日志消息,第二个将接收并打印它们。

在我们的日志系统中,每个接收程序的运行副本都会收到消息。这样我们就可以运行一个接收器并将日志指向硬盘; 同时我们将能够运行另一个接收器并在屏幕上查看日志。

基本上,发布的日志消息将被广播给所有的接收者。

exchanges

在本教程的上一个部分中,我们发送消息并从队列中接收消息。现在是时候在rabbit中引入完整的消息传递模型了。

让我们快速回顾一下前面教程中的内容:

  • 一个生产者是发送消息的用户的应用程序。
  • 一个队列是存储消息的缓冲器。
  • 一个消费者是接收消息的用户的应用程序。

RabbitMQ中的消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。实际上,生产者通常甚至不知道邮件是否会被传送到任何队列中。

相反,生产者只能发送消息给交易所(exchange)。交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方面它推动他们排队。exchange必须知道如何处理收到的消息。是否应该附加到特定队列?它应该附加到许多队列中吗?或者它应该被丢弃吗。这些规则都由交换类型定义 。

有几种可用的交换类型:direct, topic, headers 和 fanout。我们将关注最后一个fanout 。让我们创建一个这种类型的exchage,命名为logs:

channel.exchangeDeclare(“logs”,“fanout”);

fanout非常简单。词如其名,它只是将收到的所有消息广播到它所知道的所有队列中。这也正是我们的日志系统所需要的。

绑定

exchange和队列之间的关系称为绑定。代码如下:

channel.queueBind(queueName,“logs”,“”);

列出绑定

您可以用以下的命令列出所有的绑定:

rabbitmqctl list_bindings

把以上这些放在一起

发出日志消息的生产者程序与之前的教程没有多大区别。最重要的变化是我们现在想发布消息到我们的自己定义的名称为logs的exchange而不是一个没有名称的exchange。发送时我们需要提供一个routingKey,但是对于fanout交换而言这个值被忽略了。以下是EmitLog.java程序的代码 :

package rmq.publishSubscribe;

/**
 * Created by zuzhaoyue on 18/5/16.
 */
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //exchange的类型包括:direct, topic, headers and fanout,我们本例子主要关注的是fanout
        //fanout类型是指向所有的队列发送消息
        //以下是创建一个fanout类型的exchange,取名logs
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String message = getMessage(argv);

        //1.在上个"hello world"例子中,我们用的是channel.basicPublish("", "hello", null, message.getBytes());
        //这里用了默认的exchanges,一个空字符串 "",在basicPublish这个方法中,第一个参数即是exchange的名称
        //2.准备向我们命名的exchange发送消息啦
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent ‘" + message + "‘");

        channel.close();
        connection.close();
    }

    private static String getMessage(String[] strings){
        if (strings.length < 1)
            return "info: Hello World!";
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
 

如你所见,建立连接后我们声明了exchange。这一步是必要的,因为不可以发布到不存在的exchange。

如果没有队列绑定到交换机上,这些消息将会丢失,但这对当前的例子来说没问题; 如果没有消费者正在收听,我们可以放心地丢弃消息。

以下是接收方的代码:

//package rmq.workqueues;

/**
 * Created by zuzhaoyue on 18/5/16.
 */
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //上个例子中我们是向一个队列中发送消息,接收方也是从一个队列中获取,那种情况下给队列命名是很重要的,因为你需要生产者和消费者共享这个队列
        //但是这个例子里,则不需要给队列命名,首先看下需求:即时读取日志,可以看出日志系统需要的是即时性,那些旧的日志我们不需要看,所以我们必须满足以下两点
        //1.每次连接rmq时我们都需要一个新的空的队列,这个可以用随机给队列命名并创建来实现,或者更棒的方式是,让rmq服务器自己随机选择一个名字给我们
        //2.当我们关闭与rmq的连接时,这个队列得自动删除
        //当然,这个已经有封装好的方法了哈哈:channel.queueDeclare().getQueue()方法,可以创建一个暂时的,独立的,可自动删除并随机命名的队列
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String queueName = channel.queueDeclare().getQueue();
        System.out.println("队列名称:" + queueName);
        //现在我们已经有了一个exchange,下一步就是让exchange向队列发送消息,exchange与队列之间的关系也叫做binding(绑定)
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received ‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

为了演示不同队列都接收到了消息,我们把队列的名称打印出来,并且一个显示在屏幕上,一个重定向到文件中:

第一个消费者启动:启动ReceiveLog.java的main()方法

第二个消费者启动(重定向到文件/data/rmqlogs.log中):

javac -cp /data/amqp-client-4.2.0.jar ReceiveLogs.java
java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogs>/data/rmqlogs.log

注意:

1.用javac和java命令启动时需要将package那行代码注释掉,不然会报找不到或无法加载主类的错误。

2.两个jar包必须加上,不然会报找不到jar的异常

启动完成后,我们启动第一个EmotLog.java的main()方法,可以观察到屏幕上打印内容:

与此同时,tail -f /data/rmqlogs.log显示如下:

可以发现两个队列名称不同,但接收到了相同的消息,调试成功。

原文地址:https://www.cnblogs.com/zuxiaoyuan/p/9047164.html

时间: 2024-08-30 15:26:34

rabbitmq+java入门(三)exchange的使用的相关文章

Java入门(三)——集合概讲

集合(或者叫容器)是Java的核心知识点,它有着很深的深度.我们这里不会设计多深,仅仅作为了解入门,深入了解请移步各种集合源码文章.好的,下面正是开始介绍... Java集合为何而生 我们知道,Java是一门面向对象编程语言,这也就意味着程序中存在着大量的对象.这个时候问题就来了,我们如何很好的存放和操作对象呢?如果你能明白这个问题,那么你就知道了"集合为何而生"这个问题的答案. 总结一句: Java给我们提供了工具(集合)方便我们去存放和操作多个Java对象 Java集合入门学习 J

Java入门——面向对象基础(三)

Java入门——抽象类与接口的应用 本Blog内容 为抽象类与接口实例化 抽象类的实际应用——模板设计 接口的实际应用——制定标准 设计模式——工厂设计 设计模式——代理设计 为抽象类与接口实例化 利用多态性 1 package Sep16; 2 3 public class AbstractCaseDemo1 { 4 5 /** 6 * @param args 7 */ 8 public static void main(String[] args) { 9 // TODO Auto-gene

Flex入门(三)——微架构之Cairngorm

大家都知道我们在开发后台的时候,都会使用MVC,三层等分层架构,使后台代码达到职责更为分明单一,高内聚低耦合,例如,Dao层只是进行和数据库打交道,负责处理数据:Service(B层)只是进行逻辑判断处理,而Action则进行后台和前台页面的交互等.从而使程序更加容易管理,更加灵活,更加容易扩展,更加容易维护.也就是大家比较熟悉的Struts(SpringMVC)+Spring+Hibernate(Mybatis)等. 而作为前台Flex处理,也提供了类似的处理功能,想要达到的效果,也是代码分层

Java入门学习知识点汇总--第一部分

Java入门重要知识点在这里总结一下,以方便日后复习,这部分内容主要有:变量和常量,常用的运算符,流程控制语句,数组,方法这些内容 一.变量和常量 1.Java关键字 先贴张图: 所有关键字区分大小写. 2.标识符 标识符就是用于给 Java 程序中变量.类.方法等命名的符号. 注意几条规则: 标识符可以由字母.数字.下划线(_).美元符($)组成,但不能包含 @.%.空格等其它特殊字符,不能以数字开头. 标识符不能是 Java 关键字和保留字,但可以包含关键字和保留字. 标识符是严格区分大小写

Java入门(二)——果然断更的都是要受惩罚的。。。

断更了一个多月,阅读量立马从100+跌落至10-,虽说不是很看重这个,毕竟只是当这个是自己的学习笔记,但有人看,有人评论,有人认同和批评的感觉还是很巴适的,尤其以前有过却又被剥夺的,惨兮兮的. 好好写吧. 现在开展的“业务”,一个是PHP,一个是Android开发. 前者偏向于三个方向,总结之前的(看书,敲代码实现),电商网站的开发,一些技能的实现: 后者起步阶段,一边Java学习,一边动手做东西出来,争取一周内有个交代吧先. 今天还比较坑一点,把昨天弄的卸载了,AS中的虚拟界面用不了,用哥们儿

Java入门——深入理解Java三大特性

Java入门——深入理解Java三大特性 本Blog内容: 封装 继承 多态 封装 封装把一个对象的属性私有化,同时提供一些可以被外界访问的属性的方法(getter,setter),如果不想被外界方法,我们大可不必提供方法给外界访问. 封装确实可以使我们容易地修改类的内部实现,而无需修改使用了该类的客户代码. 可以对成员变量进行更精确的控制.(在setter方法中进行实际意义的校验) 总结:控制属性访问权限,不是你想改就能改.容易修改属性类型.精确控制属性的取值范围. 继承 继承是使用已存在的类

Java 入门基础——面向对象的特征

计算机软件系统是现实生活中的业务在计算机中的映射,而现实生活中的业务其实就是一个个对象协作的过程.面向对象编程就是按现实业务一样的方式将程序代码按一个个对象进行组织和编写,让计算机系统能够识别和理解用对象方式组织和编写的程序代码,这样就可以把现实生活中的业务对象映射到计算机系统中. 面向对象的编程语言有封装.继承 .抽象.多态等4个主要的特征. 封装: 封装是保证软件部件具有优良的模块性的基础,封装的目标就是要实现软件部件的"高内聚.低耦合",防止程序相互依赖性而带来的变动影响.在面向

Zookeeper Api(java)入门与应用(转)

如何使用 Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化.通过监控这些数据状态的变化,从而可以达到基于数据的集群管理,后面将会详细介绍 Zookeeper 能够解决的一些典型问题,这里先介绍一下,Zookeeper 的操作接口和简单使用示例. 常用接口列表 客户端要连接 Zookeeper 服务器

第1章Java入门体验

第1章Java入门体验 1.java简介和平台应用 Java是sun公司开发出来,现在属于ORACLE公司java分为几个部分:首先是最基础的Java SE部分,这部分是Java的基础知识,主要包括:变量.语法.面向对象,API,JVM等等再Java SE基础之上分为两个部分.一个是开发企业级的服务,一个是嵌入式的开发Java EE是企业级的开发,主要包括:JSP.EJB.服务等等.Java ME是嵌入式的开发,主要有移动设备,游戏,通信等初学的基本路线都是从Java SE出发,先学基础,再学方