Spring 实现远程访问详解——jms和activemq

前几章我们分别利用spring rmi、httpinvoker、httpclient、webservice技术实现不同服务器间的远程访问。本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。

一.  简介

1.      什么是Apache ActiveMq

Apache ActiveMq是最流行和最强大的开源消息和集成服务器。同时Apache ActiveMq是速度快,支持多种跨语言客户端和协议,同时配有易于使用的企业集成模式和优秀的特性,并且支持JMS1.1和J2EE1.4。具体特性见官网:http://activemq.apache.org/

2.      什么是JMS

JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。

JMS 支持两种消息传递模型:

点对点(point-to-point,简称 PTP)

发布/订阅(publish/subscribe,简称 pub/sub)。

这两种消息传递模型非常相似,但有以下区别:

PTP 消息传递模型规定了一条消息只能传递给一个接收方。 采用javax.jms.Queue表示。Spring配置类型destination-type="queue"。

Pub/sub 消息传递模型允许一条消息传递给多个接收方。采用javax.jms.Topic表示。Spring配置类型destination-type="topic"。

二.  单服务器异步访问

3.      Spring 整合JMS和ActiveMq流程

1)      下载和部署ActiveMq服务器

2)      Spring jms和activemq相关依赖引入

3)      Spring整合activemq配置

4)      定义消息发布者(生产者)

5)      定义消息订阅者(消费者)

6)      Spring mvc配置

7)      实例测试

4.      Spring整合JMS和ActiveMq具体实现

1)      下载和部署ActiveMq服务器

下载地址:

http://activemq.apache.org/2016/03/07/apache-activemq-5132-released.html

解压下载文件,假如我保存在D盘根目录,找到目录apache-activemq-5.13.2\bin\win64下的activemq.bat,启动activemq服务。

2)      Spring jms和activemq相关依赖

<!-- xbean如<amq:connectionFactory /> -->
       <dependency>
           <groupId>org.apache.xbean</groupId>
           <artifactId>xbean-spring</artifactId>
           <version>3.16</version>
       </dependency>
       <!-- Active MQ -->
       <dependency>
           <groupId>org.apache.activemq</groupId>
           <artifactId>activemq-all</artifactId>
           <version> 5.13.1</version>
     </dependency>
<dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-jms</artifactId>
           <version> 4.2.0.RELEASE</version>
     </dependency>

3)      Application-context-jms中配置jms和activemq

注意头部信息需要引入jms和activemq

具体配置如下:

<?xml version="1.0"encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.2.xsd
       http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd
       http://www.springframework.org/schema/jms
       http://www.springframework.org/schema/jms/spring-jms-4.0.xsd">

    <description>application-context-activemqconfig</description>
    <!-- activemq -->
    <amq:connectionFactory id="amqConnectionFactory"
       brokerURL="tcp://localhost:61616" userName="admin" password="admin"/>
    <bean id="connectionFactory"
       class="org.springframework.jms.connection.CachingConnectionFactory">
       <constructor-arg ref="amqConnectionFactory"/>
       <property name="sessionCacheSize"value="100" />
    </bean>
    <!-- ====Producer side start==== -->
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
       <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
       <property name="pubSubDomain"value="false" />
    </bean>
    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
       <constructor-arg ref="connectionFactory"/>
       <!-- pub/sub模型(发布/订阅) -->
       <property name="pubSubDomain"value="true" />
    </bean>
    <!-- ====Producer side end==== -->
    <!-- ====Consumer side start==== -->
    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue"
       container-type="default" connection-factory="connectionFactory"
       acknowledge="auto">
       <jms:listener destination="test.queue"ref="queueReceiver" />
       <jms:listener destination="test.queue"ref="queueReceiver2" />
    </jms:listener-container>
    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic"
       container-type="default" connection-factory="connectionFactory"
       acknowledge="auto">
       <jms:listener destination="test.topic"ref="topicReceiver" />
       <jms:listener destination="test.topic"ref="topicReceiver2" />
    </jms:listener-container>
    <!-- ====Consumer side end==== -->
</beans>

4)      定义消息发布者(生产者)

a.      Queue队列消息发布者

package com.lm.core.service.impl.sender;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.Session;

importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Qualifier;
importorg.springframework.jms.core.JmsTemplate;
importorg.springframework.jms.core.MessageCreator;
importorg.springframework.stereotype.Component;

@Component
publicclass QueueSender {
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;// 通过@Qualifier修饰符来注入对应的bean

    /**
     * 发送一条消息到指定的队列(目标)
     *
     *@param queueName
     *           队列名称
     *@param message
     *           消息内容
     */
    public void send(String queueName, finalString message) {
             jmsTemplate.send(queueName, newMessageCreator() {
                       @Override
                       public MessagecreateMessage(Session session) throws JMSException {
                                returnsession.createTextMessage(message);
                       }
             });
    }
}

b.      Topic队列信息发布者

packagecom.lm.core.service.impl.sender;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.Session;

importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Qualifier;
importorg.springframework.jms.core.JmsTemplate;
importorg.springframework.jms.core.MessageCreator;
importorg.springframework.stereotype.Component;

@Component
publicclass TopicSender {
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * 发送一条消息到指定的队列(目标)
     *
     *@param queueName
     *           队列名称
     *@param message
     *           消息内容
     */
    public void send(String topicName, finalString message) {
             jmsTemplate.send(topicName, newMessageCreator() {
                       @Override
                       public MessagecreateMessage(Session session) throws JMSException {
                                returnsession.createTextMessage(message);
                       }
             });
    }
}

5)      定义消息订阅者(消费者)

a)        Queue队列消息接受者1

packagecom.lm.core.service.impl.receiver;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;

importorg.springframework.stereotype.Component;

@Component
publicclass QueueReceiver implements MessageListener {
         @Override
         public void onMessage(Message message){
                   try {
                            System.out.println("QueueReceiver1接收到消息:"
                                               +((TextMessage) message).getText());
                   } catch (JMSException e) {
                            e.printStackTrace();
                   }
         }
}

b)        Queue队列消息接受者2

packagecom.lm.core.service.impl.receiver;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;

importorg.springframework.stereotype.Component;

@Component
publicclass QueueReceiver2 implements MessageListener {
         @Override
         public void onMessage(Message message){
                   try {
                            System.out.println("QueueReceiver2接收到消息:"
                                               +((TextMessage) message).getText());
                   } catch (JMSException e) {
                            e.printStackTrace();
                   }
         }
}
c)        Topic队列消息接受者1
packagecom.lm.core.service.impl.receiver;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;

importorg.springframework.stereotype.Component;

@Component
public classTopicReceiver implements MessageListener {
         @Override
         public void onMessage(Message message){
                   try {
                            System.out.println("TopicReceiver1接收到消息:"
                                               +((TextMessage) message).getText());
                   } catch (JMSException e) {
                            e.printStackTrace();
                   }
         }
}

d)        Topic队列消息接受者2

packagecom.lm.core.service.impl.receiver;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;

importorg.springframework.stereotype.Component;

@Component
publicclass TopicReceiver2 implements MessageListener {
         @Override
         public void onMessage(Message message){
                   try {
                            System.out.println("TopicReceiver2接收到消息:"
                                               +((TextMessage) message).getText());
                   } catch (JMSException e) {
                            e.printStackTrace();
                   }
         }
}

6)      Spring mvc配置

注意:需要通过mvc:resources配置静态资源,否则找不到相关依赖的资源

<?xml version="1.0"encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
                       http://www.springframework.org/schema/context
                       http://www.springframework.org/schema/context/spring-context-3.1.xsd
                        http://www.springframework.org/schema/mvc
                       http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">
    <description>spring-mvc config</description>

    <!-- 启动mvc注解 -->
    <mvc:annotation-driven />

    <!-- 控制器注解@Controller包自动扫描注入 -->
    <context:component-scan base-package="com.lm.web" />

    <!-- jackson json配置 -->
    <bean id="mappingJacksonHttpMessageConverter"
       class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter">
       <property name="supportedMediaTypes">
           <list>
              <value>text/html;charset=UTF-8</value>
           </list>
       </property>
    </bean>
    <!-- 启动SpringMVC的注解功能,完成请求和注解POJO的映射 -->
    <bean
       class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
       <property name="messageConverters">
           <list>
              <ref bean="mappingJacksonHttpMessageConverter"/> <!-- JSON转换器 -->
           </list>
       </property>
    </bean>
    <!-- 静态资源加载 -->
    <mvc:resources location="/resources/" mapping="/resources/**" />
    <!-- 定义跳转的文件的前后缀,视图模式配置 -->
    <bean
       class="org.springframework.web.servlet.view.InternalResourceViewResolver">
       <!-- 这里的配置我的理解是自动给后面action的方法return的字符串加上前缀和后缀,变成一个可用的url地址 -->
       <property name="prefix"value="/WEB-INF/view/" />
       <property name="suffix"value=".jsp" />
    </bean>

    <!-- 配置文件上传,如果没有使用文件上传可以不用配置,当然如果不配,那么配置文件中也不必引入上传组件包 -->
    <bean id="multipartResolver"
       class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
       <!-- 默认编码 -->
       <property name="defaultEncoding"value="utf-8" />
       <!-- 文件大小最大值 -->
       <property name="maxUploadSize"value="10485760000" />
       <!-- 内存中的最大值 -->
       <property name="maxInMemorySize"value="40960" />
    </bean>
</beans>

7)      实例测试

a)      Queue响应信息

从日志可以看出,Queue模式是单点响应消息

b)      Topic响应信息

从日志可以看出,topic模式时,不管你有多少订阅者,同时响应消息

三.  多服务器异步访问(远程访问)

上一节我们实现了单web服务器响应异步通信。本节将实现多web服务器实现异步通信。

1.      实现流程

1)      引入jms和activemq相关的依赖

2)      Spring 配置jms和activemq

3)      定义消息订阅者(消费者)

4)      实例测试

2.      具体实现

1)      引入jms和activemq相关的依赖

类似上一节依赖

2)      Spring 配置jms和activemq

类似上一节配置,只配置订阅者,如下:

<?xml version="1.0"encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.2.xsd
       http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd
       http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd">

    <description>application-context-activemqconfig</description>

    <!--activemq -->
    <amq:connectionFactory id="amqConnectionFactory"
       brokerURL="tcp://localhost:61616" userName="admin" password="admin"/>

    <bean id="connectionFactory"
       class="org.springframework.jms.connection.CachingConnectionFactory">
       <constructor-arg ref="amqConnectionFactory"/>
       <property name="sessionCacheSize"value="100" />
    </bean>
    <!-- ====Producer side start==== -->
    <!-- 定义JmsTemplate的Queue类型 -->
    <!-- <beanid="jmsQueueTemplate"class="org.springframework.jms.core.JmsTemplate">
       <constructor-arg ref="connectionFactory" />
       非pub/sub模型(发布/订阅),即队列模式
       <property name="pubSubDomain"value="false" />
    </bean>
    定义JmsTemplate的Topic类型
    <bean id="jmsTopicTemplate"class="org.springframework.jms.core.JmsTemplate">
       <constructor-arg ref="connectionFactory" />
       pub/sub模型(发布/订阅)
       <property name="pubSubDomain"value="true" />
    </bean> -->
    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic"
       container-type="default" connection-factory="connectionFactory"
       acknowledge="auto">
       <jms:listener destination="test.topic"ref="topicReceiver" />
       <jms:listener destination="test.topic"ref="topicReceiver2" />
    </jms:listener-container>
</beans>

3)      定义消息订阅者(消费者)

a.      Topic模式订阅者1

package com.lm.core.service.receiver;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;

importorg.springframework.stereotype.Component;

@Component
public classTopicReceiver implements MessageListener {
       @Override
       public void onMessage(Message message) {
                 try {
                          System.out.println("TopicReceiver1client接收到消息:"
                                             +((TextMessage) message).getText());
                 } catch (JMSException e) {
                          e.printStackTrace();
                 }
       }
}

b.      Topic模式订阅者2

packagecom.lm.core.service.receiver;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;

importorg.springframework.stereotype.Component;

@Component
publicclass TopicReceiver2 implements MessageListener {
         @Override
         public void onMessage(Message message){
                  try {
                           System.out.println("TopicReceiver2client:接收到消息:"
                                              +((TextMessage) message).getText());
                  } catch (JMSException e) {
                           e.printStackTrace();
                  }
         }
}

4)      实例测试

启动activemq服务,tomcat启动server和client

从日志可以看出发送消息时,server和client响应到消息。

参考文献:

Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型

代码地址:http://download.csdn.net/detail/a123demi/9498084

时间: 2024-07-29 23:52:26

Spring 实现远程访问详解——jms和activemq的相关文章

Spring——jar包详解(转)

Spring——jar包详解 org.springframework.aop ——Spring的面向切面编程,提供AOP(面向切面编程)的实现 org.springframework.asm——spring 2.5.6的时候需要asm jar包,spring3.0开始提供它自己独立的asm jar包 org.springframework.aspects——Spring提供的对AspectJ框架的整合 org.springframework.beans——所有应用都用到,包含访问配置文件,创建和

一份spring配置文件及其详解

本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/axu20/archive/2009/10/14/4668188.aspx 1.基本配置:<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/

Spring的lazy-init详解

Spring中lazy-init详解ApplicationContext实现的默认行为就是在启动服务器时将所有singleton bean提前进行实例化(也就是依赖注入).提前实例化意味着作为初始化过程的一部分,applicationContext实例会创建并配置所有的singleton bean.通常情况下这是一件好事,因为这样在配置中的任何错误就会被立刻实现(否则的话可能要话几个小时甚至几天). <bean id="testBean" class="cn.itcas

Spring的AOP详解

Spring的AOP详解 一.AOP基础 1.1AOP是什么 考虑这样一个问题:需要对系统中的某些业务做日志记录,比如支付系统中的支付业务需要记录支付相关日志,对于支付系统可能相当复杂,比如可能有自己的支付系统,也可能引入第三方支付平台,面对这样的支付系统该如何解决呢? 传统解决方案 1.日志部分定义公共类LogUtils,定义logPayBegin方法用于记录支付开始日志, logPayEnd用于记录支付结果 logPayBegin(long userId,long money) logPay

Spring Cache抽象详解

缓存简介 缓存,我的理解是:让数据更接近于使用者:工作机制是:先从缓存中读取数据,如果没有再从慢速设备上读取实际数据(数据也会存入缓存):缓存什么:那些经常读取且不经常修改的数据/那些昂贵(CPU/IO)的且对于相同的请求有相同的计算结果的数据.如CPU--L1/L2--内存--磁盘就是一个典型的例子,CPU需要数据时先从L1/L2中读取,如果没有到内存中找,如果还没有会到磁盘上找.还有如用过Maven的朋友都应该知道,我们找依赖的时候,先从本机仓库找,再从本地服务器仓库找,最后到远程仓库服务器

Spring的配置详解

Spring的配置详解 3.1XML配置的结构 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www

Spring的资源详解

一.Spring的资源详解 1.1引言 在日常程序开发中,处理外部资源是很繁琐的事情,我们可能需要处理URL资源.File资源.ClassPath相关资源.服务器相关资源等等很多资源.因此处理这些资源需要使用不同的接口,这就增加了我们系统的复杂性:而且处理这些资源步骤都是类似的(打开资源.读取资源.关闭资源),因此如果能抽象出一个统一的接口来对这些底层资源进行统一访问,是不是很方便,而且使我们系统更加简洁,都是对不同的底层资源使用同一个接口进行访问. Spring提供一个Resource接口来统

Spring Boot异常处理详解

在Spring MVC异常处理详解中,介绍了Spring MVC的异常处理体系,本文将讲解在此基础上Spring Boot为我们做了哪些工作.下图列出了Spring Boot中跟MVC异常处理相关的类. Spring Boot在启动过程中会根据当前环境进行AutoConfiguration,其中跟MVC错误处理相关的配置内容,在ErrorMvcAutoConfiguration这个类中.以下会分块介绍这个类里面的配置. 在Servlet容器中添加了一个默认的错误页面 因为ErrorMvcAuto

(转)Spring JdbcTemplate 方法详解

Spring JdbcTemplate方法详解 文章来源:http://blog.csdn.net/dyllove98/article/details/7772463 JdbcTemplate主要提供以下五类方法: execute方法:可以用于执行任何SQL语句,一般用于执行DDL语句: update方法及batchUpdate方法:update方法用于执行新增.修改.删除等语句:batchUpdate方法用于执行批处理相关语句: query方法及queryForXXX方法:用于执行查询相关语句