springboot rabbitmq整合

这一篇我们来把消息中间件整合到springboot中

=====================================================================

首先在服务器上安装rabbitmq的服务,用docker拉取即可,不再详细描述。

直接来撸代码

首先我们先添加rabbitmq的依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中添加必要的配置信息

spring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

好了,基本的配置就已经配置完毕了

rabbitmq有六种模式

我们逐个来看springboot是怎么实现的呢

1.hello world

P代表生产者,C代表消费者,红色代码消息队列。P将消息发送到消息队列,C对消息进行处理。

我们先创建一个队列

@Bean
    public Queue Queue() {
        return new Queue("hello");
    }

然后我再创建一个生产者

@Controller
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}

再创建一个消费者

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
}

再写一个测试用例看看

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
        @Autowired
        private HelloSender helloSender;

        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
}

成功!

2.工作模式(竞争)

一个消息产生者,多个消息的消费者。竞争抢消息

我们先创建一个队列

@Bean
    public Queue Queue2() {
        return new Queue("neo");
    }

再创建一个消息生产者

@Controller
public class NeoSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(int i) {
        String context = "spirng boot neo queue"+" ****** "+i;
        System.out.println("Sender1 : " + context);
        this.rabbitTemplate.convertAndSend("neo", context);
    }
}

再创建两个消息的消费者

 1 @Component
 2 @RabbitListener(queues = "neo")
 3 public class NeoReceiver1 {
 4     @RabbitHandler
 5     public void process(String neo) {
 6         System.out.println("Receiver 1: " + neo);
 7     }
 8 }
 9
10
11
12 @Component
13 @RabbitListener(queues = "neo")
14 public class NeoReceiver2 {
15     @RabbitHandler
16     public void process(String neo) {
17         System.out.println("Receiver 2: " + neo);
18     }
19
20 }

我们写一个测试用例

@Test
    public void oneToMany() throws Exception {
        for (int i=0;i<100;i++){
           // Thread.sleep(10);
            neoSender.send(i);
        }
    }

运行

可以看到消息均匀的被两个消费者消费了。

通过这个例子我们可以看做高并发情况下的消息产生和消费,这会产生一个消息丢失的问题。万一客户端在处理消息的时候挂了,那这条消息就相当于被浪费了,针对这种情况,rabbitmq推出了消息ack机制,熟悉tcp三次握手的一定不会陌生。

我们看看springboot是实现ack的

很简单,在我们的配置类中,配置一个新的消费者,将原先的消费者先都去掉:

@Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(Queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//消息确认后才能删除
        container.setPrefetchCount(5);//每次处理5条消息
        container.setMessageListener(new ChannelAwareMessageListener() {

            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("消费端接收到消息 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }

但这里会有个问题,test模式下消息发送完毕系统就会直接shutdown,所以只能消费部分消息,不过等真正启动项目,这个问题就不存在了。

3.发布订阅模式

生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,来消费消息。

这种方式实现同一个消息被多个消费者消费。工作模式是同一个消息只能有一个消费者。

我们新建三个队列

@Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

再新建一个交换机

@Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

再把这些队列绑定到交换机上去

@Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

基本的配置完成后,再新建一个消息生产者

@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    }
}

同样的,我们再新建三个消息消费者

 1 @Component
 2 @RabbitListener(queues = "fanout.A")
 3 public class FanoutReceiveA {
 4
 5     @RabbitHandler
 6     public void process(String message) {
 7         System.out.println("fanout Receiver A  : " + message);
 8     }
 9 }
10
11 @Component
12 @RabbitListener(queues = "fanout.B")
13 public class FanoutReceiverB {
14     @RabbitHandler
15     public void process(String message) {
16         System.out.println("fanout Receiver B: " + message);
17     }
18 }
19
20 @Component
21 @RabbitListener(queues = "fanout.C")
22 public class FanoutReceiverC {
23     @RabbitHandler
24     public void process(String message) {
25         System.out.println("fanout Receiver C: " + message);
26     }
27 }

三个消费者分别监听3个队列的内容

新建一个测试用例:

@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutTest {
    @Autowired
    private FanoutSender fanoutSender;

    @Test
    public void setFanoutSender(){
        fanoutSender.send();
    }

}

三个队列都接受到了消息

4:路由模式

需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配,这是一个完整的匹配。

5.主题模式

发送端不只按固定的routing key发送消息,而是按字符串匹配发送,接收端同样如此

符号#匹配一个或多个词,符号*匹配不多不少一个词。

4/5两者模式很相似,我们放在一起演示

新建两个队列

final static String message = "topic.A";
    final static String messages = "topic.B";

    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

新建一个交换机

@Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

绑定队列到交换机上,路由模式,需要完整匹配topic.message,才能接受

@Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

topic模式,前缀匹配到topic.即可接受

@Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

我们新建三个消息生产者

@Component
public class TopicSend {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, i am message all";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
    }

    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}

send的key是topic.1  send1的key是topic.message,send2的key是topic.messages

所以理论上send会被两个队列消费,1.2都应该只有一个队列消费

我们再新建两个消费者

@Component
@RabbitListener(queues = "topic.A")
public class TopicReceiver {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver1  : " + message);
    }

}

@Component
@RabbitListener(queues = "topic.B")
public class TopicReceiver2 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2  : " + message);
    }
}

写三个测试用例

@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicTest {
    @Autowired
    private TopicSend sender;

    @Test
    public void topic() throws Exception {
        sender.send();
    }

    @Test
    public void topic1() throws Exception {
        sender.send1();
    }

    @Test
    public void topic2() throws Exception {
        sender.send2();
    }

}

send的运行结果

send1的运行结果

send2的运行结果

结果符合预期。

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; text-align: justify; text-indent: 18.0px; font: 10.5px "PingFang SC"; color: #000000 }
span.s1 { font: 10.5px Consolas }
span.s2 { }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; text-align: justify; text-indent: 21.0px; font: 10.5px "PingFang SC"; color: #000000 }
span.s1 { }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; text-align: justify; font: 12.0px ".PingFang SC"; color: #454545 }
span.s1 { font: 12.0px "Helvetica Neue" }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; text-align: justify; font: 10.5px "PingFang SC"; color: #000000 }
span.s1 { }
span.s2 { font: 10.5px Consolas }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; text-align: justify; font: 10.5px "PingFang SC"; color: #000000 }
span.s1 { }
span.s2 { font: 10.5px Consolas }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; text-align: justify; font: 10.5px "PingFang SC"; color: #000000 }
span.s1 { }
span.s2 { font: 10.5px Consolas }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; text-align: justify; font: 12.0px ".PingFang SC"; color: #454545 }
span.s1 { font: 12.0px "Helvetica Neue" }

时间: 2024-07-31 08:35:16

springboot rabbitmq整合的相关文章

springboot+rabbitmq整合示例程

关于什么是rabbitmq,请看另一篇文: http://www.cnblogs.com/boshen-hzb/p/6840064.html 一.新建maven工程:springboot-rabbitmq 二.引入springboot和rabbitmq的依赖 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance&quo

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)

前言 1. SpringBoot整合配置详解 publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback 注意一点,在发送消息的时候对template进行配置mandatory=tr

带着新人学springboot的应用06(springboot+RabbitMQ 中)

上一节说了这么多废话,看也看烦了,现在我们就来用鼠标点点点,来简单玩一下这个RabbitMQ. 注意:这一节还是不用敲什么代码,因为上一节我们设置了那个可视化工具,我们先用用可视化工具熟悉一下流程. 打开可视化页面,http://localhost:15672 顺便说一下RabbitMQ中的持持久化:这里持久化分为三种:消息持久化,交换器持久化,队列持久化... 举个例子,就简单说说交换器持久化,其实就是为了防止将消息发到交换器了,但是RabbitMQ服务器突然暴毙,没用了,那数据不就丧失了么?

带着新人学springboot的应用07(springboot+RabbitMQ 下)

说一两句废话,强烈推荐各位小伙伴空闲时候也可以写写自己的博客!不管水平高低,不管写的怎么样,不要觉得写不好或者水平不够就不写了(咳,我以前就是这样的想法...自我反省!). 但是开始写博客之后,你会发现很多你以为自己会的东西其实你并不会,然后你会经常在头脑中不断的搜索有关的片段,或者去别的大神博客里到处找有关的资料,最后领悟了属于自己的东西!然后再写出来和别人分享,别人也会给你点意见,你也会慢慢的改进.这不就是学习+复习+巩固+创新+分享+改进的这么的一个过程吗? 以前看过曹雪芹的红楼梦,让我印

RabbitMQ交换机、RabbitMQ整合springCloud

目标 1.交换机 2.RabbitMQ整合springCloud 交换机 蓝色区域===生产者 红色区域===Server:又称Broker,接受客户端的连接,实现AMQP实体服务 绿色区域===消费者 黄色区域===就是我们的交换机以及队列 由生产者投递信息到RabbitMQ Server里面某一个交换机对应的队列中,消费者则是从对应的队列中获取信息 交换机属性: Name:交换机名称 Type:交换机类型 direct.topic.fanout.headers Durability:是否需要

SpringBoot Kafka 整合实例教程

1.使用IDEA新建工程引导方式,创建消息生产工程 springboot-kafka-producer. 工程POM文件代码如下: 1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instanc

springBoot+SpringData 整合入门

SpringData概述 SpringData :Spring的一个子项目.用于简化数据库访问,支持NoSQL和关系数据存储.其主要目标是使用数据库的访问变得方便快捷. SpringData 项目所支持NoSQL存储: MongoDB(文档数据库) Neo4j(图形数据库) Redis(键/值存储) Hbase(列族数据库) SpringData 项目所支持的关系数据存储技术: JDBC JPA Spring Data : 致力于减少数据访问层 (DAO) 的开发量. 开发者唯一要做的,就只是声

springboot+security整合(1)

说明 springboot 版本 2.0.3源码地址:点击跳转 系列 springboot+security 整合(1) springboot+security 整合(2) springboot+security 整合(3) 一. 介绍 ??Spring Security 是一个能够为基于 Spring 的企业应用系统提供声明式的安全访问控制解决方案的安全框架.它提供了一组可以在 Spring 应用上下文中配置的 Bean,充分利用了 Spring IoC,DI(控制反转 Inversion o

SpringBoot RabbitMQ 延迟队列代码实现

场景 用户下单后,如果30min未支付,则删除该订单,这时候就要可以用延迟队列 准备 利用rabbitmq_delayed_message_exchange插件: 首先下载该插件:https://www.rabbitmq.com/community-plugins.html 然后把该插件放到rabbitmq安装目录plugins下: 进入到sbin目录下,执行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";