ActiveMQ与spring集成实现Queue模式

  ActiveMQ可以和spring很好的集成,下面我们来看看,如何做个集成的demo。

  (1)pom.xml引入相关jar

<!-- spring相关 begin -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>4.1.5.RELEASE</version>
        </dependency>
        <!-- spring相关 end -->
        <!-- activeMQ相关 begin-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.1.4.RELEASE</version>
        </dependency>
        <!-- activeMQ相关  end-->

  (2)添加生产者配置activemq-sender.xml

<description>JMS发布者应用配置</description>

    <!-- CachingConnectionFactory 连接工厂 (有缓存功能)-->
    <bean id="cachingConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="20" />
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- MQ地址 账户名 密码-->
                <property name="brokerURL" value="tcp://192.168.56.129:61616" />
                <property name="userName" value="parry" />
                <property name="password" value="parry123" />
                <!-- 是否异步发送 -->
                <property name="useAsyncSend" value="true"/>
            </bean>
        </property>
    </bean>

    <!-- 接收消息的目的地(一个主题)点对点队列 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息主题的名字 -->
        <constructor-arg index="0" value="messages" />
    </bean>

    <!-- 接收配置JMS模版 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <property name="defaultDestination" ref="destination" />
        <!-- value为true为发布/订阅模式; value为false为点对点模式-->
        <property name="pubSubDomain" value="false"/>
    </bean>

  (3)添加消费者配置activemq-consumer.xml

<description>JMS订阅者应用配置</description>
    <!-- CachingConnectionFactory 连接工厂 (有缓存功能)-->
    <bean id="cachingConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="20" />
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- MQ地址 账户名 密码-->
                <property name="brokerURL" value="tcp://192.168.56.129:61616" />
                <property name="userName" value="parry" />
                <property name="password" value="parry123" />
                <!-- 是否异步发送 -->
                <property name="useAsyncSend" value="true"/>
            </bean>
        </property>
    </bean>

    <!-- 接收消息的目的地(一个主题)点对点队列 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息主题的名字 -->
        <constructor-arg index="0" value="messages" />
    </bean>

    <!-- 消费者配置 (自己定义) -->
    <bean id="consumer" class="com.parry.MQ.funcion.Listener" />

    <!-- 消息监听容器 -->
    <bean id="myListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <property name="destination" ref="destination" />
        <property name="messageListener" ref="consumer" />
        <!-- 如果消息的接收速率,大于消息处理的速率时,可以采取线程池方式 -->
        <property name="taskExecutor" ref="queueMessageExecutor"/>
        <!-- 设置固定的线程数 -->
        <property name="concurrentConsumers" value="30"/>
        <!-- 设置动态的线程数 -->
        <property name="concurrency" value="20-50"/>
        <!-- 设置最大的线程数 -->
        <property name="maxConcurrentConsumers" value="80"/>
    </bean>
    <bean id="queueMessageExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="30" />
        <property name="maxPoolSize" value="80" />
        <property name="daemon" value="true" />
        <property name="keepAliveSeconds" value="120" />
    </bean>

  (4)新建一个发送消息的方法

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
 * 发送消息
 * @author Administrator
 *
 */
@Component
public class QueueSender {

    @Autowired
    private JmsTemplate myJmsTemplate;

    /**
     * 发送一条消息到指定的队列(目标)
     *
     * @param queueName
     *            队列名称
     * @param message
     *            消息内容
     */
    public void send(String queueName, final String message) {
        myJmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

  (5)添加监听器

package com.parry.MQ.funcion;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 接收者监听类
 * @author Administrator
 *
 */
public class Listener implements MessageListener {

    public void onMessage(Message message) {
        // 业务处理
        try {
            TextMessage message2 = (TextMessage) message;
            System.out.println("接收到信息:" + message2.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  (6)写个一请求测试一下

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.parry.MQ.funcion.QueueSender;

@Controller
public class App {

    @Autowired
    private QueueSender sender;

    @RequestMapping("test")
    @ResponseBody
    public String Test() {

        sender.send("messages", "你好,这是我的第一条消息!");
        return "Hello world";
    }
}

  (7)测试结果

  

  在ActiveMQ的管理后台,我们也能看到我们的消息(这里我多测试了几次):

  

时间: 2024-11-06 01:00:22

ActiveMQ与spring集成实现Queue模式的相关文章

Spring 集成Redis哨兵模式

Spring 集成Redis哨兵模式 1.pom文件添加以下jar <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>1.7.2.RELEASE</version> </dependency> <!-- redis客户端jar -->

ActiveMQ与Spring集成

信息发送者:HelloSender.java package edu.sjtu.erplab.springactivemq; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.Destination; import javax.jms.Message; import org.springframework.context.ApplicationContext; import org.springfr

ActiveMQ(二)与Spring集成

声明 转载请注明出处! Reprint please indicate the source! http://www.hiknowledge.top/2017/04/03/activemq%ef%bc%88%e4%ba%8c%ef%bc%89%e4%b8%8espring%e9%9b%86%e6%88%90/ ActiveMQ与Spring集成 由于历史原因,JMS有4个版本.Spring提供了用于简化JMS API使用的抽象框架,并且对用户屏蔽了JMS API中1.0.2和1.1版本的差异.

Spring集成ActiveMQ配置 --转

转自:http://suhuanzheng7784877.iteye.com/blog/969865 集成环境 Spring采用2.5.6版本,ActiveMQ使用的是5.4.2,从apache站点可以下载.本文是将Spring集成ActiveMQ来发送和接收JMS消息. 集成步骤 将下载的ActiveMQ解压缩后文件夹如下 activemq-all-5.4.2.jar是activemq的所有的类jar包.lib下面是模块分解后的jar包.将lib下面的 Java代码 /lib/activati

Spring集成Activemq使用(未完待续)

现在任何一个框架的使用都会结合spring框架,quartz.cxf与平时常见的Hibernate.mybatis.Struts等都可以与spring集成起来使用,在这里研究了activemq结合spring的使用方法. 1.理论篇 spring集成JMS连接ActiveMq ConnectionFactory:用于管理连接的工厂(Spring为我们提供的连接池,因为JmsTemplate每次发消息都会重新创建连接.会话和producer,这个操作非常消耗性能,所以Spring提供了连接池) s

MQ原理、使用场景、IBM WebSphere MQ介绍及spring集成配置

一.MQ简介及特点 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WebSphere MQ.RabbitMQ .ZeroMQ

RabbitMQ安装和使用(和Spring集成)

一.安装Rabbit MQ Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装Rabbit MQ的前提是安装Erlang.通过下面两个连接下载安装3.2.3 版本: 下载并安装 Eralng OTP For Windows (vR16B03) 运行安装 Rabbit MQ Server Windows Installer (v3.2.3) 具体操作步骤参考:在 Windows 上安装Rabbit MQ 指南 本人遇到的问题 当安装RabbitMQ后,使用rabbitmqctl

rabbitMQ第五篇:Spring集成RabbitMQ

前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq. 首先引入配置文件org.springframework.amqp,如下 <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.6.0.RELEASE</version> <

ActiveMQ(06):ActiveMQ结合Spring开发

一.pom.xml与mq.properties Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下: <dependency>     <groupId>org.springframework</groupId>     <artifactId>spring-jms</artifactId>     <version>4.1.7.RELEASE</version> </dependency&g