学习ActiveMQ(四):spring与ActiveMQ整合

  在上一篇中已经怎么使用activemq的api来实现消息的发送接收了,但是在实际的开发过程中,我们很少使用activemq直接上去使用,因为我们每次都要创建连接工厂,创建连接,创建session。。。有些繁琐,那么利用spring的话简单多了,强大的spring

提供了对了jms的支持,我们可以使用JmsTemplate来实现,JmsTemplate隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑,接下来就一起来看看具体怎么做吧。

首先我们打开idea新建一个maven的webapp项目,对项目项目结构进行一下调整,如下图:

接下来对pom.xml进行修改,需要spring的jar包和jms的jar包,我贴出我的完整代码:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.easylab.jms</groupId>
    <artifactId>jms-spring</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spring.version>4.2.5.RELEASE</spring.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.4</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>

在resource文件夹下面建立一个common.xml文件,并进行相关配置,配置作用看代码注释。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--支持注解-->
    <context:component-scan base-package="com.easylab.jms"/>

    <!--使用PooledConnectionFactory对session和消息producer的缓存机制带来的性能提升-->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <!--连接mq的连接工厂-->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://127.0.0.1:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--使用缓存可以提升效率-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>

    <!--配置JmsTemplate,用于发送消息-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--给一个默认的是destination-->
        <property name="defaultDestination" ref="queueDestination"/>
    </bean>

    <!--队列模式的destination-->
    <bean id="queueDestination"  class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue"/>
    </bean>

    <!--主题模式的destination配置-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"/>
    </bean>

    <!--消费者所需-->
    <!--配置 消息监听容器 使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和消息consumer,而且还可以根据消息数量动态的增加或缩减监听器的数量-->
    <bean id="jmsContainer" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>
    <!--配置 消息监听容器-->
    <bean id="jmsContainerTopic" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="consumerMessageListenerTopic"/>
    </bean>

    <!--自定义消息监听器-->
    <bean id="consumerMessageListener" class="com.easylab.jms.consumer.ConsumerMessageListener"/>
    <bean id="consumerMessageListenerTopic" class="com.easylab.jms.consumer.ConsumerMessageListenerTopic"/>
</beans>

接下来在com.easylab.jms.producer创建一个IProducerService 发送者接口

package com.easylab.jms.producer;

/******************************
 * @author : liuyuan
 * <p>ProjectName:jms-spring  </p>
 * @ClassName :  IProducerService
 * @date : 2018/6/24 0024
 * @time : 8:41
 * @createTime 2018-06-24 8:41
 * @version : 2.0
 * @description :消息发送
 *******************************/
public interface IProducerService {

    public void sendMessage(String message);
}

对这个接口进行实现,在com.easylab.jms.producer创建一个ProducerServiceImpl实现类(使用队列模式发送)

package com.easylab.jms.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;

/******************************
 * @author : liuyuan
 * <p>ProjectName:jms-spring  </p>
 * @ClassName :  ProducerServiceImpl
 * @date : 2018/6/24 0024
 * @time : 8:42
 * @createTime 2018-06-24 8:42
 * @version : 2.0
 * @description :
 *******************************/
@Service
public class ProducerServiceImpl implements IProducerService {

    @Autowired
    JmsTemplate jmsTemplate;

  //这里不定义的话,就使用在xml文件里配置的默认的destination
/*    @Resource(name = "queueDestination")
    Destination destination;*/

    public void sendMessage(final String message) {

        // 使用JmsTemplate发送消息
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                // 创建一个消息
                TextMessage textMessage = session.createTextMessage(message);

                return textMessage;
            }
        });
        System.out.println("发送消息" + message);
    }
}

再对这个接口写一个实现,在com.easylab.jms.producer创建一个ProducerServiceImplTopic实现类(使用主题模式发送)

package com.easylab.jms.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;

/******************************
 * @author : liuyuan
 * <p>ProjectName:jms-spring  </p>
 * @ClassName :  ProducerServiceImpl
 * @date : 2018/6/24 0024
 * @time : 8:42
 * @createTime 2018-06-24 8:42
 * @version : 2.0
 * @description :
 *******************************/
@Service
public class ProducerServiceImplTopic implements IProducerService {

    @Autowired
    JmsTemplate jmsTemplate;

    //指定主题模式
    @Resource(name = "topicDestination")
    Destination destination;

    public void sendMessage( final String message) {
        // 使用JmsTemplate发送消息
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                // 创建一个消息
                TextMessage textMessage = session.createTextMessage(message);

                return textMessage;
            }
        });
        System.out.println("发送消息" + message);
    }
}

ok,接下在com.easylab.jms.produce写一个AppProducer运行类就行了

package com.easylab.jms.producer;

import javafx.application.Application;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;

import javax.jms.*;

/******************************
 * @author : liuyuan
 * <p>ProjectName:jms-test  </p>
 * @ClassName :  AppProducer
 * @date : 2018/6/20 0020
 * @time : 20:46
 * @createTime 2018-06-20 20:46
 * @version : 2.0
 * @description :
 *******************************/

public class AppProducer {

    public static void main(String[] args) {

        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("common.xml");

        IProducerService service = (IProducerService)context.getBean(ProducerServiceImpl.class);

        for (int i = 0; i < 2; i++) {
            service.sendMessage("test" + i);
        }

        //切换到主题模式下再发送几条
        service = (IProducerService)context.getBean(ProducerServiceImplTopic.class);
        for (int i = 0; i < 10; i++) {
            service.sendMessage("test" + i);
        }
        context.close();

    }

}

可以运行一下看看,可以看到发送信息,但是没有接受消息,因为我们还没有写消费者,我们可以通过监听器来监听队列,如果队列中有消息,就会直接进入到监听器的onMessage方法中进行我们的业务处理,所以我们可以不用写消费者代码了。用监听器即可。

在配置文件中,我们写了两个监听容器和监听器,用他们分别监听队列和主题,查看效果。接下来创建两个监听器.

在com.easylab.jms.produce写一个ConsumerMessageListener监听器:

package com.easylab.jms.consumer;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/******************************
 * @author : liuyuan
 * <p>ProjectName:jms-spring  </p>
 * @ClassName :  ConsumerMessageListener
 * @date : 2018/6/24 0024
 * @time : 9:16
 * @createTime 2018-06-24 9:16
 * @version : 2.0
 * @description :
 *
 *
 *
 *******************************/

public class ConsumerMessageListener implements MessageListener {

    public void onMessage(Message message) {

        TextMessage textMessage = (TextMessage) message;

        try {
            System.out.println("队列监听器接收消息" + textMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

在com.easylab.jms.produce写一个ConsumerMessageListener监听器:

package com.easylab.jms.consumer;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/******************************
 * @author : liuyuan
 * <p>ProjectName:jms-spring  </p>
 * @ClassName :  ConsumerMessageListener
 * @date : 2018/6/24 0024
 * @time : 9:16
 * @createTime 2018-06-24 9:16
 * @version : 2.0
 * @description :
 *******************************/

public class ConsumerMessageListenerTopic implements MessageListener {

    public void onMessage(Message message) {

        TextMessage textMessage = (TextMessage) message;

        try {
            System.out.println("我是topic监听器,接收到消息:" + textMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

到这里代码就全部写完了,运行刚刚创建启动类

可以清楚的看到消息队列接收到了两条,主题队列接收到了十条,测试成功。

ActiveMQ结合Spring开发最佳实践和建议:

1:Camel框架支持大量的企业集成模式,可以大大简化集成组件间的大量服务和复杂的消息流。而Spring框架更注重简单性,仅仅支持基本的最佳实践。
2:Spring消息发送的核心架构是JmsTemplate,隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑。但是
JmsTemplate损害了ActiveMQ的PooledConnectionFactory对session和消息producer的缓存机制而带来的性能提升。
3:新的Spring里面,可以设置org.springframework.jms.connection.CachingConnectionFactory的sessionCacheSize,或者干脆使用ActiveMQ的PooledConnectionFactory
4:不建议使用JmsTemplate的receive()调用,因为在JmsTemplate上的所有调用都是同步的,这意味着调用线程需要被阻塞,直到方法返回,这对性能影响很大
5:请使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和消息consumer,而且还可以根据消息数量动态的增加或缩减监听器的数量

原文地址:https://www.cnblogs.com/liuyuan1227/p/10744278.html

时间: 2024-11-15 06:15:55

学习ActiveMQ(四):spring与ActiveMQ整合的相关文章

mybatis学习笔记(14)-spring和mybatis整合

mybatis学习笔记(14)-spring和mybatis整合 mybatis学习笔记14-spring和mybatis整合 整合思路 整合环境 sqlSessionFactory 原始dao开发和spring整合后 mapper代理开发 遇到的问题 本文主要将如何将spring和mybatis整合,只是作简单的示例,没有使用Maven构建.并展示mybatis与spring整合后如何进行原始dao开发和mapper代理开发. 整合思路 需要spring通过单例方式管理SqlSessionFa

深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例

基于spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务. 环境准备 工具 JDK1.6或1.7 Spring4.1.0 ActiveMQ5.11.1 Tomcat7.x 目录结构 所需jar包 项目的配置 配置ConnectionFactory connectionFactory是Spring用于创建到JMS服务器链接

Spring整合ActiveMQ:spring+JMS+ActiveMQ+Tomcat

一.目录结构 相关jar包 二.关键配置activmq.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi=&quo

Spring JMS ActiveMQ整合(转)

转载自:http://my.oschina.net/xiaoxishan/blog/381209#comment-list ActiveMQ学习笔记(四)http://my.oschina.net/xiaoxishan/blog/380446 中 记录了如何使用原生的方式从ActiveMQ中收发消息.可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更为方便的方式,这就是Spring JMS.我们通过一个例子展开讲述.包括队列.主题消息的收发相关的Spring配置.代码.测试

SpringMVC学习(四)——Spring、MyBatis和SpringMVC的整合

之前我整合了Spring和MyBatis这两个框架,不会的可以看我的文章MyBatis框架的学习(六)——MyBatis整合Spring.本文我再来讲SpringMVC和MyBatis整合开发的方法,这样的话,Spring.MyBatis和SpringMVC三大框架的整合开发我们就学会了.这里我使用的Spring是Spring4.1.3这个版本(SpringMVC自然也是这个版本),MyBatis是MyBatis3.2.7这个版本. 为了更好的学习SpringMVC和MyBatis整合开发的方法

Spring和ActiveMQ整合的完整实例

Spring和ActiveMQ整合的完整实例 前言 这篇博文,我们基于Spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务. 环境准备 工具 JDK1.6或1.7 Spring4.1.0 ActiveMQ5.11.1 Tomcat7.x 目录结构 所需jar包 项目的配置 配置ConnectionFactory conn

深入浅出JMS之Spring和ActiveMQ整合的完整实例

第一篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了JMS的两种消息模型:点对点和发布订阅模型,以及消息被消费的两个方式:同步和异步,JMS编程模型的对象,最后说了JMS的优点. 第二篇博文深入浅出JMS(二)–ActiveMQ简单介绍以及安装,我们介绍了消息中间件ActiveMQ,安装,启动,以及优缺点. 第三篇博文深入浅出JMS(三)–ActiveMQ简单的HelloWorld实例,我们实现了一种点对点的同步消息模型,并没有给大家呈现发布订阅模型. 前言 这篇博文,我们基于spring

学习activemq,在spring中activemq的配置信息

提供者: <?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/sch

Mybatis框架学习(四)—查询缓存与spring的整合开发

1       项目整体目录 2       查询缓存 2.1     缓存的意义 将用户经常查询的数据放在缓存(内存)中,用户去查询数据就不用从磁盘上(关系型数据库数据文件)查询,从缓存中查询,从而提高查询效率,解决了高并发系统的性能问题. 2.1    mybatis持久层缓存 mybatis提供一级缓存和二级缓存 mybatis一级缓存是一个SqlSession级别,sqlsession只能访问自己的一级缓存的数据,二级缓存是跨sqlSession,是mapper级别的缓存,对于mappe