使用spring + ActiveMQ 总结

Spring 整合JMS 基于ActiveMQ 实现消息的发送接收

看了网上很多文件,最后总结出了自己需要的。

一、下载并安装ActiveMQ

首先我们到apache官网上下载activeMQ(http://activemq.apache.org/download.html),进行解压后运行其bin目录下面的activemq.bat文件启动activeMQ。

二、Spring中加入ActiveMQ的配置

首先将相关的jar拷贝到项目的lib文件下

配置之前先看一下相关目录以便于理解

下面开始配置

<!-- ActiveMQ 连接工厂 -->
 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
 <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <!-- <property name="brokerURL" value="tcp://192.168.1.79:61616" /> -->
  <property name="brokerURL" value="${mqUrl}" />
 </bean>
 <!-- Spring Caching连接工厂 -->
 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 
 <bean id="cachingConnectionFactory"
  class="org.springframework.jms.connection.CachingConnectionFactory">
  <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 
  <property name="targetConnectionFactory" ref="connectinFactory"></property>
  <!-- Session缓存数量 -->
  <property name="sessionCacheSize" value="10"></property>
 </bean>

<!-- 配置消息发送目的地方式 -->
 <!-- Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中 -->

<bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg value="q.notify"></constructor-arg>
 </bean>
 <!-- 目的地:Topic主题 :放入一个消息,所有订阅者都会收到 -->
 <!--这个是主题目的地,一对多的--> 
 <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
  <constructor-arg value="t.notify"></constructor-arg>
 </bean>
 <!-- Spring JMS Template 配置JMS模版 -->
 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
 </bean>
 <!-- 使用Spring JmsTemplate 的消息生产者 -->
 <bean id="queueMessageProducer" class="com.common.jms.QueueMessageProducer">
  <property name="jmsTemplate" ref="jmsTemplate"></property>
  <property name="notifyQueue" ref="notifyQueue"></property>
  <property name="messageConverter" ref="messageConverter"></property>
 </bean>
 <bean id="topicMessageProducer" class="com.common.jms.TopicMessageProducer">
  <property name="jmsTemplate" ref="jmsTemplate"></property>
  <property name="notifyTopic" ref="notifyTopic"></property>
  <property name="messageConverter" ref="messageConverter"></property>
 </bean>
 <!-- 消息消费者 一般使用spring的MDP异步接收Queue模式 -->
 <!-- 消息监听容器 -->
 <bean id="queueContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectinFactory"></property>
  <property name="destination" ref="notifyQueue"></property>
  <property name="messageListener" ref="queueMessageListener"></property>
 </bean>
 <!-- 消息监听容器 -->
 <bean id="topicContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectinFactory"></property>
  <property name="destination" ref="notifyTopic"></property>
  <property name="messageListener" ref="topicMessageListener"></property>
  <!-- 发布订阅模式 -->
  <property name="pubSubDomain" value="true" />

</bean>
 <!-- 异步接收消息处理类 -->
 <bean id="queueMessageListener" class="com.common.jms.QueueMessageListener">
  <property name="messageConverter" ref="messageConverter"></property>
 </bean>
 <bean id="topicMessageListener" class="com.common.jms.TopicMessageListener">
  <property name="messageConverter" ref="messageConverter"></property>
 </bean>
 <bean id="messageConverter" class="com.common.jms.NotifyMessageConverter">
 </bean>

下面展示一下Sender

public class Sender {
 private static ServletContext servletContext;
 private static WebApplicationContext ctx; 
 /**
  * 发送点对点信息
  * @param noticeInfo
  */
 public static void setQueueSender(){ 
  servletContext = ServletActionContext.getServletContext();
  ctx = WebApplicationContextUtils.getWebApplicationContext(servletContext);
   QueueMessageProducer notifyMessageProducer = ((QueueMessageProducer) ctx.getBean("queueMessageProducer"));
   PhoneNoticeInfo noticeInfo = new PhoneNoticeInfo();

(下面先展示PhoneNoticeInfo 然后是 QueueMessageProducer )
   noticeInfo.setNoticeContent("Hello Word");
   noticeInfo.setNoticeTitle("hello Word");
   noticeInfo.setReceiver("hello");
   noticeInfo.setReceiverPhone("1111111");
   notifyMessageProducer.sendQueue(noticeInfo);
  }

public static ServletContext getServletContext() {
  return servletContext;
 }
 public static void setServletContext(ServletContext servletContext) {
  Sender.servletContext = servletContext;
 }
 public static WebApplicationContext getCtx() {
  return ctx;
 }
 public static void setCtx(WebApplicationContext ctx) {
  Sender.ctx = ctx;
 } 
}

PhoneNoticeInfo

public class PhoneNoticeInfo implements Serializable {
 /** 消息标题 */
 public String noticeTitle;
 /** 消息内容 */
 public String noticeContent;
 /** 接收者 */
 public String receiver;
 /** 接收手机号 */
 public String receiverPhone;
 public String getNoticeTitle() {
  return noticeTitle;
 }
 public void setNoticeTitle(String noticeTitle) {
  this.noticeTitle = noticeTitle;
 }
 public String getNoticeContent() {
  return noticeContent;
 }
 public void setNoticeContent(String noticeContent) {
  this.noticeContent = noticeContent;
 }
 public String getReceiver() {
  return receiver;
 }
 public void setReceiver(String receiver) {
  this.receiver = receiver;
 }

public String getReceiverPhone() {
  return receiverPhone;
 }
 public void setReceiverPhone(String receiverPhone) {
  this.receiverPhone = receiverPhone;
 }
 
}

QueueMessageProducer

/**
 * 消息生产者服务类
 */
public class QueueMessageProducer {
 private JmsTemplate jmsTemplate;
 private Destination notifyQueue;
 private NotifyMessageConverter messageConverter;
 public void sendQueue(PhoneNoticeInfo noticeInfo){
  sendMessage(noticeInfo);
 }
 private void sendMessage(PhoneNoticeInfo noticeInfo) {
  // TODO Auto-generated method stub
  jmsTemplate.setMessageConverter(messageConverter);
  jmsTemplate.setPubSubDomain(false);
  jmsTemplate.convertAndSend(notifyQueue,noticeInfo);
 }
 public JmsTemplate getJmsTemplate() {
  return jmsTemplate;
 }
 public void setJmsTemplate(JmsTemplate jmsTemplate) {
  this.jmsTemplate = jmsTemplate;
 }
 public Destination getNotifyQueue() {
  return notifyQueue;
 }
 public void setNotifyQueue(Destination notifyQueue) {
  this.notifyQueue = notifyQueue;
 }
 public NotifyMessageConverter getMessageConverter() {
  return messageConverter;
 }
 public void setMessageConverter(NotifyMessageConverter messageConverter) {
  this.messageConverter = messageConverter;
 }
}

NotifyMessageConverter

/**
 * 消息转换
 */
public class NotifyMessageConverter implements MessageConverter {
 private static Logger logger = LoggerFactory.getLogger(NotifyMessageConverter.class);
 @Override
 /**
  * 转换接收到的消息为NoticeInfo对象
  */
 public Object fromMessage(Message message) throws JMSException,
   MessageConversionException {
  // TODO Auto-generated method stub
  if (logger.isDebugEnabled()) {
   logger.debug("Receive JMS message :"+message);
  }
  if (message instanceof ObjectMessage) {
   ObjectMessage oMsg = (ObjectMessage)message;
   if (oMsg instanceof ActiveMQObjectMessage) {
    ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage)oMsg;
    try {
     PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)aMsg.getObject();
     return noticeInfo;
    } catch (Exception e) {
     // TODO: handle exception
     logger.error("Message:${} is not a instance of NoticeInfo."+message.toString());
     throw new JMSException("Message:"+message.toString()+"is not a instance of NoticeInfo."+message.toString());
    }
   }else{
    logger.error("Message:${} is not a instance of ActiveMQObjectMessage."+message.toString());
    throw new JMSException("Message:"+message.toString()+"is not a instance of ActiveMQObjectMessage."+message.toString());
   }
  }else {
   logger.error("Message:${} is not a instance of ObjectMessage."+message.toString());
   throw new JMSException("Message:"+message.toString()+"is not a instance of ObjectMessage."+message.toString());
  }
 }

@Override
 /**
  * 转换NoticeInfo对象到消息
  */
 public Message toMessage(Object obj, Session session) throws JMSException,
   MessageConversionException {
  // TODO Auto-generated method stub
  if (logger.isDebugEnabled()) {
   logger.debug("Convert Notify object to JMS message:${}"+obj.toString());
  }
  if (obj instanceof PhoneNoticeInfo) {
   ActiveMQObjectMessage msg = (ActiveMQObjectMessage)session.createObjectMessage();
   msg.setObject((PhoneNoticeInfo)obj);
   return msg;
  }else {
   logger.debug("Convert Notify object to JMS message:${}"+obj.toString());
  }
  return null;
 }

}

QueueMessageListener

public class QueueMessageListener implements MessageListener {
 private static Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);
 private NotifyMessageConverter messageConverter;
 
 /**
  * 接收消息
  */
 @Override
 public void onMessage(Message message) {
  // TODO Auto-generated method stub
  try {
   ObjectMessage objectMessage = (ObjectMessage)message;
   PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)messageConverter.fromMessage(objectMessage);
   System.out.println("queue收到消息"+noticeInfo.getNoticeContent());
   System.out.println("model:"+objectMessage.getJMSDeliveryMode()); 
   System.out.println("destination:"+objectMessage.getJMSDestination()); 
   System.out.println("type:"+objectMessage.getJMSType()); 
   System.out.println("messageId:"+objectMessage.getJMSMessageID()); 
   System.out.println("time:"+objectMessage.getJMSTimestamp()); 
   System.out.println("expiredTime:"+objectMessage.getJMSExpiration()); 
   System.out.println("priority:"+objectMessage.getJMSPriority());

} catch (Exception e) {
   // TODO: handle exception
   logger.error("处理信息时发生异常",e);
  }
 }
 public NotifyMessageConverter getMessageConverter() {
  return messageConverter;
 }
 public void setMessageConverter(NotifyMessageConverter messageConverter) {
  this.messageConverter = messageConverter;
 }

}

时间: 2024-12-20 06:05:46

使用spring + ActiveMQ 总结的相关文章

spring+activemq中多个consumer同时处理消息时遇到的性能问题

最近在做数据对接的工作,用到了activemq,我需要从activemq中接收消息并处理,但是我处理数据的步骤稍微复杂,渐渐的消息队列中堆的数据越来越多,就想到了我这边多开几个线程来处理消息. 可是会发现,服务器占用的网络带宽变的异常的高,仔细分析发现,mq入队时并没有异常高的网络流量,仅仅在出队时会产生很高的网络流量.最终发现是spring的jmsTemplate与activemq的prefetch机制配合导致的问题.研究源码发现jmsTemplate实现机制是:每次调用receive()时都

Spring ActiveMQ Caused By: javax.jms.IllegalStateException: Connection closed

根据 http://www.cnblogs.com/yshyee/p/7448808.html 进行JMS操作时,发送跟监听放到不同的项目中进行时,出现以下异常信息: org.springframework.jms.IllegalStateException: Connection closed; nested exception is javax.jms.IllegalStateException: Connection closed at org.springframework.jms.su

Spring Boot集成ActiveMQ

在Spring Boot中集成ActiveMQ相对还是比较简单的,都不需要安装什么服务,默认使用内存的activeMQ,当然配合ActiveMQ Server会更好.在这里我们简单介绍怎么使用,本节主要分以下几个步骤: (1) 新建Maven Java Project; (2) 在pom.xml引入依赖: (3) 编码测试 (4) 配置信息 接下来看看各个步骤的操作: (1) 新建Maven Java Project; 新建一个工程取名为spring-boot-activemq (2) 在pom

spring boot整合JMS(ActiveMQ实现)

一.安装ActiveMQ 具体的安装步骤,请参考我的另一篇博文: http://blog.csdn.net/liuchuanhong1/article/details/52057711 二.新建spring boot工程,并加入JMS(ActiveMQ)依赖 三.工程结构 pom依赖如下: [html] view plain copy <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

activeMQ入门+spring boot整合activeMQ

最近想要学习MOM(消息中间件:Message Oriented Middleware),就从比较基础的activeMQ学起,rabbitMQ.zeroMQ.rocketMQ.Kafka等后续再去学习. 上面说activeMQ是一种消息中间件,可是为什么要使用activeMQ? 在没有使用JMS的时候,很多应用会出现同步通信(客户端发起请求后需要等待服务端返回结果才能继续执行).客户端服务端耦合.单一点对点(P2P)通信的问题,JMS可以通过面向消息中间件的方式很好的解决了上面的问题. JMS规

JAVAEE——宜立方商城09:Activemq整合spring的应用场景、添加商品同步索引库、商品详情页面动态展示与使用缓存

1. 学习计划 1.Activemq整合spring的应用场景 2.添加商品同步索引库 3.商品详情页面动态展示 4.展示详情页面使用缓存 2. Activemq整合spring 2.1. 使用方法 第一步:引用相关的jar包. <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> &l

Spring Boot入门 and Spring Boot与ActiveMQ整合

1.Spring Boot入门 1.1什么是Spring Boot Spring 诞生时是 Java 企业版(Java Enterprise Edition,JEE,也称 J2EE)的轻量级代替品.无需开发重量级的 Enterprise JavaBean(EJB),Spring 为企业级Java 开发提供了一种相对简单的方法,通过依赖注入和面向切面编程,用简单的Java 对象(Plain Old Java Object,POJO)实现了 EJB 的功能. 虽然 Spring 的组件代码是轻量级的

Spring boot 集成ActiveMQ(包含双向队列实现)

集百家之长,成一家之言.  1. 下载ActiveMQ https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.9/apache-activemq-5.15.9-bin.zip 2.新建 Maven 项目 activemq 3.pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001

Spring Boot 整合 ActiveMQ

依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!--消息队列连接池--> <dependency> <groupId>org.apache.activemq</groupId>