activitemq整合spring

activitemq整合spring

一.activmq的点对点模型

pom.xml:
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demo</groupId>
    <artifactId>aq-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <name>aq-test Maven Webapp</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>jms-api</artifactId>
            <version>1.1-rev-1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.34</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.6.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.1.1</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>aq-test</finalName>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-war-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
ActiviteMq.class:(发送端)
package com.demo;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

public class ActiviteMq {

    @Test
    public void testQueueProducer() throws JMSException {
        //1.创建connectinfactory对象,需要指定服务的IP以及端口号
        //brokerURL服务器的ip以及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.1.20:61616"
        );

        //2.使用ConnectionFactory创建
        Connection connection = connectionFactory.createConnection();

        //3.开启链接,调用connection对象的start的方法
        connection.start();

        //4.使用connection对创建一个session对象
        //[4.1] 第一参数:是否开启事务
        //[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式
        //1.自动应答2.手动应答 一般为自动

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象
        //参数:队列名称

        Queue queue = session.createQueue("test-queue2");

        //第六步 使用session创建一个producer对象
        MessageProducer producer = session.createProducer(queue);

        //第七步 创建一个message对象 创建一个textmessage对象
        TextMessage textMessage = session.createTextMessage("风风光光");

        //第八步 使用producer对象发送消息
        producer.send(textMessage);

        //第九步 关闭资源
         producer.close();
         session.close();
         connection.close();

    }
}
ReceiveMsf.class:(接收端)
package com.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;
import java.io.IOException;

public class ReceiveMsf {

    @Test
    public void testQueueConsumer() throws JMSException, IOException {

        //1.创建connectinfactory对象,需要指定服务的IP以及端口号
        //brokerURL服务器的ip以及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.1.20:61616"
        );

        //2.使用ConnectionFactory创建
        Connection connection = connectionFactory.createConnection();

        //3.开启链接,调用connection对象的start的方法
        connection.start();

        //4.使用connection对创建一个session对象
        //[4.1] 第一参数:是否开启事务
        //[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式
        //1.自动应答2.手动应答 一般为自动
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象
        //参数:队列名称

        Queue queue = session.createQueue("test-queue2");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(queue);

        consumer.setMessageListener(
                new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        try {
                            TextMessage textMessage = (TextMessage) message;
                            String text = null;
                            //取消的内容
                            text = textMessage.getText();
                            //第八步 打印消息
                            System.out.println(text);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
        );

        //等待键盘输入 阻塞
        System.in.read();

        //第九步 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

二.activmq的发布订阅模型

TopicProducer.class
package com.demo.dingyue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

public class TopicProducer {

    @Test
    public void testTopicProducer() throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.1.20:61616"
        );

        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("huaYuanBaoBao");

        MessageProducer producer = session.createProducer(topic);

        TextMessage textMessage = session.createTextMessage("这个是发布订阅的");

        producer.send(textMessage);

        producer.close();
        session.close();
        connection.close();
    }

}
TopicCustomer.class:
package com.demo.dingyue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;
import java.io.IOException;

public class TopicCustomer {

    @Test
    public void testTopicCustomer() throws JMSException, IOException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.1.20:61616"
        );

        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("huaYuanBaoBao");

        MessageConsumer consumer = session.createConsumer(topic);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try{

               TextMessage textMessage = (TextMessage) message;
               String text = null;
               //取出消息的内容
             text= textMessage.getText();

                    System.out.println(text);

                }catch (Exception e){

                    e.printStackTrace();
                }
            }
        });

        System.out.println("消费端03");
        System.in.read();

        //关闭资源
        connection.close();
        consumer.close();
        session.close();

    }
}

和Spring整合:

spring-amq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <context:component-scan base-package="com.demo.spring"/>

    <bean id="amqSenderService" class="com.demo.spring.AMQSenderServiceImpl">
    <!--<bean id="user" class="com.demo.spring.User">-->
    </bean>

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="trustAllPackages" value="true"/>
                <property name="brokerURL">
                    <value>tcp://192.168.1.20:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--使用缓存可以提升效率-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <!--测试Queue,队列的名字是spring-queue-->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!--<constructor-arg index="0" value="spring-queue"/>-->
        <constructor-arg name="name" value="spring-queue"/>
    </bean>

    <!--测试Topic-->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>

</beans>
AMQSenderServiceImpl:
package com.demo.spring;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Service
public class AMQSenderServiceImpl  {

    private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    //目的地队列的明证,我们要向这个队列发送消息
    @Resource(name = "destinationQueue")
    private Destination destination;

    //向特定的队列发送消息
    public void sendMsg(final User user) {
//        final String msg = JSON.toJSONString(mqParamDto);
        user.setEmail("[email protected]");
        user.setPassword("123456");
        user.setPhone("123456");
        user.setSex("M");
        user.setUsername("javaceshi");

        try {
            logger.info("将要向队列{}发送的消息msg:{}", destination, user);
            jmsTemplate.send(destination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
//                    return session.createObjectMessage(user);
                    return  session.createTextMessage("2019/1/18message");
                }
            });

        } catch (Exception ex) {
            logger.error("向队列{}发送消息失败,消息为:{}", destination, user);
        }

    }
}
AMQReceiverServiceImpl:
package com.demo.spring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Service
public class AMQReceiverServiceImpl {
    private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    //目的地队列的明证,我们要向这个队列接收消息
    @Resource(name = "destinationQueue")
    private Destination destination;

    //向特定的队列接收消息
    public void receiverMsg(final User user) {
//

        try {
            Object object = jmsTemplate.receive(destination);
            User msg = (User) object;
            System.out.println(msg);

        } catch (Exception ex) {
            ex.printStackTrace();
        }

    }
}

测试类:App

package com.demo.spring;

import com.demo.spring.User;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 主发送类
 *
 */
public class App
{
    public static void main( String[] args )
    {
        final  User user = new User();
        user.setEmail("[email protected]");
        user.setPassword("123456");
        user.setPhone("123456");
        user.setSex("M");
        user.setUsername("javaceshi");

        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-amq.xml");
        AMQSenderServiceImpl sendService = (AMQSenderServiceImpl)context.getBean("amqSenderService");
        sendService.sendMsg(user);
//        sendService.send(user);
        System.out.println("send successfully, please visit http://192.168.1.20:8161/admin to see it");
    }
}

原文地址:https://www.cnblogs.com/charlypage/p/10306801.html

时间: 2024-11-09 00:00:06

activitemq整合spring的相关文章

Mybatis整合Spring 【转】

根据官方的说法,在ibatis3,也就是Mybatis3问世之前,Spring3的开发工作就已经完成了,所以Spring3中还是没有对Mybatis3的支持.因此由Mybatis社区自己开发了一个Mybatis-Spring用来满足Mybatis用户整合Spring的需求.下面就将通过Mybatis-Spring来整合Mybatis跟Spring的用法做一个简单的介绍. MapperFactoryBean 首先,我们需要从Mybatis官网上下载Mybatis-Spring的jar包添加到我们项

Netty5快速入门及实例视频教程(整合Spring)

Netty5快速入门及实例视频教程+源码(整合Spring) https://pan.baidu.com/s/1pL8qF0J 01.传统的Socket分析02.NIO的代码分析03.对于NIO的一些疑惑04.Netty服务端HelloWorld入门05.Netty服务端入门补充06.Netty客户端入门07.如何构建一个多线程NIO系统08.Netty源码分析一09.Netty源码分析二10.Netty5服务端入门案例11.Netty5客户端入门案例12.单客户端多连接程序13.Netty学习

框架整合——Spring与MyBatis框架整合

Spring整合MyBatis 1. 整合 Spring [整合目标:在spring的配置文件中配置SqlSessionFactory以及让mybatis用上spring的声明式事务] 1). 加入 Spring 的 jar 包和配置文件 <1>.Spring框架需要的jar包: com.springsource.net.sf.cglib-2.2.0.jarcom.springsource.org.aopalliance-1.0.0.jarcom.springsource.org.aspect

Shiro整合Spring

首先需要添加shiro的spring整合包. 要想在WEB应用中整合Spring和Shiro的话,首先需要添加一个由spring代理的过滤器如下: <!-- The filter-name matches name of a 'shiroFilter' bean inside applicationContext.xml --> <filter> <filter-name>shiroFilter</filter-name> <filter-class&

整合 Spring + SpringMVC + MyBatis

< 一 > POM 配置文件 ( 如果出现 JAR 包 引入错误, 请自行下载 ) <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.

【Java EE 学习第81天】【CXF框架】【CXF整合Spring】

一.CXF简介 CXF是Apache公司下的项目,CXF=Celtix+Xfire:它支持soap1.1.soap1.2,而且能够和spring进行快速无缝整合. 另外jax-ws是Sun公司发布的一套开发WebService服务的标准.早期的标准如jax-rpc已经很少使用,而cxf就是在新标准jax-ws下开发出来的WebService,jax-ws也内置到了jdk1.6当中. CXF官方下载地址:http://cxf.apache.org/download.html 下载完成之后,解压开压

Echache整合Spring缓存实例讲解

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 摘要:本文主要介绍了EhCache,并通过整合Spring给出了一个使用实例. 一.EhCache 介绍 EhCache 是一个纯Java的进程内缓存框架,具有快速.精干等特点,是Hibernate中默认的CacheProvider.Ehcache是一种广泛使用的开源Java分布式缓存.主要面向通用缓存,Java EE和轻量级容器.它具有内存和磁盘存储,缓存加载器,缓存扩展,缓存异常处理程序

整合spring,springmvc和mybatis

我创建的是maven项目,使用到的依赖架包有下面这些: <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>4.1.2.RELEASE</version> </dependency> <dependency> <

整合Spring.net到asp.net网站开发中初探

整合Spring.net到asp.net网站开发中初探 http://www.veryhuo.com 2009-10-21 烈火网 投递稿件 我有话说 Spring提供了一个轻量级的用于构建企业级的应用程序的解决方案.Spring提供一致并清晰的配置并整合AOP(Aspect-Oriented Programming)至你的软件中.Spring.net最耀眼的功能是在中间层提供声明式事务管理用于构建全功能的ASP.NET框架. Spring.net是一个提供综合的基础结构用于支持企业级.Net开