Spring Boot 入门之消息中间件篇(五)

原文地址:Spring Boot 入门之消息中间件篇(五)
博客地址:http://www.extlight.com

一、前言

在消息中间件中有 2 个重要的概念:消息代理和目的地。当消息发送者发送消息后,消息就被消息代理接管,消息代理保证消息传递到指定目的地。

我们常用的消息代理有 JMS 和 AMQP 规范。对应地,它们常见的实现分别是 ActiveMQ 和 RabbitMQ。

上篇文章《Spring Boot 入门之缓存和 NoSQL 篇(四)》

二、整合 ActiveMQ

2.1 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!-- 如果需要配置连接池,添加如下依赖 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency> 

2.2 添加配置

# activemq 配置
spring.activemq.broker-url=tcp://192.168.2.12:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=false
spring.activemq.pool.max-connections=50
# 使用发布/订阅模式时,下边配置需要设置成 true
spring.jms.pub-sub-domain=false

此处 spring.activemq.pool.enabled=false,表示关闭连接池。

2.3 编码

配置类:

@Configuration
public class JmsConfirguration {

    public static final String QUEUE_NAME = "activemq_queue";

    public static final String TOPIC_NAME = "activemq_topic";

    @Bean
    public Queue queue() {
        return new ActiveMQQueue(QUEUE_NAME);
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(TOPIC_NAME);
    }
}

负责创建队列和主题。

消息生产者:

@Component
public class JmsSender {

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    public void sendByQueue(String message) {
        this.jmsTemplate.convertAndSend(queue, message);
    }

    public void sendByTopic(String message) {
        this.jmsTemplate.convertAndSend(topic, message);
    }
}

消息消费者:

@Component
public class JmsReceiver {

    @JmsListener(destination = JmsConfirguration.QUEUE_NAME)
    public void receiveByQueue(String message) {
        System.out.println("接收队列消息:" + message);
    }

    @JmsListener(destination = JmsConfirguration.TOPIC_NAME)
    public void receiveByTopic(String message) {
        System.out.println("接收主题消息:" + message);
    }
}

消息消费者使用 @JmsListener 注解监听消息。

2.4 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsTest {

    @Autowired
    private JmsSender sender;

    @Test
    public void testSendByQueue() {
        for (int i = 1; i < 6; i++) {
            this.sender.sendByQueue("hello activemq queue " + i);
        }
    }

    @Test
    public void testSendByTopic() {
        for (int i = 1; i < 6; i++) {
            this.sender.sendByTopic("hello activemq topic " + i);
        }
    }
}

打印结果:

接收队列消息:hello activemq queue 1
接收队列消息:hello activemq queue 2
接收队列消息:hello activemq queue 3
接收队列消息:hello activemq queue 4
接收队列消息:hello activemq queue 5

测试发布/订阅模式时,设置 spring.jms.pub-sub-domain=true

接收主题消息:hello activemq topic 1
接收主题消息:hello activemq topic 2
接收主题消息:hello activemq topic 3
接收主题消息:hello activemq topic 4
接收主题消息:hello activemq topic 5

三、整合 RabbitMQ

3.1 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 添加配置

spring.rabbitmq.host=192.168.2.30
spring.rabbitmq.port=5672
spring.rabbitmq.username=light
spring.rabbitmq.password=light
spring.rabbitmq.virtual-host=/test

3.3 编码

配置类:

@Configuration
public class AmqpConfirguration {

    //=============简单、工作队列模式===============

    public static final String SIMPLE_QUEUE = "simple_queue";

    @Bean
    public Queue queue() {
        return new Queue(SIMPLE_QUEUE, true);
    }

    //===============发布/订阅模式============

    public static final String PS_QUEUE_1 = "ps_queue_1";
    public static final String PS_QUEUE_2 = "ps_queue_2";
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    @Bean
    public Queue psQueue1() {
        return new Queue(PS_QUEUE_1, true);
    }

    @Bean
    public Queue psQueue2() {
        return new Queue(PS_QUEUE_2, true);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(psQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(psQueue2()).to(fanoutExchange());
    }

    //===============路由模式============

    public static final String ROUTING_QUEUE_1 = "routing_queue_1";
    public static final String ROUTING_QUEUE_2 = "routing_queue_2";
    public static final String DIRECT_EXCHANGE = "direct_exchange";

    @Bean
    public Queue routingQueue1() {
        return new Queue(ROUTING_QUEUE_1, true);
    }

    @Bean
    public Queue routingQueue2() {
        return new Queue(ROUTING_QUEUE_2, true);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    @Bean
    public Binding directBinding1() {
        return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("user");
    }

    @Bean
    public Binding directBinding2() {
        return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("order");
    }

    //===============主题模式============

    public static final String TOPIC_QUEUE_1 = "topic_queue_1";
    public static final String TOPIC_QUEUE_2 = "topic_queue_2";
    public static final String TOPIC_EXCHANGE = "topic_exchange";

    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE_1, true);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE_2, true);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add");
    }

    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
    }

}

RabbitMQ 有多种工作模式,因此配置比较多。想了解相关内容的读者可以查看本站的《RabbitMQ 工作模式介绍》或者自行百度相关资料。

消息生产者:

@Component
public class AmqpSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 简单模式发送
     *
     * @param message
     */
    public void simpleSend(String message) {
        this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE, message);
    }

    /**
     * 发布/订阅模式发送
     *
     * @param message
     */
    public void psSend(String message) {
        this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE, "", message);
    }

    /**
     * 路由模式发送
     *
     * @param message
     */
    public void routingSend(String routingKey, String message) {
        this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE, routingKey, message);
    }

    /**
     * 主题模式发送
     *
     * @param routingKey
     * @param message
     */
    public void topicSend(String routingKey, String message) {
        this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE, routingKey, message);
    }
}

消息消费者:

@Component
public class AmqpReceiver {

    /**
     * 简单模式接收
     *
     * @param message
     */
    @RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE)
    public void simpleReceive(String message) {
        System.out.println("接收消息:" + message);
    }

    /**
     * 发布/订阅模式接收
     *
     * @param message
     */
    @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_1)
    public void psReceive1(String message) {
        System.out.println(AmqpConfirguration.PS_QUEUE_1 + "接收消息:" + message);
    }

    @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_2)
    public void psReceive2(String message) {
        System.out.println(AmqpConfirguration.PS_QUEUE_2 + "接收消息:" + message);
    }

    /**
     * 路由模式接收
     *
     * @param message
     */
    @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_1)
    public void routingReceive1(String message) {
        System.out.println(AmqpConfirguration.ROUTING_QUEUE_1 + "接收消息:" + message);
    }

    @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_2)
    public void routingReceive2(String message) {
        System.out.println(AmqpConfirguration.ROUTING_QUEUE_2 + "接收消息:" + message);
    }

    /**
     * 主题模式接收
     *
     * @param message
     */
    @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_1)
    public void topicReceive1(String message) {
        System.out.println(AmqpConfirguration.TOPIC_QUEUE_1 + "接收消息:" + message);
    }

    @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_2)
    public void topicReceive2(String message) {
        System.out.println(AmqpConfirguration.TOPIC_QUEUE_2 + "接收消息:" + message);
    }
}

消息消费者使用 @RabbitListener 注解监听消息。

3.4 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpTest {

    @Autowired
    private AmqpSender sender;

    @Test
    public void testSimpleSend() {
        for (int i = 1; i < 6; i++) {
            this.sender.simpleSend("test simpleSend " + i);
        }
    }

    @Test
    public void testPsSend() {
        for (int i = 1; i < 6; i++) {
            this.sender.psSend("test psSend " + i);
        }
    }

    @Test
    public void testRoutingSend() {
        for (int i = 1; i < 6; i++) {
            this.sender.routingSend("order", "test routingSend " + i);
        }
    }

    @Test
    public void testTopicSend() {
        for (int i = 1; i < 6; i++) {
            this.sender.topicSend("user.add", "test topicSend " + i);
        }
    }
}

测试结果略过。。。

踩坑提醒1:ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN

解决方案:

1) 请确保用户名和密码是否正确,需要注意的是用户名和密码的值是否包含空格或制表符(笔者测试时就是因为密码多了一个制表符导致认证失败)。

2) 如果测试账户使用的是 guest,需要修改 rabbitmq.conf 文件。在该文件中添加 “loopback_users = none” 配置。

踩坑提醒2:Cannot prepare queue for listener. Either the queue doesn‘t exist or the broker will not allow us to use it

解决方案:

我们可以登陆 RabbitMQ 的管理界面,在 Queue 选项中手动添加对应的队列。

四、参考资料

原文地址:https://www.cnblogs.com/moonlightL/p/8367543.html

时间: 2024-11-10 00:55:08

Spring Boot 入门之消息中间件篇(五)的相关文章

Spring Boot 入门之持久层篇(三)

原文地址:Spring Boot 入门之持久层篇(三) 博客地址:http://www.extlight.com 一.前言 上一篇<Spring Boot 入门之 Web 篇(二)>介绍了 Spring Boot 的 Web 开发相关的内容,项目的开发离不开数据,因此本篇开始介绍持久层相关的知识. 二.整合 JdbcTemplate 2.1 添加依赖 <dependency> <groupId>org.springframework.boot</groupId&g

Spring Boot 入门之缓存和 NoSQL 篇(四)

原文地址:Spring Boot 入门之缓存和 NoSQL 篇(四) 博客地址:http://www.extlight.com 一.前言 当系统的访问量增大时,相应的数据库的性能就逐渐下降.但是,大多数请求都是在重复的获取相同的数据,如果使用缓存,将结果数据放入其中可以很大程度上减轻数据库的负担,提升系统的响应速度. 本篇将介绍 Spring Boot 中缓存和 NoSQL 的使用.上篇文章<Spring Boot 入门之持久层篇(三)>. 二.整合缓存 Spring Boot 针对不同的缓存

Spring Boot干货系列:(五)开发Web应用JSP篇

Spring Boot干货系列:(五)开发Web应用JSP篇 原创 2017-04-05 嘟嘟MD 嘟爷java超神学堂 前言 上一篇介绍了Spring Boot中使用Thymeleaf模板引擎,今天来介绍一下如何使用SpringBoot官方不推荐的jsp,虽然难度有点大,但是玩起来还是蛮有意思的. 正文 先来看看整体的框架结构,跟前面介绍Thymeleaf的时候差不多,只是多了webapp这个用来存放jsp的目录,静态资源还是放在resources的static下面. 引入依赖 使用内嵌的to

构建微服务:Spring boot 入门篇

构建微服务:Spring boot 入门篇 什么是spring boot Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置.用我的话来理解,就是spring boot其实不是什么新的框架,它默认配置了很多框架的使用方式,就像maven整合了所有的jar包,spring boot整合了所有的框架(不知道这样比喻是否合适). 使用spring boot有什

Spring Boot 揭秘与实战(五) 服务器篇 - Tomcat 代码配置

Spring Boot 内嵌的 Tomcat 服务器默认运行在 8080 端口.如果,我们需要修改Tomcat的端口,我们可以在 src/main/resources/application.properties 中配置Tomcat信息. server.port=8089 现在,你可以重新运行上面的例子,看下是不是 Tomcat 的端口变成 8089 了. 如果想直接通过代码配置 Tomcat, 可以直接定义 TomcatEmbeddedServletContainerFactory. 现在,我

Spring Boot 揭秘与实战(五) 服务器篇 - 其他内嵌服务器 发表于 2017-01-03 | Spring框架 | Spri

文章目录 1. Jetty 的切换 2. Undertow的使用 Spring Boot 可选择内嵌 Tomcat.Jetty 和 Undertow,因此我们不需要以 war 包形式部署项目.<Spring Boot 揭秘与实战(五) 服务器篇 - 内嵌的服务器 Tomcat剖析>一文,已经讲解了内嵌的服务器 Tomcat,那么,这篇文章大概讲解下另外两个内嵌的服务器 Jetty 和 Undertow. Jetty 的切换 Spring Boot 默认使用的是 Tomcat 作为内嵌的服务器,

Spring Boot 入门(五):集成 AOP 进行日志管理

本篇文章是接着 Spring boot 入门(四):集成 Shiro 实现登陆认证和权限管理写的,按照前面几篇博客的教程,可以搭建一个简单的项目,主要包含了 Pagehelper+MyBatis 分页查询,Generator 代码自动生成器,Shiro登录及权限管理.本篇博客主要是集成 AOP 进行日志管理 1.导入 jar 包 1 <!-- aop --> 2 <dependency> 3 <groupId>org.springframework.boot</g

Spring Cloud 入门 之 Config 篇(六)

原文地址:Spring Cloud 入门 之 Config 篇(六) 博客地址:http://www.extlight.com 一.前言 随着业务的扩展,为了方便开发和维护项目,我们通常会将大项目拆分成多个小项目做成微服务,每个微服务都会有各自配置文件,管理和修改文件起来也会变得繁琐.而且,当我们需要修改正在运行的项目的配置时,通常需要重启项目后配置才能生效. 上述的问题将是本篇需要解决的问题. 二.介绍 2.1 简单介绍 Spring Cloud Config 用于为分布式系统中的基础设施和微

Spring boot入门到精通视频教程

14套java精品高级架构课,缓存架构,深入Jvm虚拟机,全文检索Elasticsearch,Dubbo分布式Restful 服务,并发原理编程,SpringBoot,SpringCloud,RocketMQ中间件,Mysql分布式集群,服务架构,运 维架构视频教程 14套精品课程介绍: 1.14套精 品是最新整理的课程,都是当下最火的技术,最火的课程,也是全网课程的精品: 2.14套资 源包含:全套完整高清视频.完整源码.配套文档: 3.知识也 是需要投资的,有投入才会有产出(保证投入产出比是