RabbitMQ入门_02_HelloWorld

A. AMQP基础

RabbitMQ 并不是基于 Java 开发人员熟悉的 JMS 规范设计开发的,而是基于一个比 JMS 更新更合理的 AMQP (Advanced Message Queuing Protocol) 协议。所以,在开始 RabbitMQ 之旅前,需要先对 AMQP 有一定的了解。毕竟,RabbitMQ 就是 AMQP 协议的第一个开源实现。

a. AMQP messaging 中的基本概念

以下内容来自 http://www.cnblogs.com/frankyou/p/5283539.html

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的TCP连接。断开连接的操作只会在 client 端进行,broker不会断开连接,除非出现网络故障或 broker 服务出现问题
  • Channel:如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走。一个 message 可以被同时拷贝到多个 queue 中
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

简而言之,Publisher 发送的消息通过 Connection 中的 Channel 到达 Broker 的某个 Virtual Host,消息经过指定的 Exchange,根据 Binding 提供的分发依据,分发到 0~n 个 Queue 中;Queue 中的消息等待 Consumer 消费。

b. AMQP 模型解释

官网有一篇更详尽的模型解释:https://www.rabbitmq.com/tutorials/amqp-concepts.html

这篇文章一看就知道是好东西,可惜读起来太累了,以后再来回顾。

c. AMQP 规范

官网有 AMQP 规范的快速参考:https://www.rabbitmq.com/amqp-0-9-1-quickref.html

AMQP 规范全部内容可在 AMQP 官网查阅:http://www.amqp.org/

这些内容短期不会接触,留个脚印以后有机会再研究。

B. 你好,世界

参考资料:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

示例代码:https://github.com/gordonklg/study,rabbitmq module

gordon.study.rabbitmq.helloworld.Sender.java

  1. import java.util.concurrent.TimeUnit;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. publicclassSender{
  6. privatestaticfinalString QUEUE_NAME ="hello";
  7. publicstaticvoid main(String[] argv)throwsException{
  8. ConnectionFactory factory =newConnectionFactory();
  9. factory.setHost("localhost");
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  13. for(int i =0; i <10;){
  14. String message ="NO. "+++i;
  15. TimeUnit.MILLISECONDS.sleep(1000);
  16. channel.basicPublish("", QUEUE_NAME,null, message.getBytes("UTF-8"));
  17. System.out.printf("(%1$s)[===>%2$s ] %3$s\n","Sender",":"+ QUEUE_NAME, message);
  18. }
  19. channel.close();
  20. connection.close();
  21. }
  22. }

代码第12-14行通过 ConnectionFactory 创建了一个 Connection。Broker Host 地址为 localhost,端口是默认的 5672,用户密码是默认的 guest,VHOST 是默认的 "/"。(实际上默认的 Host 就是 localhost,第13行可以省略)

第15行在 Connection 中创建了一个虚拟的 Channel。

第17行申明了一个名字为 hello 的 Queue。不同于 JMS,AMQP 协议允许通过编程方式创建绝大多数的模型,例如 Exchange、Queue 等等。之所以是申明,是因为该方法会先判断当前 VHOST 中是否已存在一个同名的 Queue,如果不存在,则创建 Queue;如果存在,可能会直接使用已存在的 Queue,也可能返回异常(例如申明的 Queue 的属性与已存在的 Queue 的属性不一致时)。

queueDeclare 方法中三个 boolean 型参数指定了 Queue 的属性:

  • durable:队列本身是否需要持久化。如果为 false,则 RabbitMQ 重启后该 Queue 消失。
  • exclusive:排他性队列确保该队列只对申明它的连接可见,注意是连接而不是 Channel。当相应连接关闭时,该队列自动删除。
  • autoDelete:设置为 true 的队列会在所有 Consumer 都断开连接时自动删除(不管是否是 durable)。队列被第一个 Consumer 连接前不会被删除。

第22行发布一个消息。basicPublish 方法第一个参数指定发布该消息所使用的 Exchange 名称,空字符串为系统预先定义的默认 Exchange,类型为 direct。第二个参数指定路由键,对于 direct 类型的 Exchange,会将该消息发送到所有绑定到该 Exchange 并且也设定了相同的路由键的 Queue 中。

gordon.study.rabbitmq.helloworld.Receiver.java

  1. publicclassReceiver{
  2. privatefinalstaticString QUEUE_NAME ="hello";
  3. publicstaticvoid main(String[] argv)throwsException{
  4. ConnectionFactory factory =newConnectionFactory();
  5. factory.setHost("localhost");
  6. Connection connection = factory.newConnection();
  7. Channel channel = connection.createChannel();
  8. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  9. Consumer consumer =newDefaultConsumer(channel){
  10. @Override
  11. publicvoid handleDelivery(String consumerTag,Envelope envelope, AMQP.BasicProperties properties,byte[] body)
  12. throwsIOException{
  13. String message =newString(body,"UTF-8");
  14. System.out.printf(" [ %2$s<===](%1$s) %3$s\n","Receiver", QUEUE_NAME, message);
  15. try{
  16. TimeUnit.MILLISECONDS.sleep(500);
  17. }catch(InterruptedException e){
  18. }
  19. }
  20. };
  21. channel.basicConsume(QUEUE_NAME,true, consumer);
  22. }
  23. }

代码从第13行开始定义一个 Consumer,通过 handleDelivery 回调接口,我们可以处理从队列中获取的消息。

第25行启动一个 Consumer,basicConsume 方法第一个参数指定 Consumer 消费的队列,即 hello 队列;第二个参数 autoAck 指定消息确认模式,true 表示消息确认是自动完成的(至少在进入 handleDelivery 方法前就已经自动确认了),false 表示必须由 Consumer 自己确认;第三个参数指定 Consumer

现在,可以花式运行两个类,看看发生了什么了。

时间: 2024-10-10 08:10:38

RabbitMQ入门_02_HelloWorld的相关文章

2.RABBITMQ 入门 - WINDOWS - 生产和消费消息 一个完整案例

关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置 公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求做成win form的,这明显不合理,因为之前,服务器上我已经放置了一个  短信的winform的服务.那么到后期的话,登录服务器之后,全是 一个个的窗体挂在那儿,这明显合不合常理,但是领导要求这么玩,也没办法, 因为卧虎要负责的是消费 消息,所以重点说明 消费端 该案例的接收端,源自网上的代码片段

RabbitMQ入门与使用篇

介绍 RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现.用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面都非常的优秀.是当前最主流的消息中间件之一. RabbitMQ的官方 概念: Brocker:消息队列服务器实体. Exchange:消息交换机,指定消息按什么规则,路由到哪个队列. Queue:消息队列,每个消息都会被投入到一个或者多个队列里. Binding:绑定,它的作用是把exchange和queue按

RabbitMQ入门(二) —— direct交换器

在RabbitMQ入门(一)里我们讲到exchange有三种最主要的类型:direct.fanout和topic. 这里我们先来看看最简单的direct交换器的使用. 下面是测试代码: package com.jaeger.exchange.direct; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.junit.Test; import com.rabbitmq.clie

RabbitMQ入门:主题路由器(Topic Exchange)

上一篇博文中,我们使用direct exchange 代替了fanout exchange,这次我们来看下topic exchange. 一.Topic Exchange介绍 topic exchange和direct exchange类似,都是通过routing key和binding key进行匹配,不同的是topic exchange可以为routing key设置多重标准. direct路由器类似于sql语句中的精确查询:topic 路由器有点类似于sql语句中的模糊查询. 还记得吗?我

RabbitMQ入门教程(十一):消息属性Properties

原文:RabbitMQ入门教程(十一):消息属性Properties 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78698364 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 发送消息可以为消息指定一些参数 Delivery mode: 是否持久化,1 - Non-persistent,2 -

RabbitMQ入门教程(四):工作队列(Work Queues)

原文:RabbitMQ入门教程(四):工作队列(Work Queues) 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78596426 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 工作队列 使用工作队列实现任务分发的功能,一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们

RabbitMQ入门教程(六):路由选择Routing

原文:RabbitMQ入门教程(六):路由选择Routing 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78629168 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要演示使用直连接类型,将多个路由键绑定到同一个队列上.也可以将同一个键绑定到多个队列上(多重绑定multiple bind

RabbitMQ入门教程(九):首部交换机Headers

原文:RabbitMQ入门教程(九):首部交换机Headers 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78638988 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 首部交换机和扇形交换机都不需要路由键routingKey,交换机时通过Headers头部来将消息映射到队列的,有点像HTTP的

RabbitMQ入门教程(十):队列声明queueDeclare

原文:RabbitMQ入门教程(十):队列声明queueDeclare 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78670550 分享一个朋友的人工智能教程(请以"右键"->"在新标签页中打开连接"的方式访问).比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要讨论队列声明的各个参