Spring Boot(十三)RabbitMQ安装与集成

一、前言

RabbitMQ是一个开源的消息代理软件(面向消息的中间件),它的核心作用就是创建消息队列,异步接收和发送消息,MQ的全程是:Message Queue中文的意思是消息队列。

## 1.1 使用场景
- 削峰填谷:用于应对间歇性流量提升对于系统的“破坏”,比如秒杀活动,可以把请求先发送到消息队列在平滑的交由系统去处理,当访问量大于一定数量的时候,还可以直接屏蔽后续操作,给前台的用户友好的显示;
- 延迟处理:可以进行事件后置,比如订单超时业务,用户下单30分钟未支付取消订单;
- 系统解耦:消息队列也可以帮开发人员完成业务的解耦,比如用户上传头像的功能,最初的设计是用户上传完之后才能发帖,后面有增加了经验系统,需要在上传头像之后增加经验值,到后来又上线了金币系统,上传头像之后可以增加金币,像这种需求的不断升级,如果在业务代码里面写死每次该业务代码是很不优雅的,这个时候如果使用消息队列,那么只需要增加一个订阅器用于介绍用户上传头像的消息,再执行经验的增加和金币的增加是非常简单的,并且在不改动业务模块业务代码的基础上可以轻松实现,如果后期需要撤销某个模块了,只需要删除订阅器即可,就这样就降低了系统开发的耦合性;
## 1.2 为什么使用RabbitMQ?
现在市面上比较主流的消息队列还有Kafka、RocketMQ、RabbitMQ,它们的介绍和区别如下:
- Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
- RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
- RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
**简单总结:** Kafka的性能最好,适用于对消息吞吐量达,对消息丢失不敏感的系统;RocketMQ借鉴了Kafka并提高了消息的可靠性,修复了Kafka的不足;RabbitMQ性能略低于Kafka,并实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的标准,有非常好的稳定性。
**支持语言对比**
- RocketMQ 支持语言:Java、C++、Golang
- Kafka 支持语言:Java、Scala
- RabbitMQ 支持语言:C#、Java、Js/NodeJs、Python、Ruby、Erlang、Perl、Clojure、Golang
## 1.3 RabbitMQ特点
RabbitMQ的特点是易用、扩展性好(集群访问)、高可用,具体如下:
- 可靠性:持久化、消息确认、事务等保证了消息的可靠性;
- 伸缩性:集群服务,可以很方便的添加服务器来提高系统的负载;
- 高可用:集群状态下部分节点出现问题依然可以运行;
- 多语言支持:RabbitMQ几乎支持了所有的语言,比如Java、.Net、Nodejs、Golang等;
- 易用的管理页面:RabbitMQ提供了易用了网页版的管理监控系统,可以很方便的完成RabbitMQ的控制和查看;
- 插件机制:RabbitMQ提供了许多插件,可以丰富和扩展Rabbit的功能,用户也可编写自己的插件;
## 1.4 RabbitMQ基础知识
在了解消息通讯之前首先要了解3个概念:生产者、消费者和代理。
生产者:消息的创建者,负责创建和推送数据到消息服务器;
消费者:消息的接收方,用于处理数据和确认消息;
代理:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
### (一)消息发送原理
首先你必须连接到Rabbit才能发布和消费消息,那怎么连接和发送消息的呢?
你的应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,有点像程序连接数据库,使用Java有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。
信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者接收消息都是通过信道完成的。
### (二)为什么不通过TCP直接发送命令?
对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。
如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。
![](http://icdn.apigo.cn/blog/rabbit_channel.png)
### (三)RabbitMQ名称解释
**ConnectionFactory(连接管理器):** 应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
**Channel(信道):** 消息推送使用的通道;
**Exchange(交换器):** 用于接受、分配消息;
**Queue(队列):** 用于存储生产者的消息;
**RoutingKey(路由键):** 用于把生成者的数据分配到交换器上;
**BindingKey(绑定键):** 用于把交换器的消息绑定到队列上;
看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:
![](http://icdn.apigo.cn/blog/rabbit-producer.gif)
## 1.5 交换器分类
RabbitMQ的Exchange(交换器)分为四类:
- direct(默认)
- headers
- fanout
- topic
其中headers交换器允许你匹配AMQP消息的header而非路由键,除此之外headers交换器和direct交换器完全一致,但性能却很差,几乎用不到,所以我们这里不做解释。
### 1.5.1 direct交换器
direct为默认的交换器类型,也非常的简单,如果路由键匹配的话,消息就投递到相应的队列,如下图:
![](http://icdn.apigo.cn/blog/rabbitmq-direct.png?imageView2/0/w/300/h/250)
### 1.5.2 fanout交换器
fanout有别于direct交换器,fanout是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。
**注意:** 对于fanout交换器来说routingKey(路由键)是无效的,这个参数是被忽略的。
### 1.5.3 topic交换器
topic交换器运行和fanout类似,但是可以更灵活的匹配自己想要订阅的信息,这个时候routingKey路由键就排上用场了,使用路由键进行消息(规则)匹配。
topic路由器的关键在于定义路由键,定义routingKey名称不能超过255字节,使用“.”作为分隔符,例如:com.mq.rabbit.error。
**匹配规则**
匹配表达式可以用“*”和“#”匹配任何字符,具体规则如下:
- “*”匹配一个分段(用“.”分割)的内容;
- “#”匹配所有字符;
例如发布了一个“cn.mq.rabbit.error”的消息:
能匹配上的路由键:
- `cn.mq.rabbit.*`
- `cn.mq.rabbit.#`
- `#.error`
- `cn.mq.#`
- `#`
不能匹配上的路由键:
- `cn.mq.*`
- `*.error`
- `*`
## 1.6 消息持久化
RabbitMQ队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。
当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:
1. 投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数2设置为true持久化;
2. 设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数3设置为存储纯文本到磁盘;
3. 消息已经到达持久化交换器上;
4. 消息已经到达持久化的队列;
**持久化工作原理**
Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。
**持久化的缺点**
消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。
所以使用者要根据自己的情况,选择适合自己的方式。
学习更多RabbitMQ知识,访问:https://gitbook.cn/gitchat/activity/5b558d54c28306099b47ae9c
# 二、在Docker中安装RabbitMQ
**(1)下载镜像**
https://hub.docker.com/r/library/rabbitmq/tags/
- alpine 轻量版
- management 带插件的版本
从镜像的大小也可以很直观的看出来alpine是轻量版。
使用命令:
> docker pull rabbitmq:3.7.7-management
下载带management插件的版本。
**(2)运行RabbitMQ**
使用命令:
> docker run -d --hostname myrabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.7-management
- -d 后台运行
- --hostname 主机名称
- --name 容器名称
- -p 15672:15672 http访问端口,映射本地端口到容器端口
- -p 5672:5672 amqp端口,映射本地端口到容器端口
正常启动之后,访问:http://localhost:15672/
登录网页管理页面,用户名密码:guest/guest,登录成功如下图:
![](http://icdn.apigo.cn/blog/springboot-rabbitmq-1.png)
# 三、RabbitMQ集成
## 3.1 添加依赖
如果用Idea创建新项目,可以直接在创建Spring Boot的时候,点击“Integration”面板,选择RabbitMQ集成,如下图:
![](http://icdn.apigo.cn/blog/springboot-rabbitmq-2.png?imageView2/0/w/500/h/400)
如果是老Maven项目,直接在pom.xml添加如下代码:
```xml

org.springframework.boot
spring-boot-starter-amqp

```
## 3.2 配置RabbitMQ信息
在application.properties设置如下信息:
```
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
```
3.3 代码
## 3.3 代码实现
本节分别来看三种交换器:direct、fanout、topic的实现代码。
### 3.3.1 Direct Exchange
#### 3.3.1.1 配置队列
创建DirectConfig.java代码如下:
```java
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
final static String QUEUE_NAME = "direct"; //队列名称
final static String EXCHANGE_NAME = "mydirect"; //交换器名称
@Bean
public Queue queue() {
// 声明队列 参数一:队列名称;参数二:是否持久化
return new Queue(DirectConfig.QUEUE_NAME, false);
}
// 配置默认的交换机,以下部分都可以不配置,不设置使用默认交换器(AMQP default)
@Bean
DirectExchange directExchange() {
// 参数一:交换器名称;参数二:是否持久化;参数三:是否自动删除消息
return new DirectExchange(DirectConfig.EXCHANGE_NAME, false, false);
}
// 绑定“direct”队列到上面配置的“mydirect”路由器
@Bean
Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with(DirectConfig.QUEUE_NAME);
}
}
```
#### 3.3.1.2 发送消息
创建Sender.java代码如下:
```java
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 消息发送者-生产消息
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void driectSend(String message) {
System.out.println("Direct 发送消息:" + message);
//参数一:交换器名称,可以省略(省略存储到AMQP default交换器);参数二:路由键名称(direct模式下路由键=队列名称);参数三:存储消息
this.rabbitTemplate.convertAndSend("direct", message);
}
}
```
**注意:**
- 在direct交换器中,路由键名称就是队列的名称;
- 发送消息“convertAndSend”的时候,第一个参数为交换器的名称,非必填可以忽略,如果忽略则会把消息发送到默认交换器“AMQP default”;
#### 3.3.1.3 消费消息
创建Receiver.java代码如下:
```
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 消息接收者-消费消息
*/
@Component
@RabbitListener(queues = "direct")
public class Receiver {
@Autowired
private AmqpTemplate rabbitTemplate;
@RabbitHandler
/**
* 监听消费消息
*/
public void process(String message) {
System.out.println("Direct 消费消息:" + message);
}
}
```
#### 3.3.1.4 测试代码
使用Spring Boot中的默认测试框架JUnit进行单元测试,不了解JUnit的可以参考我的上一篇文章,创建MQTest.java代码如下:
```java
package com.example.rabbitmq.mq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTest {
@Autowired
private Sender sender;
@Test
public void driectTest() {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.driectSend("Driect Data:" + sf.format(new Date()));
}
}
```
执行之后,效果如下图:
![](http://icdn.apigo.cn/blog/springboot-rabbitmq-3.png)
表示消息已经被发送并被消费了。
### 3.3.2 Fanout Exchange
#### 3.3.2.1 配置队列
创建FanoutConfig.java代码如下:
```java
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
final static String QUEUE_NAME = "fanout"; //队列名称
final static String QUEUE_NAME2 = "fanout2"; //队列名称
final static String EXCHANGE_NAME = "myfanout"; //交换器名称
@Bean
public Queue queueFanout() {
return new Queue(FanoutConfig.QUEUE_NAME);
}
@Bean
public Queue queueFanout2() {
return new Queue(FanoutConfig.QUEUE_NAME2);
}
//配置交换器
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FanoutConfig.EXCHANGE_NAME);
}
// 绑定队列到交换器
@Bean
Binding bindingFanoutExchangeQueue(Queue queueFanout, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueFanout).to(fanoutExchange);
}
// 绑定队列到交换器
@Bean
Binding bindingFanoutExchangeQueue2(Queue queueFanout2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueFanout2).to(fanoutExchange);
}
}
```
#### 3.3.2.2 发送消息
创建FanoutSender.java代码如下:
```java
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String message) {
System.out.println("发送消息:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME, message);
}
public void send2(String message) {
System.out.println("发送消息2:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME2, message);
}
}
```
#### 3.3.2.3 消费消息
创建两个监听类,第一个FanoutReceiver.java代码如下:
```java
package com.example.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "fanout")
public class FanoutReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Fanout(FanoutReceiver)消费消息:" + msg);
}
}
```
第二个FanoutReceiver2.java代码如下:
```java
package com.example.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout2")
public class FanoutReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Fanout(FanoutReceiver2)消费消息:" + message);
}
}
```
#### 3.3.2.4 测试代码
创建FanoutTest.java代码如下:
```java
package com.example.rabbitmq.mq;
import com.example.rabbitmq.RabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
@Autowired
private FanoutSender sender;
@Test
public void Test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.send("Time1 => " + sf.format(new Date()));
sender.send2("Date2 => " + sf.format(new Date()));
}
}
```
运行测试代码,输出结果如下:
```
发送消息:Time1 => 2018-09-11
发送消息2:Date2 => 2018-09-11
Fanout(FanoutReceiver2)消费消息:Time1 => 2018-09-11
Fanout(FanoutReceiver2)消费消息:Date2 => 2018-09-11
Fanout(FanoutReceiver)消费消息:Time1 => 2018-09-11
Fanout(FanoutReceiver)消费消息:Date2 => 2018-09-11
```
**总结:** 可以看出fanout会把消息分发到所有订阅到该交换器的队列,fanout模式是忽略路由键的。
### 3.3.3 Topic Exchange
#### 3.3.3.1 配置队列
```java
@Configuration
public class TopicConfig {
final static String QUEUE_NAME = "log";
final static String QUEUE_NAME2 = "log.all";
final static String QUEUE_NAME3 = "log.all.error";
final static String EXCHANGE_NAME = "topicExchange"; //交换器名称
@Bean
public Queue queuetopic() {
return new Queue(TopicConfig.QUEUE_NAME);
}
@Bean
public Queue queuetopic2() {
return new Queue(TopicConfig.QUEUE_NAME2);
}
@Bean
public Queue queuetopic3() {
return new Queue(TopicConfig.QUEUE_NAME3);
}
// 配置交换器
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TopicConfig.EXCHANGE_NAME);
}
// 绑定队列到交换器,并设置路由键(log.#)
@Bean
Binding bindingtopicExchangeQueue(Queue queuetopic, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic).to(topicExchange).with("log.#");
}
// 绑定队列到交换器,并设置路由键(log.*)
@Bean
Binding bindingtopicExchangeQueue2(Queue queuetopic2, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic2).to(topicExchange).with("log.*");
}
// 绑定队列到交换器,并设置路由键(log.*.error)
@Bean
Binding bindingtopicExchangeQueue3(Queue queuetopic3, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic3).to(topicExchange).with("log.*.error");
}
}
```
#### 3.3.3.2 发布消息
```java
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void topicSender(String message) {
String routingKey = "log.all.error";
System.out.println(routingKey + " 发送消息:" + message);
this.rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE_NAME, routingKey, message);
}
}
```
#### 3.3.3.3 消费消息
```java
@Component
@RabbitListener(queues = "log")
public class TopicReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("log.# 消费消息:" + msg);
}
}
```
```java
@Component
@RabbitListener(queues = "log.all")
public class TopicReceiver2 {
@RabbitHandler
public void process(String msg) {
System.out.println("log.* 消费消息:" + msg);
}
}
```
```java
@Component
@RabbitListener(queues = "log.all.error")
public class TopicReceiver3 {
@RabbitHandler
public void process(String msg) {
System.out.println("log.*.error 消费消息:" + msg);
}
}
```
#### 3.3.3.4 测试代码
```java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void Test() {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
fanoutSender.send("Time1 => " + sf.format(new Date()));
fanoutSender.send2("Date2 => " + sf.format(new Date()));
}
}
```
输出结果:
```
log.all.error 发送消息:time => 2018-09-11
log.# 消费消息:time => 2018-09-11
log.*.error 消费消息:time => 2018-09-11
```
**总结:** 在Topic Exchange中“#”可以匹配所有内容,而“*”则是匹配一个字符段的内容。
以上示例代码Github地址:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq
**参考文档**
阿里 RocketMQ 优势对比:https://juejin.im/entry/5a0abfb5f265da43062a4a91

原文地址:http://blog.51cto.com/2188001/2316141

时间: 2024-08-02 10:43:58

Spring Boot(十三)RabbitMQ安装与集成的相关文章

Spring Boot (十三): Spring Boot 整合 RabbitMQ

1. 前言 RabbitMQ 是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷.消息分发的作用. 消息队列在比较主要的一个作用是用来做应用服务的解耦,消息从消息的生产者传递到消息队列,消费者从消息队列中获取消息并进行消费,生产者不需要管是谁在消费消息,消费者也无需关注消息是由谁来生产的.在分布式的系统中,消息队列也会被用在其他地方,比如分布式事务的支持,代表如阿里开源的 RocketMQ . 当然,我们本篇文章的主角还是 Ra

Spring Boot之RabbitMQ

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用. 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还是讲 RabbitMQ.消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的.在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式

Spring Boot 2.0 图文教程 | 集成邮件发送功能

文章首发自个人微信公众号: 小哈学Java 个人网站: https://www.exception.site/springboot/spring-boots-send-mail 大家好,后续会间断地奉上一些 Spring Boot 2.x 相关的博文,包括 Spring Boot 2.x 教程和 Spring Boot 2.x 新特性教程相关,如 WebFlux 等.还有自定义 Starter 组件的进阶教程,比如:如何封装一个自定义图床 Starter 启动器(支持上传到服务器内部,阿里 OS

消息队列 - Spring Boot 对rabbitmq批量处理数据的支持

消息队列 - Spring Boot 对rabbitmq批量处理数据的支持 一丶前言 在生产中,存在一些场景,需要对数据进行批量操作.如,可以先将数据存放到redis,然后将数据进行批量写进数据库.但是使用redis,不得不面对一个数据容易丢失的问题.也可以考虑使用消息队列进行替换,在数据持久化,数据不丢失方面,消息队列确实比redis好一点,毕竟设计不一样.是不是使用消息队列,就一定好呢?不是的,首先使用消息队列,不能确保数据百分百不丢失,(如果要做到百分百不丢失,设计上就会比较复杂),除此之

Spring Boot MyBatis 通用Mapper插件集成

看本文之前,请确保你已经在SpringBoot中集成MyBatis,并能正常使用.如果没有,那么请先移步 http://blog.csdn.net/catoop/article/details/50553714 做了解后,再按本文步骤操作. 使用MyBatis在我们通过xml集中配置SQL,并通过创建接口Mapper文件来完成持久化DAO层(mybatis内部使用的是动态代理,所以我们不需要自己编写实现类). 然而在实际开发中,单表操作非常多,如果你也想像JPA.JDBC那样做一个所谓的Base

Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列

本文主要摘录自:详细介绍Spring Boot + RabbitMQ实现延迟队列 并增加了自己的一些理解,记录下来,以便日后查阅. 项目源码: spring-boot-rabbitmq-delay-queue 实现 stream-rabbitmq-delay-queue 实现 背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 延迟队列能做什么?延迟队列多用于需要延迟工作的场景.最常见的是以下两种场景: 延迟消费

Spring Boot (25) RabbitMQ消息队列

MQ全程(Message Queue)又名消息队列,是一种异步通讯的中间件.可以理解为邮局,发送者将消息投递到邮局,然后邮局帮我们发送给具体的接收者,具体发送过程和时间与我们无关,常见的MQ又kafka.activemq.zeromq.rabbitmq等等. RabbitMQ RabbitMQ是一个遵循AMQP协议,由面向高并发的erlang语言开发而成,用在实时的对可靠性要求比较高的消息传递上,支持多种语言客户端,支持延迟队列. 基础概念 Broker:消息队列的服务器实体 Exchange:

spring boot +mybatis(通过properties配置) 集成

注:日常学习记录贴,下面描述的有误解的话请指出,大家一同学习. 因为我公司现在用的是postgresql数据库,所以我也用postgresql进行测试 一.前言 1.Spring boot 会默认读取src/main/resource路径下的application.properties(或者application.yml)文件的内容,一般自定义的配置文件也位于此目录之下. 2配置文件会自动加载,意思就是将文件读取到Spring容器之中,更确切的说就是将各个配置项装载到Spring上下文容器之中供

spring boot整合RabbitMQ(Fanout模式)

1.Fanout Exchange介绍Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).Q