RabbitMQ-从基础到实战(1)— Hello RabbitMQ

转自:https://yq.aliyun.com/articles/589923

1.简介

本篇博文介绍了在windows平台下安装RabbitMQ Server端,并用JAVA代码实现收发消息

2.安装RabbitMQ

  1. RabbitMQ是用Erlang开发的,所以需要先安装Erlang环境,在这里下载对应系统的Erlang安装包进行安装
  2. 点击这里下载对应平台的RabbitMQ安装包进行安装

Windows平台安装完成后如图

3.启用RabbitMQ Web控制台

RabbitMQ提供一个控制台,用于管理和监控RabbitMQ,默认是不启动的,需要运行以下命令进行启动

  1. 点击上图的Rabbit Command Prompt,打开rabbitMQ控制台
  2. 官方介绍管理控制台的页面,可以看到,输入以下命令启动后台控制插件

    rabbitmq-plugins enable rabbitmq_management

  3. 登录后台页面:http://localhost:15672/   密码和用户名都是 guest ,界面如下

目前可以先不用理会此界面,后面使用到时会详细介绍,也可以到这里查看官方文档。

4.编写MessageSender

Spring对RabbitMQ已经进行了封装,正常使用中,会使用Spring集成,第一个项目中,我们先不考虑那么多

在IDE中新建一个Maven项目,并在pom.xml中贴入如下依赖,RabbitMQ的最新版本依赖可以在这里找到

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

等待Maven下载完成后,就可以在Maven Dependencies中看到RabbitMQ的JAR

在这里,我们发现,RabbitMQ的日志依赖了slf4j-api这个包,slf4j-api并不是一个日志实现,这样子是打不出日志的,所以,我们给pom加上一个日志实现,这里用了logback

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.1</version>
</dependency>

之后maven依赖如下,可以放心写代码了

新建一个MessageSender类,代码如下

 1 import java.io.IOException;
 2 import java.util.concurrent.TimeoutException;
 3
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10
11 public class MessageSender {
12
13     private Logger logger = LoggerFactory.getLogger(MessageSender.class);
14
15     //声明一个队列名字
16     private final static String QUEUE_NAME = "hello";
17
18     public boolean sendMessage(String message){
19         //new一个RabbitMQ的连接工厂
20         ConnectionFactory factory = new ConnectionFactory();
21         //设置需要连接的RabbitMQ地址,这里指向本机
22         factory.setHost("127.0.0.1");
23         Connection connection = null;
24         Channel channel = null;
25         try {
26             //尝试获取一个连接
27             connection = factory.newConnection();
28             //尝试创建一个channel
29             channel = connection.createChannel();
30             //这里的参数在后面详解
31             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
32             //注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String
33             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
34             logger.info("Sent ‘" + message + "‘");
35             //关闭channel和连接
36             channel.close();
37             connection.close();
38         } catch (IOException | TimeoutException e) {
39             //失败后记录日志,返回false,代表发送失败
40             logger.error("send message faild!",e);
41             return false;
42         }
43         return true;
44     }
45 }

然后在App类的main方法中调用sendMessage

1 public class App {
2     public static void main( String[] args ){
3         MessageSender sender = new MessageSender();
4         sender.sendMessage("hello RabbitMQ!");
5     }
6 }

打印日志如下

打开RabbitMQ的控制台,可以看到消息已经进到了RabbitMQ中

点进去,用控制台自带的getMessage功能,可以看到消息已经成功由RabbitMQ管理了

至此,MessageSender已经写好了,在该类的31和33行,我们分别调用了队列声明和消息发送

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

queueDeclare,有很多参数,我们可以看一下他的源码,注释上有详细的解释,我简单翻译了一下

 1 /**
 2      * Declare a queue 声明一个队列
 3      * @see com.rabbitmq.client.AMQP.Queue.Declare
 4      * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 5      * @param queue the name of the queue队列的名字
 6      * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,为true则在rabbitMQ重启后生存
 7      * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是否是排他性队列(别人看不到),只对当前连接有效,当前连接断开后,队列删除(设置了持久化也删除)
 8      * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自动删除,在最后一个连接断开后删除队列
 9      * @param arguments other properties (construction arguments) for the queue 其他参数
10      * @return a declaration-confirm method to indicate the queue was successfully declared
11      * @throws java.io.IOException if an error is encountered
12      */
13     Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
14                                  Map<String, Object> arguments) throws IOException;

前面4个都非常好理解,最后一个“其他参数”,到底是什么其他参数,这个东西真的很难找,用到再解释吧,官方文档如下

  • TTL Time To Live  存活时间

basicPublish的翻译如下

 1  /**
 2      * Publish a message.发送一条消息
 3      *
 4      * Publishing to a non-existent exchange will result in a channel-level
 5      * protocol exception, which closes the channel.
 6      *
 7      * Invocations of <code>Channel#basicPublish</code> will eventually block if a
 8      * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
 9      *
10      * @see com.rabbitmq.client.AMQP.Basic.Publish
11      * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
12      * @param exchange the exchange to publish the message to 交换模式,会在后面讲,官方文档在这里13      * @param routingKey the routing key 控制消息发送到哪个队列
14      * @param props other properties for the message - routing headers etc 其他参数
15      * @param body the message body 消息,byte数组
16      * @throws java.io.IOException if an error is encountered
17      */
18     void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

这里又有个其他参数,它的类型是这样的,设置消息的一些详细属性

5.编写MessageConsumer

为了和Sender区分开,新建一个Maven项目MessageConsumer

 1 package com.liyang.ticktock.rabbitmq;
 2
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8
 9 import com.rabbitmq.client.AMQP;
10 import com.rabbitmq.client.Channel;
11 import com.rabbitmq.client.Connection;
12 import com.rabbitmq.client.ConnectionFactory;
13 import com.rabbitmq.client.Consumer;
14 import com.rabbitmq.client.DefaultConsumer;
15 import com.rabbitmq.client.Envelope;
16
17 public class MessageConsumer {
18
19     private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
20
21     public boolean consume(String queueName){
22         //连接RabbitMQ
23         ConnectionFactory factory = new ConnectionFactory();
24         factory.setHost("127.0.0.1");
25         Connection connection = null;
26         Channel channel = null;
27         try {
28             connection = factory.newConnection();
29             channel = connection.createChannel();
30             //这里声明queue是为了取消息的时候,queue肯定会存在
31             //注意,queueDeclare是幂等的,也就是说,消费者和生产者,不论谁先声明,都只会有一个queue
32             channel.queueDeclare(queueName, false, false, false, null);
33
34             //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
35             Consumer consumer = new DefaultConsumer(channel){
36                 @Override
37                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
38                           throws IOException {
39                         String message = new String(body, "UTF-8");
40                         logger.info("Received ‘" + message + "‘");
41                 }
42             };
43             //上面是声明消费者,这里用声明的消费者消费掉队列中的消息
44             channel.basicConsume(queueName, true, consumer);
45
46             //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
47
48         } catch (IOException | TimeoutException e) {
49             //失败后记录日志,返回false,代表消费失败
50             logger.error("send message faild!",e);
51             return false;
52         }
53
54
55         return true;
56     }
57 }

然后在App的main方法中调用Cunsumer进行消费

 1 public class App
 2 {
 3     //这个队列名字要和生产者中的名字一样,否则找不到队列
 4     private final static String QUEUE_NAME = "hello";
 5
 6     public static void main( String[] args )
 7     {
 8         MessageConsumer consumer = new MessageConsumer();
 9         consumer.consume(QUEUE_NAME);
10     }
11 }

结果如下,消费者一直在等待消息,每次有消息进来,就会立刻消费掉

6.多个消费者同时消费一个队列

改造一下Consumer

在App中new多个消费者

改造Sender,使它不停的往RabbitMQ中发送消息

启动Sender

启动Consumer,发现消息很平均的发给四个客户端,一人一个,谁也不插队

如果我们把速度加快呢?把Sender的休息时间去掉,发现消费开始变得没有规律了,其实呢,它还是有规律的,这个是RabbitMQ的特性,称作“Round-robin dispatching”,消息会平均的发送给每一个消费者,可以看第一第二行,消息分别是56981和56985,相应的82、82、84都被分给了其他线程,只是在当前线程的时间片内,可以处理这么多任务,所以就一次打印出来了

原文地址:https://www.cnblogs.com/sharpest/p/10425250.html

时间: 2024-10-13 03:36:01

RabbitMQ-从基础到实战(1)— Hello RabbitMQ的相关文章

C# RabbitMQ延迟队列功能实战项目演练

一.需求背景 当用户在商城上进行下单支付,我们假设如果8小时没有进行支付,那么就后台自动对该笔交易的状态修改为订单关闭取消,同时给用户发送一份邮件提醒.那么我们应用程序如何实现这样的需求场景呢?在之前的<C# Redis缓存过期实现延迟通知实战演练>分享课程中阿笨最后总结的时候说过Redis Pub/Sub是一种并不可靠地消息机制,他不会做信息的存储,只是在线转发,那么肯定也没有ack确认机制,另外只有订阅段监听时才会转发!我们是否有更好的方式去实现呢?今天给大家分享的比较好的解决方案就是通过

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

Java秒杀系统实战系列~整合RabbitMQ实现消息异步发送

摘要: 本篇博文是“Java秒杀系统实战系列文章”的第八篇,在这篇文章中我们将整合消息中间件RabbitMQ,包括添加依赖.加入配置信息以及自定义注入相关操作组件,比如RabbitTemplate等等,最终初步实现消息的发送和接收,并在下一篇章将其与邮件服务整合,实现“用户秒杀成功发送邮件通知消息”的功能! 内容: 对于消息中间件RabbitMQ,想必各位小伙伴没有用过.也该有听过,它是一款目前市面上应用相当广泛的消息中间件,可以实现消息异步通信.业务服务模块解耦.接口限流.消息分发等功能,在微

RabbitMQ实例教程:Windows下安装RabbitMQ

(1)下载RabbitMQ服务器 从RabbitMQ官网下载最新的稳定版.目前最新版本为3.5.1. (2)移除RabbitMQ老版本. 如果之前安装了老版本的话,或者想要将Erlang VM从32位升级到64位,需要手动卸载RabbitMQ服务器.因为安装过程中并不会停止或移除旧的服务. (3)安装RabbitMQ服务器 从Erlang官网下载Windows安装文件,并安装.RabbitMQ需要这个东西. 运行rabbitmq-server-3.5.1.exe,安装RabbitMQ并使用默认配

《web 前端基础到实战系列课程》

<web 前端基础到实战系列课程> 摘要: mod_expires&mod_headers可以减少10%左右的重复请求,让重复的用户对指定的页面请求结果都CACHE在本地,根本不向服务器发出请求. 在使用之前,首先要确认一下"mod_expires"模组是否有启用.如果是自己安装Apache来架设网页主机的话,这里我们可以透过编辑Apache的"httpd.conf"设定档来处理 一.浏览器缓存原理 将该行前面的"#"字号删除

零基础Swift实战开发视频教程_从入门到精通

零基础Swift实战开发从入门到精通(4大项目实战.酷跑熊猫.百度音乐.足球游戏等)适合人群:初级课时数量:50课时更新程度:86%用到技术:Swift涉及项目:酷跑熊猫.百度音乐.足球游戏咨询qq:1840215592零基础Swift实战开发视频教程采用基础+项目的方式进行讲解,通过基础的学习,可以完全掌握Swift基本语法应用,并结合4个项目进行开发,学员能在最短的时间内掌握开发的各项技能. 零基础Swift实战开发从入门到精通详细查看:http://www.ibeifeng.com/goo

Docker基础入门实战(一)

Docker基础入门实战 第1章          docker简介 1.1  what is Docker Docker是一个开源的应用容器引擎,基于Go语言并遵从Apache2.0协议开源,源代码部署在GitHub上. Docker是通过内核虚拟技术来提供容器的资源隔离与安全保障,由于Docker通过操作系统层的虚拟化实现隔离,所以Docker容器在运行时,不需要类似虚拟机(VM)额外的操作系统开销,从而提高资源的利用率. Docker的目标是实现轻量级的操作系统虚拟化解决方案. 1.2 

ajax从零基础到实战

一. 什么是AJAX? ajax是一种在无需重新加载整个网页的情况下,能够更新部分网页的技术. 二. 在项目中怎么运用AJAX? 项目主要文件夹目录有img文件夹,css文件夹,js文件夹,如果你要运用到ajax,那么你可以在js里面建立一个js文件存放ajax代码,在相应html页面引入这个js文件即可. 三. AJAX的结构怎么写? //ajax的结构 $(function(){ $.ajax({ url:"https://www.xxxx.com/shop/xxxxxxx/",

《Python与量化投资从基础到实战》PDF及代码+《量化投资以Python为工具》PDF及代码

下载:https://pan.baidu.com/s/1NU_53IT-ZPhwACk6sJphAw 更多资料:https://pan.baidu.com/s/1bl6Q4Ex2_TC242IMnGQPRw <Python与量化投资:从基础到实战(王小川)>PDF,562页,带书签目录,文字可以复制:配套源代码.<量化投资以Python为工具(蔡立耑 )>PDF,550页:配套源代码. <Python与量化投资:从基础到实战>主要讲解如何利用Python进行量化投资,包