消息中间件activemq-5.13.0整合spring

首先说明这里是在qctivemq配置好并启动服务的情况下进行,请先自行配置好。也可关注我的博文(消息中间件qctivemq安全验证配置)进行配置。

1.首先看一下项目结构

2.所需jar包,这里只列出mq相关jar包,spring相关不与说明。

3.消息生产service  QueueMessageProducer

package cn.carowl.activemq;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * 消息生产者service
 * @author weishengbin
 *
 */
public class QueueMessageProducer {
    private JmsTemplate jmsTemplate;
    private Destination notifyQueue;
    private NotifyMessageConverter messageConverter;

    public void sendQueue(String noticeInfo) {
        sendMessage(noticeInfo);
    }

    private void sendMessage(final String noticeInfo) {
        this.jmsTemplate.send(notifyQueue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(noticeInfo);
            }

        });
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public Destination getNotifyQueue() {
        return notifyQueue;
    }

    public void setNotifyQueue(Destination notifyQueue) {
        this.notifyQueue = notifyQueue;
    }

    public NotifyMessageConverter getMessageConverter() {
        return messageConverter;
    }

    public void setMessageConverter(NotifyMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
}

4.消息转换NotifyMessageConverter

package cn.carowl.activemq;

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

/**
 * 消息转换
 * @author weishengbin
 *
 */
public class NotifyMessageConverter implements MessageConverter {
    private static Logger logger = LoggerFactory.getLogger(NotifyMessageConverter.class);

    /**
     * 转换NoticeInfo对象到消息
     */
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        System.out.println("sendMessage:"+object.toString());
        ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage();
        msg.setObject((Serializable) object);
        return msg;
    }

    /**
     * 转换接收到的消息为NoticeInfo对象
     */
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        if (logger.isDebugEnabled()) {
            logger.debug("Receive JMS message :" + message);
        }
        if (message instanceof ObjectMessage) {
            ObjectMessage oMsg = (ObjectMessage) message;
            if (oMsg instanceof ActiveMQObjectMessage) {
                ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) oMsg;
                try {
                    Object object = aMsg.getObject();
                    String noticeInfo = object.toString();
                    return noticeInfo;
                } catch (Exception e) {
                    logger.error("Message:${} is not a instance of NoticeInfo." + message.toString());
                    throw new JMSException(
                            "Message:" + message.toString() + "is not a instance of NoticeInfo." + message.toString());
                }
            } else {
                logger.error("Message:${} is not a instance of ActiveMQObjectMessage." + message.toString());
                throw new JMSException("Message:" + message.toString() + "is not a instance of ActiveMQObjectMessage."
                        + message.toString());
            }
        } else {
            logger.error("Message:${} is not a instance of ObjectMessage." + message.toString());
            throw new JMSException(
                    "Message:" + message.toString() + "is not a instance of ObjectMessage." + message.toString());
        }
    }
}

5.监听接收消息QueueMessageListener

package cn.carowl.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 监听接收消息
 * @author weishengbin
 *
 */
public class QueueMessageListener  implements MessageListener {
    private static Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);
    private NotifyMessageConverter messageConverter;

    /**
     * 接收消息
     */
    public void onMessage(Message message) {

        try {

            ObjectMessage textMessage = (ObjectMessage) message;
            String str = (String)messageConverter.fromMessage(textMessage);
            System.out.println("接收到的消息:"+str);

        } catch (Exception e) {
            logger.error("处理信息时发生异常", e);
        }
    }

    public NotifyMessageConverter getMessageConverter() {
        return messageConverter;
    }

    public void setMessageConverter(NotifyMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
}

6.消息生产者Sender

package cn.carowl.activemq;

import javax.servlet.ServletContext;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 消息生产者
 * @author weishengbin
 *
 */
@Service
public class Sender {
     private static ServletContext servletContext;
     private static BeanFactory factory;
     @Autowired
     private QueueMessageProducer notifyMessageProducer;

    /**
     * 发送点对点信息
     *
     * @param noticeInfo
     */
    public void setQueueSender(String obj) {
//        factory = new ClassPathXmlApplicationContext("classpath:/applicationContext-activemq.xml");
//        factory.getBean("queueMessageProducer");
//        QueueMessageProducer notifyMessageProducer = ((QueueMessageProducer) factory.getBean("queueMessageProducer"));
        notifyMessageProducer.sendQueue(obj);
    }

    public static ServletContext getServletContext() {
        return servletContext;
    }

    public static void setServletContext(ServletContext servletContext) {
        Sender.servletContext = servletContext;
    }

    public static BeanFactory getFactory() {
        return factory;
    }

    public static void setFactory(BeanFactory factory) {
        Sender.factory = factory;
    }

}

7.发送消息TestSend

package cn.carowl.activemq;

import javax.annotation.Resource;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequestMapping("/api")
public class TestSend {
    @Resource(name="sender")
    private Sender sender;

    @RequestMapping(value = "/send/test",
            method = RequestMethod.GET,
            produces = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
    public void send() {
        sender.setQueueSender("发送的消息");
    }
}

8.activemq配置文件applicationContext-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">

    <context:component-scan base-package="cn.carowl.activemq">
        <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller" />
    </context:component-scan>

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://localhost:61616)" />
        <property name="closeTimeout" value="60000" />
        <property name="userName" value="admin" />
        <property name="password" value="admin" />
    </bean>
    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="cachingConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="connectinFactory"></property>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="10"></property>
    </bean>

    <!-- 配置消息发送目的地方式 -->
    <!-- Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中 -->
    <bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="q.notify"></constructor-arg>
    </bean>
    <!-- 目的地:Topic主题 :放入一个消息,所有订阅者都会收到 -->
    <!--这个是主题目的地,一对多的 -->
    <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="t.notify"></constructor-arg>
    </bean>
    <!-- Spring JMS Template 配置JMS模版 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
    </bean>
    <!-- 使用Spring JmsTemplate 的消息生产者 -->
    <bean id="queueMessageProducer" class="cn.carowl.activemq.QueueMessageProducer">
        <property name="jmsTemplate" ref="jmsTemplate"></property>
        <property name="notifyQueue" ref="notifyQueue"></property>
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>
<!--     <bean id="topicMessageProducer" class="cn.carowl.activemq.TopicMessageProducer">
        <property name="jmsTemplate" ref="jmsTemplate"></property>
        <property name="notifyTopic" ref="notifyTopic"></property>
        <property name="messageConverter" ref="messageConverter"></property>
    </bean> -->

    <!-- 消息消费者 一般使用spring的MDP异步接收Queue模式 -->
    <!-- 消息监听容器 -->
    <bean id="queueContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectinFactory"></property>
        <property name="destination" ref="notifyQueue"></property>
        <property name="messageListener" ref="queueMessageListener"></property>
    </bean>

<!--     消息监听容器
    <bean id="topicContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectinFactory"></property>
        <property name="destination" ref="notifyTopic"></property>
        <property name="messageListener" ref="topicMessageListener"></property>
        <property name="pubSubDomain" value="true" />
    </bean> -->

    <!-- 异步接收消息处理类 -->
    <bean id="queueMessageListener" class="cn.carowl.activemq.QueueMessageListener">
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>
<!--     <bean id="topicMessageListener" class="cn.carowl.activemq.TopicMessageListener">
        <property name="messageConverter" ref="messageConverter"></property>
    </bean> -->
    <bean id="messageConverter" class="cn.carowl.activemq.NotifyMessageConverter">
    </bean>

</beans>

9.配置前端控制器web.xml

    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>
            classpath*:/applicationContext.xml,
            classpath*:/applicationContext-shiro.xml,
            classpath*:/applicationContext-activemq.xml
        </param-value>
    </context-param>

    <servlet>
        <servlet-name>springmvc</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath*:/applicationContext-mvc.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet-mapping>
        <servlet-name>springmvc</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>

10.启动服务,浏览器访问接口路径,测试发送消息是否成功!

-->

-->

-->

时间: 2024-08-03 09:39:52

消息中间件activemq-5.13.0整合spring的相关文章

ActiveMQ入门案例以及整合Spring的简单实用

先来个ActiveMQ介绍哈: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法,是一个消息中间件. 应用场景:为了实现系统之间的通信,把系统之间的调用耦合度降低就可以使用MQ. 1) activeMQ 是Apache出品,最流行的,能力强劲的开源消息总线. 2) avtiveMQ主要特点:完全支持JMS1.1和J2EE 1.4规范:支持spring,很容易内嵌到spring中:支持ajax. 3) activeMQ的消息形式: a) 点对点形式,即生产

ActiveMQ整合Spring队列和话题的使用

ActiveMQ的作用分析: 1. 解决服务之间耦合 2. 使用消息队列,增加系统并发处理量 ActiveMQ 应用场景分析 1. 用户注册,重点用户信息数据库保存,发短信.发邮件,增加业务处理复杂度,这时候使用 MQ, 将发短信.发邮箱,通知 MQ,由另外服务平台完成 2. 搜索平台.缓存平台  查询数据,建立缓存.索引 ,不从数据库查询,从缓存或者索引库查询 当增加.修改.删除数据时,发送消息给 MQ, 缓存平台.索引平台 从 MQ 获取到这个信息,更新缓存或者索引 Spring整和Acti

JAVAEE——宜立方商城09:Activemq整合spring的应用场景、添加商品同步索引库、商品详情页面动态展示与使用缓存

1. 学习计划 1.Activemq整合spring的应用场景 2.添加商品同步索引库 3.商品详情页面动态展示 4.展示详情页面使用缓存 2. Activemq整合spring 2.1. 使用方法 第一步:引用相关的jar包. <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> &l

laravel (5.1) & Ember.js (1.13.0) 的整合

Lavavel 不必过多介绍了, 作为全世界最流行的PHP框架,有着清晰的架构.完善的文档.丰富的工具等等,能够帮助开发者快速构建多页面web应用程序. 然而,随着技术的发展,web程序的另一面--客户端,正在变得越来越多元(PC,手机,平板,其他专用设备等).所以需要一种统一的机制,方便服务器与不同的设备进行通信.Restful API 就是基于这个思想被提出来的. 阮一峰给出了对Restful架构的总结: 每一个URI代表一种资源: 客户端和服务器之间,传递这种资源的某种表现层: 客户端通过

spring boot 2.0 整合 elasticsearch NoNodeAvailableException

原文地址:spring boot 2.0 整合 elasticsearch NoNodeAvailableException 原文说的有点问题,下面贴出我的配置: 码云项目地址:https://gitee.com/11230595/springboot-elasticsearch elasticsearch.yml cluster.name: my-applicationnetwork.host: 0.0.0.0 http.port: 9200transport.tcp.port: 9300tr

Mybatis整合Spring 【转】

根据官方的说法,在ibatis3,也就是Mybatis3问世之前,Spring3的开发工作就已经完成了,所以Spring3中还是没有对Mybatis3的支持.因此由Mybatis社区自己开发了一个Mybatis-Spring用来满足Mybatis用户整合Spring的需求.下面就将通过Mybatis-Spring来整合Mybatis跟Spring的用法做一个简单的介绍. MapperFactoryBean 首先,我们需要从Mybatis官网上下载Mybatis-Spring的jar包添加到我们项

NVIDIA DIGITS 学习笔记(NVIDIA DIGITS-2.0 + Ubuntu 14.04 + CUDA 7.0 + cuDNN 7.0 + Caffe 0.13.0)

转自:http://blog.csdn.net/enjoyyl/article/details/47397505?from=timeline&isappinstalled=0#10006-weixin-1-52626-6b3bffd01fdde4900130bc5a2751b6d1 NVIDIA DIGITS-2.0 + Ubuntu 14.04 + CUDA 7.0 + cuDNN 7.0 + Caffe 0.13.0环境配置 引言 DIGITS简介 DIGITS特性 资源信息 说明 DIGI

整合 Spring + SpringMVC + MyBatis

< 一 > POM 配置文件 ( 如果出现 JAR 包 引入错误, 请自行下载 ) <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.

【Java EE 学习第81天】【CXF框架】【CXF整合Spring】

一.CXF简介 CXF是Apache公司下的项目,CXF=Celtix+Xfire:它支持soap1.1.soap1.2,而且能够和spring进行快速无缝整合. 另外jax-ws是Sun公司发布的一套开发WebService服务的标准.早期的标准如jax-rpc已经很少使用,而cxf就是在新标准jax-ws下开发出来的WebService,jax-ws也内置到了jdk1.6当中. CXF官方下载地址:http://cxf.apache.org/download.html 下载完成之后,解压开压