解决Springboot整合ActiveMQ发送和接收topic消息的问题

环境搭建

1.创建maven项目(jar)

2.pom.xml添加依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
    </dependencies>

3.编写引导类

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }
}

4.在resources下的application.properties配置文件中添加对应的配置

spring.activemq.broker-url=tcp://192.168.25.131:61616
#此url为activeMQ所在服务器的链接

5.在类中注入JmsMessageTemplate

@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

6.调用JmsMessageTemplate的方法发送消息

jmsMessagingTemplate.convertAndSend(“queue消息名称”,"消息内容");

设置消息发送类型

在引导类中配置消息类型

Queue

@Bean(name="queue")
public Destination getQueue(){
    return new ActiveMQQueue("queue_test");
}

Topic

@Bean(name="topic")
public Destination getTopic(){
    return new ActiveMQTopic("topic_test");
}

类中注入发送消息类型

在发送消息的类中注入发送消息类型对象Destination

Queue

@Resource(name="queue")
private Destination queue;

Topic

@Autowired
@Qualifier(value="topic")
private Destination topic;

消息发送

jmsMessageTemplate.convertAndSend(queue,"消息内容");
jmsMessageTemplate.convertAndSend(topic,"消息内容");

编写消费者

@Component
public class ActiveMQConsumer {
    //接收queue消息
    @JmsListener(destination = "queue_test")
    public void handler(String message){
        System.out.println(message);
    }
    //接收topic消息
    @JmsListener(destination = "topic_test")
    public void handlerTopic(String msessage){
        System.out.println(msessage);
    }
}

启动测试

controller类的方法中添加
@RequestMapping("/send")
public void sendQueue(){
    jmsMessagingTemplate.convertAndSend(queue,"这是Queue的消息");
    jmsMessagingTemplate.convertAndSend(topic,"这是Topic的消息");
}

运行后,在控制台只输出了Queue的消息

问题

Springboot整合ActiveMQ模式只能监听Queue队列的消息进行处理,所以如何处理topic消息?

解决:

在Springboot的application.properties文件中添加如下内容

spring.jms.pub-sub-domain=true   //默认是false,开启发布订阅模式

启动测试

经过上述修改,又只能监听topic的消息,queue的消息又无法获取。

解决:

只有通过自定义监听器类来处理

在监听器类的@JmsListener添加connectionFactory属性

@Component
public class ActiveMQConsumer {
    //接收queue消息
   @JmsListener(destination = "queue_test",containerFactory =
                   "queueListenerContainerFactory")
    public void handler(String message){
        System.out.println(message);
    }
    //接收topic消息
    @JmsListener(destination = "topic_test",containerFactory =
                "topicListenerContainerFactory")
    public void handlerTopic(String msessage){
        System.out.println(msessage);
    }
}

创建一个配置类,在配置类中提供两个监听工厂配置

@Configuration
public class ConsumerConfiguration {

    @Value("${spring.activemq.broker-url}")
    private String host;

    @Bean
    public ConnectionFactory getActiveMqConnection(){
        return new ActiveMQConnectionFactory(host);
    }

    @Bean(name="queueListenerContainerFactory")
    public JmsListenerContainerFactory queueListenerContailerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }
    @Bean(name="topicListenerContainerFactory")
    public JmsListenerContainerFactory topicListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}

运行测试

原文地址:https://www.cnblogs.com/sjq0928/p/11371620.html

时间: 2024-08-26 23:30:52

解决Springboot整合ActiveMQ发送和接收topic消息的问题的相关文章

Web项目容器集成ActiveMQ &amp; SpringBoot整合ActiveMQ

集成tomcat就是随项目启动而启动tomcat,最简单的方法就是监听器监听容器创建之后以Broker的方式启动ActiveMQ. 1.web项目中Broker启动的方式进行集成 在这里采用Listener监听ServletContext创建和销毁进行Broker的启动和销毁. 0.需要的jar包: 1.listener实现ServletContextListener接口 package cn.qlq.listener; import javax.servlet.ServletContextEv

SpringBoot整合ActiveMQ实现持久化

点对点(P2P) 结构 创建生产者和消费者两个springboot工程 导入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> 生产者 步骤一:application.properties文件 spring.activemq.brok

SpringBoot整合ActiveMQ发送邮件

虽然ActiveMQ以被其他MQ所替代,但仍有学习的意义,本文采用邮件发送的例子展示ActiveMQ 1. 生产者1.1 引入maven依赖1.2 application.yml配置1.3 创建配置类ConfigQueue1.4 创建生产者类Producer1.5 启动类AppProducer2. 消费者2.1 引入maven依赖2.2 application.yml配置2.3 创建消费者类Consumer2.4 启动类AppConsumer3. 启动截图3.1 生产者截图3.2 消费者截图3.

SpringBoot 整合 ActiveMq

消息队列,用来处理开发中的高并发问题,通过线程池.多线程高效的处理并发任务. 首先,需要下载一个ActiveMQ的管理端:我本地的版本是 activemq5.15.8,打开activemq5.15.8\bin\win64\wrapper.exe客户端,可以根据localhost:端口号,访问ActiveMQ的管理界面.默认的用户名.密码都是admin. (一)pom 文件中添加 ActiveMq 依赖 <dependency> <groupId>org.apache.activem

springboot整合邮件发送

在做项目的过程中,难免会遇到要发送邮件的情况.这里,将springboot与邮件发送整合一下: 一:添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId></dependency> 二:要在配置文件中添加一下配置: #发送邮件的配置#使用qq邮箱发送 spring.ma

ActiveMQ 发送和就收消息

一.添加 jar 包 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.2</version> </dependency> 二.消息传递的两种形式 1.点对点:发送的消息只能被一个消费者接收,第一个消费者接收后,消息没了 2.发布/订阅:消息可以被多个消费

Spring整合ActiveMQ及多个Queue消息监听的配置

消息队列(MQ)越来越火,在java开发的项目也属于比较常见的技术,MQ的相关使用也成java开发人员必备的技能.笔者公司采用的MQ是ActiveMQ,且消息都是用的点对点的模式.本文记录了实现Spring整合ActivateMQ的全过程及如何使用MQ,便于后续查阅. 一.项目的搭建 采用maven构建项目,免去了copy jar包的麻烦.因此,我们创建了一个java类型的Maven Project (1)项目结构图 先把项目结构图看一下,便于对项目的理解. (2)pom.xml 我们需要加入以

XMPP系列(四)---发送和接收文字消息,获取历史消息功能

今天开始做到最主要的功能发送和接收消息.获取本地历史数据. 先上到目前为止的效果图:              首先是要在XMPPFramework.h中引入数据存储模块: //聊天记录模块的导入 #import "XMPPMessageArchiving.h" #import "XMPPMessageArchivingCoreDataStorage.h" #import "XMPPMessageArchiving_Contact_CoreDataObje

Java Socket发送与接收HTTP消息简单实现

在上次Java Socket现实简单的HTTP服务我 们实现了简单的HTTP服务,它可以用来模拟HTTP服务,用它可以截获HTTP请求的原始码流,让我们很清楚的了解到我们向服务发的HTTP消息的结 构,对HTTP请求消息有个清晰的认识.这一节我想写了一个客户的程序,就是用来模拟浏览器,用来向服务器发送HTTP请求,最得要的是可以用它来显示服 务器发回来的HTTP响应消息的一般结构. [java] view plaincopy import java.io.IOException; import