Spring整合activeMQ消息队列

1.配置JMS

  <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>  

    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>  

发送信息到activeMQ

@Override
    public void addNotifyCashToMq(final String notifyUrl, final String cashId, final String reqSn, final String callResult,int count) {
        //发送的参数final String callBackUrl = SuperAppConstant.TRANSACTION_CALLBACK_PREFIX_URL + notify_url_notifyCash
                    + notifyUrl + "&cashId=" + cashId + "&reqSn=" + reqSn + "&callResult=" + callResult + "&count="
                    + _count;        //发送消息到queue_notifuCash_serial消息队列
            jmsTemplate.send(queue_notifyCash_serial, new MessageCreator() {

                @Override
                public Message createMessage(Session session) throws JMSException {
                    if (logger.isDebugEnabled()) {
                        logger.debug("notifyUrl=" + notifyUrl + ",cashId=" + cashId + ",reqSn=" + reqSn + ",callResult="
                                + callResult + ",_count=" + _count);
                    }
                    HashMap map = new HashMap();
                    map.put("callBackUrl", callBackUrl);
                    ObjectMessage objectMessage = session.createObjectMessage();//创建消息
                    objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);//延时
                    return objectMessage;
                }
            });
    }

xml配置信息

    <!-- ActiveMQ 连接工厂 -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.broker_url}" />
    </bean>

    <!-- Spring Caching 连接工厂 -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory" />
        <property name="sessionCacheSize" value="10" />
    </bean>

    <!-- Spring JMS Template -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
    </bean>

2.destination消息队列定义

<description>Queue定义</description>
    <bean id="queue_callback_serial" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue_callback_serial</value>
        </constructor-arg>
    </bean>

3。监听器BatchJob

3.1 jms.xml

<description>JMS简单应用配置</description>

    <!-- ActiveMQ 连接工厂 -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.broker_url}" />
    </bean>

    <!-- Spring Caching 连接工厂 -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory" />
        <property name="sessionCacheSize" value="10" />
    </bean>

    <!-- Queue定义 -->
    <bean id="orderQueueProducer" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="order.queue.producer" />
    </bean>
    <!-- Spring JMS Template -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <property name="defaultDestination" ref="orderQueueProducer" />
    </bean>

    <!-- 使用Spring JmsTemplate的消息生产者 -->
    <bean id="orderProducerJmsService" class="com.gmall88.server.jms.order.impl.OrderProducerJmsServiceImpl">
        <property name="jmsTemplate" ref="jmsTemplate" />
    </bean>

    <!-- 定义消息队列 -->
    <bean id="orderQueueListener" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>order.queue.listener</value>
        </constructor-arg>
    </bean>

3.2 监听器impl

import java.util.Map;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.gmall88.server.wxpay.RF;

import net.sf.json.JSONObject;

public class NotifyCashManagerImpl implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(Message message) {
        if(logger.isDebugEnabled()){
            logger.debug("new callback start..");
        }
        if(message !=null){
            if(message instanceof ObjectMessage){
                ObjectMessage objectMessage = (ObjectMessage) message;//监听消息
                try {
                    Map param  = (Map)objectMessage.getObject();
                    String callBackUrl = (String)param.get("callBackUrl");//取出消息里的参数
                    if (logger.isInfoEnabled()) {
                        logger.info("callBackUrl=" + callBackUrl);
                    }
                    JSONObject jsonObject = RF.httpsRequestJson(callBackUrl, "POST", "");//通过http回调方法
                    if(jsonObject != null){
                        logger.info("code:"+jsonObject.getString("code"));
                        logger.info("message="+jsonObject.getString("message"));
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(),e);
                }
            }else{
                logger.error("Unknown message, type=" + message.getClass().getName());
            }
        }else{
            logger.error("message is null");
        }
    }

}

回调方法:

@RequestMapping(value = "/notifyCash", method = RequestMethod.POST)
    @ResponseBody
    public Object notifyCash(String notifyUrl, String cashId, String reqSn, String cashResult,int count) {
        ReturnResult returnResult = new ReturnResult();
        String clientId = "superApp_notifyOrder";
        try {
            clientId += cashId;
            returnResult = recordRequestCheck(clientId);
            if(returnResult != null){
                return returnResult;
            }
            returnResult = new ReturnResult();

            try{
                    // 回调业务系统
                    try {
                        superAppServerManager.notifyCash(notifyUrl, cashId, reqSn, cashResult);
                    } catch (Exception e) {
                        // 回调失败,做延时回调
                        logger.error(e.getMessage(), e);
                        superAppServerManager.addNotifyCashToMq(notifyUrl, cashId, reqSn, cashResult, count);
                }
            }finally{
                recordRequestEnd(clientId);
            }

        } catch (GmallException e) {
            returnResult.setCodeNum(e.getCode());
            returnResult.setMessage(e.getMessage());
        }  catch (Exception e) {
            logger.error(e.getMessage(), e);
            returnResult.setCode(ReturnCodeType.FAILURE)
                    .setMessage(e.getMessage());
        }
        logger.info("called..");
        return returnResult;
    }

整理了一下整个流程如图所示:

时间: 2024-08-04 14:57:59

Spring整合activeMQ消息队列的相关文章

Spring整合ActiveMQ及多个Queue消息监听的配置

消息队列(MQ)越来越火,在java开发的项目也属于比较常见的技术,MQ的相关使用也成java开发人员必备的技能.笔者公司采用的MQ是ActiveMQ,且消息都是用的点对点的模式.本文记录了实现Spring整合ActivateMQ的全过程及如何使用MQ,便于后续查阅. 一.项目的搭建 采用maven构建项目,免去了copy jar包的麻烦.因此,我们创建了一个java类型的Maven Project (1)项目结构图 先把项目结构图看一下,便于对项目的理解. (2)pom.xml 我们需要加入以

JAVA的设计模式之观察者模式----结合ActiveMQ消息队列说明

1----------------------观察者模式------------------------------ 观察者模式:定义对象间一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知自动更新. activeMQ消息队列 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮

Spring整合ActiveMQ

1.管理ActiveMQ 地址  http://localhost:8161/admin/ 默认用户和密码:admin=admin 运行发送者,eclipse控制台输出,如下图: 此时,我们先看一下ActiveMQ服务器,Queues内容如下: 我们可以看到创建了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费,我们也可以通过Browse查看是哪些消息,如下图: 如果这些队列中的消息,被删除,消费者则无法消费. 我们继续运行一下消费者,eclipse控制台打印消息,如下: 

Spring整合ActiveMQ测试

第一部分:创建项目(使用maven) --注意:使用IDEA创建maven普通项目还是聚合项目,都可以不用勾选,直接点next. 然后填入坐标和模块的名字   然后点击左上角的+号,选择web.   在新打开的页面下会显示web选项,这里的路径改为\src\main\webapp ,再修改web.xml文件的路径.   IDEA不会pom.xml文件默认生成jar文件,要在pom.xml添加<packaging>war</packaging>构建时生成war文件. 第二步:导入po

Java消息队列-Spring整合ActiveMq

1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现. 优势:异步.可靠 消息模型:点对点,发布/订阅 JMS中的对象  然后在另一篇博客<Java消息队列-ActiveMq实战>中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解: 多种语言和协议编写客户端.语言: Java

实战Spring4+ActiveMQ整合实现消息队列(生产者+消费者)

引言: 最近公司做了一个以信息安全为主的项目,其中有一个业务需求就是,项目定时监控操作用户的行为,对于一些违规操作严重的行为,以发送邮件(FoxMail)的形式进行邮件告警,可能是多人,也可能是一个人,第一次是以单人的形式,,直接在业务层需要告警的地方发送邮件即可,可是后边需求变更了,对于某些告警邮件可能会发送多人,这其中可能就会有阻塞发邮件的可能,直到把所有邮件发送完毕后再继续做下边的业务,领导说这样会影响用户体验,发邮件的时候用户一直处于等待状态,不能干别的事情.最后研究说用消息队列,当有需

2015年12月10日 spring初级知识讲解(三)Spring消息之activeMQ消息队列

基础 JMS消息 一.下载ActiveMQ并安装 地址:http://activemq.apache.org/ 最新版本:5.13.0 下载完后解压缩到本地硬盘中,解压目录中activemq-core-5.13.0.jar,这就是ActiveMQ提供给我们的API. 在bin目录中,找到用于启动ActiveMQ的脚本,运行脚本后ActiveMQ就准备好了,可以使用它进行消息代理. 访问http://127.0.0.1:8161/admin/能看到如下则表示安装成功了. 二.在Spring中搭建消

ActiveMQ消息队列的搭建

今天来写下消息队列 一.首先介绍下什么是activeMQ? ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 主要特点: 1. 多种语言和协议编写客户端.语言: Java, C, C++, C#, Ruby, Perl, Python, PHP.应用协议: OpenWire,Sto

spring 整合 ActiveMQ

1.1     JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 1.2