RabbitMQ (消息队列)专题学习02 Hello World

一、概述

RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处。

RabbitMQ和邮局的主要区别就是RabbitMQ接收、存储和发送的是二进制数据----消息。

在专题学习一中我们已经简单提到了一些概念,在此我们更为深入的学习下RabbitMQ相关的专有名词。

1、生产(Producing):意思就是发送,发送消息的程序就是一个生产者(Producer),我们一般使用P来标示,如下图-1所示:

图-1

2、队列(queue)就是邮箱的名称,消息通过你的应用程序和RabbitMQ服务器进行传递,它们能够存储在队列(queue)中,队列(queue)没有任何限制,你要存储多少消息都可以---queue基本上是一个无限的缓冲区,而且多个生产者(Producers)能够把消息发送给同一个队列,同样多个消费者(consumer)也能够从同一个队列(queue)中获取数据,如下图-2所示表示的就是一个队列:

图-2

3、消费(consuming),它和获取消息是一个意思,一个消费者(sonsumer)就是一个等待获取消息的程序,我们通常用C来表示,如下图-3所示:

图-3

二、实现Hello World

由于RabbitMQ支持多种语言,诸如Java、Python、Ruby、PHP、C#等等,实现了对每种语言的Client接口,RabbitMQ自己充当服务器,所以在专题一的架构中可以看出,不管发送消息的生产者和接收消息的消费者使用何种语言,都充当客户端的角色,故在此我们使用Java语言的客户端(消息发送者和消息接受者)。

在这部分学习中我们将实现两个Java程序:一个是发送单个消息的生产者,一个是接收消息并在控制台打印消息的消费者,我们将忽略一些JavaAPI的细节,把所有的精力都放在将要开始实现生产者和消费者上---它就是一个关于Hello World的消息队列的实现。

在下图-4中,P是消息的生产者,C是消息的消费者(接收者),中间部分就是队列(queue)---rabbitMQ用来保存消息的缓冲区域。

图-4

大致过程:生产者(Producer)把消息发送到一个名为”hello“的队列中,消费者(Consumer)从这个队列中获取消息。

注意:在编程的时候要让客户端和服务器的AMQP的版本一致,不一致会报错,如果不一致主要去找对应版本的客户端即可,这样就不用再费劲去重装服务器端了。

2.1、发送消息

图-5

下面实现调用消息发送者发送消息和消息接收者接收消息,发送者首先将建立与RabbitMQ的连接,发送一条消息,然后退出。

Send.java中,我们首先需要导入如下类文件:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

发送消息的过程如下:

1、创建连接(Connection)

2、创建通道(Channel),声明队列(当队列存在时就获取队列,不存在时就创建队列)。

3、发送消息

完成的发送者代码清单如下:

package com.xuz.send;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender01 {
	public static void main(String[] args) throws IOException {
		ConnectionFactory factory = new ConnectionFactory();
		//RabbitMQ-Server安装在本机,所以直接用127.0.0.1
		factory.setHost("127.0.0.1");
		//创建一个连接
		Connection conn = factory.newConnection();
		//创建一个通信通道
		Channel channel = conn.createChannel();
		//定义Queue名称
		String queueName = "hello";
		//为Channel定义queue的属性,queueName为queue名称
		channel.queueDeclare(queueName, false, false,false,null);
		String msg = "Hello World!xuzheng test!";
		//发送消息
		channel.basicPublish("", queueName, null, msg.getBytes());
		System.out.println("send message["+msg+"] to "+queueName+" success!");
		//关闭通道
		channel.close();
		//关闭连接
		conn.close();
	}
}

2.2、接收消息

图-6

接收消息的过程如下:

1、创建连接

2、创建通道、声明队列(还得在声明一次,原因在后面解释)

3、创建QueueConsumer用于缓存消息

4、循环接收消息(无消息阻塞,有消息处理)。

注意:声明队列的机制是:当队列存在时就获取队列,不存在时就创建队列,接收者可能比发送者更早创建,这时它需要保证存在一个队列,它能够从中取出消息。

完整接收者的代码如下:

package com.xuz.recv;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Recv01 {
	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		String queueName = "hello";
		channel.queueDeclare(queueName, false, false, false, null);
		//以上部分和sender一样
		//配置好获取消息得方式
		QueueingConsumer consumer =  new QueueingConsumer(channel);
		channel.basicConsume(queueName, true,consumer);
		//循环获取消息
		while(true){
			//获取消息,如果没有消息,这一步将会一直阻塞
			Delivery delivery = consumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.out.println("received message["+msg+"] from "+queueName);
		}
	}
}

说明:在上述代码中我们还是要连接RabbitMQ服务器,连接代码和发送端代码是一样的,创建通道也是一样的,我们需要确认队列是否存在,使用queue_declare创建一个队列,我们可以运行这个命令很多次,但是只有一个队列会被创建。

channel.queueDeclare(queueName, false, false, false, null);

你也许要问为什么重复声明了队列---我们已经在前面的代码中声明了它,如果我们确定了队列是否已经存在的,那么我们可以不这么做,比如先运行Sender01.java程序,可是我们并不确定哪个程序先运行,这种情况的话在程序中重复声明是好的做法。

factory.setHost("127.0.0.1");

表示我们的发送端和接收端都在安装RabbitMQ服务器的本机运行。

运行结果如下:

发送端:

接收端:

至此RabbitMQ入门程序HelloWorld消息队列实现完毕!

源码下载:

RabbitMQ之HelloWorld源码

RabbitMQ (消息队列)专题学习02 Hello World

时间: 2024-10-19 13:07:59

RabbitMQ (消息队列)专题学习02 Hello World的相关文章

RabbitMQ消息队列应用

RabbitMQ消息队列应用 消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是通过消息队列传输系统日志.目前业界使用较多的消息队列组件有RabbitMQ.ActiveMQ.MSMQ.kafka.zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件. 图1- 消息队列组件示意图 一.RabbitMQ介绍 RabbitMQ是一款基

RabbitMQ 消息队列 应用

安装参考    详细介绍   学习参考 RabbitMQ 消息队列 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.

Nginx通过LUA脚本访问RabbitMQ消息队列

发现了一个Nginx的LUA脚本:lua-resty-rabbitmqstomp,可以让Nginx通过LUA脚本访问RabbitMQ消息队列,这个脚本是通过stomp协议连接RabbitMQ的stomp适配器,来pub/sub消息的 关于RabbitMQ-STOMP安装使用相关内容可以参见:RabbitMQ STOMP Adapter 关于Nginx-LUA模块安装使用参见:LAMP架构演进到LAMPGC,再演进到LNMLGC 关于STOMP协议相关资料参见这里: STOMP官方英文协议1.1版

(转)RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

(转)(二)RabbitMQ消息队列-RabbitMQ消息队列架构与基本概念

http://blog.csdn.net/super_rd/article/details/70238869 没错我还是没有讲怎么安装和写一个HelloWord,不过快了,这一章我们先了解下RabbitMQ的基本概念. RabbitMQ架构 说是架构其实更像是应用场景下的架构(自己画的有点丑,勿嫌弃) 从图中可以看出RabbitMQ主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收. RabbitMQ消息队列基本概

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message