ActiveMQ(06):ActiveMQ结合Spring开发

一、pom.xml与mq.properties

Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.1.7.RELEASE</version>
</dependency>

添加ActiveMQ的pool包,如下:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.11.1</version>
</dependency>

添加xbean的标签配置,如下:

<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.16</version>
</dependency>

pom.xml完整配置如下:

<properties>
    <activemq.version>5.9.0</activemq.version>
    <activemq-pool.version>5.11.1</activemq-pool.version>
    <spring.version>4.1.7.RELEASE</spring.version>
    <xbean.version>3.16</xbean.version>
    <commons-lang3.version>3.3.2</commons-lang3.version>
    <commons-io.version>2.4</commons-io.version>
    <commons-fileupload.version>1.3.1</commons-fileupload.version>
    <fasterxml.jackson.version>2.8.4</fasterxml.jackson.version>
    <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
</properties>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
	<version>${junit.version}</version>
    </dependency>
    <!-- Apache工具组件 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
	<version>${commons-lang3.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
	<version>${commons-io.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-fileupload</groupId>
	<artifactId>commons-fileupload</artifactId>
	<version>${commons-fileupload.version}</version>
    </dependency>
    <!-- jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
	<version>${fasterxml.jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-core</artifactId>
	<version>${fasterxml.jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
	<artifactId>jackson-core-asl</artifactId>
        <version>${codehaus.jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
	<artifactId>jackson-mapper-asl</artifactId>
	<version>${codehaus.jackson.version}</version>
    </dependency>
    <!-- activemq -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>${activemq-pool.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.xbean</groupId>
	<artifactId>xbean-spring</artifactId>
	<version>${xbean.version}</version>
    </dependency>
    <!-- spring -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
    	<groupId>org.springframework</groupId>
	<artifactId>spring-context-support</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-beans</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-webmvc</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-aspects</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-test</artifactId>
	<version>${spring.version}</version>
    </dependency>
</dependencies>

二、mq.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    
    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq.brokerURL}" userName="${activemq.userName}" password="${activemq.password}" />
    
    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- 同上,同理 -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>
    
    <!-- =======Spring JmsTemplate 的消息生产者【开始】======== -->
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
    </bean>
    <!-- =======Spring JmsTemplate 的消息生产者【结束】======== -->
    
    <!-- =======消息消费者=======【开始】 -->
    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver1"/>
        <jms:listener destination="test.queue" ref="queueReceiver2"/>
    </jms:listener-container>

    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver1"/>
        <jms:listener destination="test.topic" ref="topicReceiver2"/>
    </jms:listener-container>
	<!-- =======消息消费者=======【结束】 -->
</beans>

三、java类

3.1 消费者监听器

3.1.1 队列消息监听器

package com.liuy.mq.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * 队列消息监听器1
 * @description 队列消息监听器1
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:15:19
 */
@Component
public class QueueReceiver1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
	    System.out.println("QueueReceiver1接收到消息:"+((TextMessage)message).getText());
	} catch (JMSException e) {
	    e.printStackTrace();
	}
    }
}
package com.liuy.mq.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * 队列消息监听器2
 * @description 队列消息监听器2
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:15:19
 */
@Component
public class QueueReceiver2 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
	    System.out.println("QueueReceiver2接收到消息:"+((TextMessage)message).getText());
	} catch (JMSException e) {
	    e.printStackTrace();
	}
    }
}

3.1.2 Topic消息监听器

package com.liuy.mq.consumer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Topic消息监听器1
 * @description Topic消息监听器1
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:17:11
 */
@Component
public class TopicReceiver1 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
	    System.out.println("TopicReceiver1接收到消息:"+((TextMessage)message).getText());
	} catch (JMSException e) {
	    e.printStackTrace();
	}
    }

}
package com.liuy.mq.consumer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Topic消息监听器2
 * @description Topic消息监听器2
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:17:11
 */
@Component
public class TopicReceiver2 implements MessageListener{

    @Override
    public void onMessage(Message message) {
    	try {
            System.out.println("TopicReceiver2接收到消息:"+((TextMessage)message).getText());
	} catch (JMSException e) {
	    e.printStackTrace();
	}
    }

}

3.2 消息生产者

package com.liuy.mq.producer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * 队列消息生产者,发送消息到队列
 * @description 队列消息生产者,发送消息到队列
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:20:46
 */
@Component("queueSender")
public class QueueSender {

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean

    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String queueName,final String message){
    	jmsTemplate.send(queueName, new MessageCreator() {
    	    @Override
	    public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
	    }
        });
    }

}
package com.liuy.mq.producer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * Topic生产者发送消息到Topic
 * @description Topic生产者发送消息到Topic
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:20:46
 */
@Component("topicSender")
public class TopicSender {
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String topicName,final String message){
        jmsTemplate.send(topicName, new MessageCreator() {
	    @Override
	    public Message createMessage(Session session) throws JMSException {
	        return session.createTextMessage(message);
	    }
	});
    }

}

四、测试

package com.liuy.test.common;

import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * 测试共公类
 * @description 测试共公类
 * @author liuy
 * @version V1.00
 * @date:2016年4月24日下午5:20:54
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:application-context.xml")
public class SpringJunitTest 
{

}
package com.liuy.test.core;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.liuy.mq.producer.queue.QueueSender;
import com.liuy.mq.producer.topic.TopicSender;
import com.liuy.test.common.SpringJunitTest;

/**
 * @description 描述
 * @author liuy
 * @version 1.0
 * @date:2017年4月11日下午9:00:18
 */
public class SpringQueueTest extends SpringJunitTest {
	@Autowired 
	private QueueSender queueSender;
	@Autowired 
	private TopicSender topicSender;

	/**
	 * 发送消息到队列
	 * Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中
	 * @param message
	 * @return String
	 */
	@Test
	public void testQueueSend() throws Exception {
		queueSender.send("test.queue", "测试");
	}

	/**
	 * 发送消息到主题
	 * Topic主题 :放入一个消息,所有订阅者都会收到 
	 * 这个是主题目的地是一对多的
	 * @param message
	 * @return String
	 */
	@Test
	public void testTopicSend() throws Exception {
		topicSender.send("test.topic", "测试222");
	}
}

效果:

列队:

主题:

时间: 2024-10-22 16:12:51

ActiveMQ(06):ActiveMQ结合Spring开发的相关文章

利用Maven搭建Spring开发环境 【转】

一.   概要说明 最近几天在测试Spring3.0的AOP功能,在测试功能之前,首先是要搭建出Spring3.0的开发功能.开始去官网下载Spring的相关jar包,但是这些jar包中还是会需要其他的一些jar包,于是又手动的去下载其他的相关jar包.这样也可以搭建出开发环境,但是需要频繁的去下载缺少的jar包,很麻烦.这里,我们可以还有一个更好的办法,采用maven来管理我们的工程,让maven来自动为我们去下载相关版本的jar包,具体的配置如下. 二.   下载并安装maven 去网上下载

利用Maven搭建Spring开发环境

一.   概要说明 最近几天在测试Spring3.0的AOP功能,在测试功能之前,首先是要搭建出Spring3.0的开发功能.开始去官网下载Spring的相关jar包,但是这些jar包中还是会需要其他的一些jar包,于是又手动的去下载其他的相关jar包.这样也可以搭建出开发环境,但是需要频繁的去下载缺少的jar包,很麻烦.这里,我们可以还有一个更好的办法,采用maven来管理我们的工程,让maven来自动为我们去下载相关版本的jar包,具体的配置如下. 二.   下载并安装maven 去网上下载

基于Spring开发的DUBBO服务接口测试

基于Spring开发的DUBBO服务接口测试 知识共享主要内容: 1. Dubbo相关概念和架构,以及dubbo服务程序开发步骤. 2. 基于Spring开发框架的dubbo服务接口测试相关配置. 3. spring test+junit和spring test+TestNG两种测试框架脚本编写方法. 一.        DUBBO与DUBBO架构 1.          什么是dubbo?DUBBO是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,是阿里巴巴SOA服务化治

使用Spring开发第一个HelloWorld应用

http://www.importnew.com/13246.html 让我们用Spring来写第一个应用程序吧. 完成这一章要求: 熟悉Java语言 设置好Spring的环境 熟悉简单的Eclipse IDE的操作 如果你还没有设置好环境,请参考Spring开发环境的配置. 我们第一个程序是打印”Hello World”语句,这个语句通过Spring的配置文件来设置. 1 – 新建Java项目: 第一步用Eclipse IDE新建一个项目. 点击 > File > New > Java

学习spring2--跟我一起学Spring 3(3)–使用Spring开发第一个HelloWorld应用

http://www.importnew.com/13246.html 首页 所有文章 资讯 Web 架构 基础技术 书籍 教程 我要投稿 更多频道 » - 导航条 - 首页 所有文章 资讯 Web 架构 基础技术 书籍 教程 我要投稿 更多频道 » - iOS - Python - Android - Web前端 跟我一起学Spring 3(3)–使用Spring开发第一个HelloWorld应用 2014/10/10 | 分类: 教程 | 5 条评论 | 标签: SPRING, 教程 分享到

搭建Spring开发环境并编写第一个Spring小程序

一.前面,我写了一篇Spring框架的基础知识文章,里面没讲到如何配置Spring开发环境,今天就来讲一下,如果大家不知道怎么下载Spring软件包的话,可以看我那篇文章: http://blog.csdn.net/u012561176/article/details/45971917 ,里面讲述了2种获得Spring软件包的方式. 建议大家配置Spring环境之前先了解一下IoC(控制反转)的原理,可以看我写的文章:http://blog.csdn.net/u012561176/article

驱动开发读书笔记. 0.06 嵌入式linux视频开发之预备知识

驱动开发读书笔记. 0.06  嵌入式linux视频开发之预备知识 由于毕业设计选择了嵌入式linux视频开发相关的项目,于是找了相关的资料,下面是一下预备知识 UVC : UVC,全称为:USB video class 或USB video device class.是Microsoft与另外几家设备厂商联合推出的为USB视频捕获设备定义的协议标准,目前已成为USB org标准之一. UVC linux driver: UVC linux 驱动 需要在编译内核的时候选上 配置内核 Device

Spring开发环境搭建教程

Spring开发环境搭建 JDK7以上版本 eclispe for j2ee 4.0以上版本 Spring frameWorks 3.0以上版本 至于前两个我们就不介绍,直接百度就可以了,对于Spring FrameWork的下载链接比较难找. Spring frameWorks 3.0以上版本下载步骤 1.首先打开链接Spring官方网站 2.然后 点击最新版本号的Referrence链接进入 3. 选择Distribution Zip Files这一项. 4. 点击这个链接进入,进入真正的下

Spring 开发环境搭建

为了方面,直接使用eclipse,创建maven工程,创建成功之后 一.修改pom.xml,为了方面我就把Spring相关的jar包都引用了 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0

MyBatis集成Spring开发 讲解

MyBatis集成Spring开发 讲解 简介:Spring集成Mybatis开发简述有两种方式,第一种是在applicationContext.xml中配置接口扫描类(同时也扫描了sql.xml配置文件)或者注入接口类(MapperScannerConfigurer.MapperFactoryBean这两个在test中有讲解如何配置),第二种是原生的Mybatis,不用接口开发,而在applicationContext.xml中当配置sqlSessionFactory时候,配置如conf.xm