消息队列——ActiveMQ

1.   学习计划

1、什么是MQ

2、MQ的应用场景

3、ActiveMQ的使用方法。

4、使用消息队列实现商品同步。

2.   同步索引库分析

方案一:在manager(后台)中,添加商品的业务逻辑中,添加一个同步索引库的业务逻辑。

  缺点:这样违背了服务单一职能的原则,业务逻辑耦合度高,业务拆分不明确。

方案二:业务逻辑在search中实现,调用服务在manager实现。业务逻辑分开。

  缺点:服务之间的耦合度变高,search服务依赖manager服务,服务的启动有先后顺序。

方案三:使用消息队列。MQ是一个消息中间件。

MQ是一个消息中间件,ActiveMQ、RabbitMQ、kafka(大数据)。

系统服务之间,非直接通信,而是通过MQ进行转发,这样既解决了系统之间的通信问题,同时也避免了系统之间的依赖和耦合。

消息队列的主要应用:

  解决系统之间的通信问题

  降低系统之间的耦合度

互联网项目中,为了用户的体验,必须遵守快速响应用户的原则,比如在电商项目中,当用户下单之后,其实还有很多的后续业务需要完成,用户根本不可能等你的流程全部处理完才得到下单反馈。这是后我们可以利用消息队列,在既能够快速响应用户的同时,有能后将业务消息压入MQ中,通过MQ的流转,通知后续业务的开展,达到数据最终一致

3.   ActiveMQ

3.1. 什么是ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

主要特点:

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

  6. 支持通过JDBC和journal提供高速的消息持久化

  7. 从设计上保证了高性能的集群,客户端-服务器,点对点

  8. 支持Ajax

  9. 支持与Axis的整合

  10. 可以很容易得调用内嵌JMS provider,进行测试

3.2. ActiveMQ的消息形式

对于消息的传递有两种类型

一种是点对点的,即一个生产者和一个消费者一一对应;(只能有一个消费者)

另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。(广播)

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  · StreamMessage -- Java原始值的数据流

  · MapMessage--一套名称-值对

  · TextMessage--一个字符串对象

  · ObjectMessage--一个序列化的 Java对象

  · BytesMessage--一个字节的数据流

4.   ActiveMQ的安装

进入http://activemq.apache.org/下载ActiveMQ

使用的版本是5.12.0

4.1. 安装环境:

1、需要jdk

2、安装Linux系统。生产环境都是Linux系统。

4.2. 安装步骤

第一步: 把ActiveMQ 的压缩包上传到Linux系统。

第二步:解压缩。

第三步:启动。

4.3.启动,查看,关闭

使用bin目录下的activemq命令启动:

[[email protected] bin]# ./activemq start

关闭:

[[email protected] bin]# ./activemq stop

查看状态:

[[email protected] bin]# ./activemq status

4.4 细节

注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2

进入管理后台:

http://192.168.25.168:8161/admin

用户名:admin

密码:admin

  

   

503错误解决:

1、查看机器名

[[email protected] bin]# cat /etc/sysconfig/network

NETWORKING=yes

HOSTNAME=itcast168

2、修改host文件

[[email protected] bin]# cat /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4 itcast168

::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

[[email protected] bin]#

3、重启Activemq服务

5.   ActiveMQ的使用方法

下面是java代码使用MQ

5.1. Queue

  • Queue消息形式,服务端默认进行持久化。
  • Queue消息形式,只要被任意一个consumer消费后,服务端消除该消息,即一个消息只能被一个consumer消费。

如果消息没有被消费,则会一直被保存在服务端,直到被消费为止。

5.1.1.    Producer

生产者:生产消息,发送端。

把jar包添加到工程中。使用5.11.2版本的jar包。

5.1.1.1    创建步骤

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

第二步:使用ConnectionFactory对象创建一个Connection对象。

第三步:开启连接,调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。

第六步:使用Session对象创建一个Producer对象。

第七步:创建一个Message对象,创建一个TextMessage对象。

第八步:使用Producer对象发送消息。

第九步:关闭资源。

5.1.1.2  代码示例

@Test
    public void testQueueProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        //brokerURL服务器的ip及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
        //参数:队列的名称。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(queue);
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        /*TextMessage message = new ActiveMQTextMessage();
        message.setText("hello activeMq,this is my first test.");*/
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }

5.1.2.    Consumer

消费者:接收消息。

5.1.2.1    创建步骤

第一步:创建一个ConnectionFactory对象。

第二步:从ConnectionFactory对象中获得一个Connection对象。

第三步:开启连接。调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。

第六步:使用Session对象创建一个Consumer对象。

第七步:接收消息。

第八步:打印消息。

第九步:关闭资源

5.1.2.2    代码示例

@Test
    public void testQueueConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(queue);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    //取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

5.2. Topic

  • Topic消息形式,服务端默认不进行持久化存储。
  • Topic消息可以被多个consumer接收到,不会因为某一个consumer的消费行为,而是的消息被服务端删除。

5.2.1.    Producer

5.2.1.1.    创建步骤

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

第二步:使用ConnectionFactory对象创建一个Connection对象。

第三步:开启连接,调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。

第六步:使用Session对象创建一个Producer对象。

第七步:创建一个Message对象,创建一个TextMessage对象。

第八步:使用Producer对象发送消息。

第九步:关闭资源。

5.2.1.2.  示例代码

@Test
    public void testTopicProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        // brokerURL服务器的ip及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。
        // 参数:话题的名称。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(topic);
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        /*
         * TextMessage message = new ActiveMQTextMessage(); message.setText(
         * "hello activeMq,this is my first test.");
         */
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }

5.2.2.    Consumer

消费者:接收消息。

5.2.2.1.    创建步骤

第一步:创建一个ConnectionFactory对象。

第二步:从ConnectionFactory对象中获得一个Connection对象。

第三步:开启连接。调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。

第六步:使用Session对象创建一个Consumer对象。

第七步:接收消息。需要利用MessageListener。

第八步:打印消息。

第九步:关闭资源

5.2.2.2.    示例代码

@Test
    public void testTopicConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消费端03。。。。。");
        // 等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

6.   Activemq整合spring

使用spring提整合ActiveMQ,可以避免如上繁琐的使用步骤。

6.1. 配置producer

首先是spring和ActiveMQ整合中,如何配置producer

6.1.1.   导入整合包

第一步:引用spring和ActiveMQ整合的相关jar包。

<dependency>

              <groupId>org.springframework</groupId>

              <artifactId>spring-jms</artifactId>

         </dependency>

         <dependency>

              <groupId>org.springframework</groupId>

              <artifactId>spring-context-support</artifactId>

</dependency>

6.1.2.   配置ConnectionFactory

第二步:配置Activemq整合spring。配置ConnectionFactory

spring对ConnectionFactory进行了更上一层的包装(接口),真正的connectionFactory由JMS服务厂商提供,在这里是ActiveMQConnectionFactory,需要注入到spring中的connectionFactory。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.168:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
</beans>

6.1.2.   配置生产者

第三步:配置生产者:spring中提供了一个模板,JMSTemplate用来简化如上展示的发送消息的步骤。

使用JMSTemplate对象。发送消息,template中需要注入connectionFactory。

<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

6.1.3.   配置Destination

第四步:在spring容器中配置Destination。可以是Queue,也可以是Topic

    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>

6.1.4.   完整的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.168:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
</beans>

下面看一下spring和ActiveMQ整合中,如何配置cousumer

6.2. 配置consumer

  • 导入jar包
  • 配置connectionFactory
  • 配置destination

以上同producer

6.2.1.   创建MessageListener

消息的接受需要用到,MessageListener,listener一旦监听到有消息传来,便会执行。

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {

        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息内容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

6.2.2   配置消费者

与是手动创建consumer接受消息的代码相比,在spring中,我们只需要配置消息监听器消息监听器容器,然后启动spring容器就可以了。

<!-- 接收消息 -->
    <!-- 配置监听器 -->
    <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" />
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>

6.2.3.   完整的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.168:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
    <!-- 接收消息 -->
    <!-- 配置监听器 -->
    <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" />
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
</beans>

6.3. 代码测试

6.3.1.    发送消息

第一步:初始化一个spring容器

第二步:从容器中获得JMSTemplate对象。

第三步:从容器中获得一个Destination对象

第四步:使用JMSTemplate对象发送消息,需要知道Destination

@Test

     public void testQueueProducer() throws Exception {

         // 第一步:初始化一个spring容器

         ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");

         // 第二步:从容器中获得JMSTemplate对象。

         JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);

         // 第三步:从容器中获得一个Destination对象

         Queue queue = (Queue) applicationContext.getBean("queueDestination");

         // 第四步:使用JMSTemplate对象发送消息,需要知道Destination

         jmsTemplate.send(queue, new MessageCreator() {

              @Override

              public Message createMessage(Session session) throws JMSException {

                   TextMessage textMessage = session.createTextMessage("spring activemq test");

                   return textMessage;

              }

         });

     }

6.3.2.    接收消息

Taotao-search-Service中接收消息。

第一步:把Activemq相关的jar包添加到工程中

第二步:创建一个MessageListener的实现类

第三步:配置spring和Activemq整合。

第四步:测试代码。

@Test

     public void testQueueConsumer() throws Exception {

         //初始化spring容器

         ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");

         //等待

         System.in.read();

     }

7.   添加商品同步索引库

7.1. Producer

manager-service工程中发送消息。

当商品添加完成后发送一个TextMessage,包含一个商品id。search-service从消息队列MQ中接受到推送过来的id值,根据id值去查询数据库,拿到对应的数据;利用查询得到的数据建立索引,添加到索引库中。

@Override
    public TaotaoResult addItem(TbItem item, String desc) {
        // 1、生成商品id
        final long itemId = IDUtils.genItemId();
        // 2、补全TbItem对象的属性
        item.setId(itemId);
        //商品状态,1-正常,2-下架,3-删除
        item.setStatus((byte) 1);
        Date date = new Date();
        item.setCreated(date);
        item.setUpdated(date);
        // 3、向商品表插入数据
        itemMapper.insert(item);
        // 4、创建一个TbItemDesc对象
        TbItemDesc itemDesc = new TbItemDesc();
        // 5、补全TbItemDesc的属性
        itemDesc.setItemId(itemId);
        itemDesc.setItemDesc(desc);
        itemDesc.setCreated(date);
        itemDesc.setUpdated(date);
        // 6、向商品描述表插入数据
        itemDescMapper.insert(itemDesc);
        //发送一个商品添加消息
        jmsTemplate.send(topicDestination, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(itemId + "");
                return textMessage;
            }
        });
        // 7、TaotaoResult.ok()
        return TaotaoResult.ok();
    }

7.2. Consumer

7.2.1.    功能分析

1、接收消息。需要创建MessageListener接口的实现类。

2、取消息,取商品id。

3、根据商品id查询数据库。

4、创建一SolrInputDocument对象。

5、使用SolrServer对象写入索引库。

6、返回成功,返回TaotaoResult。

7.2.2.    Dao层

根据商品id查询商品信息。

映射文件:

<select id="getItemById" parameterType="long" resultType="com.taotao.common.pojo.SearchItem">
        SELECT
            a.id,
            a.title,
            a.sell_point,
            a.price,
            a.image,
            b. NAME category_name,
            c.item_desc
        FROM
            tb_item a
        JOIN tb_item_cat b ON a.cid = b.id
        JOIN tb_item_desc c ON a.id = c.item_id
        WHERE a.status = 1
          AND a.id=#{itemId}
    </select>

7.2.3.    Service层

参数:商品ID

业务逻辑:

1、根据商品id查询商品信息。

2、创建一SolrInputDocument对象。

3、使用SolrServer对象写入索引库。

4、返回成功,返回TaotaoResult。

返回值:TaotaoResult

public TaotaoResult addDocument(long itemId) throws Exception {
        // 1、根据商品id查询商品信息。
        SearchItem searchItem = searchItemMapper.getItemById(itemId);
        // 2、创建一SolrInputDocument对象。
        SolrInputDocument document = new SolrInputDocument();
        // 3、使用SolrServer对象写入索引库。
        document.addField("id", searchItem.getId());
        document.addField("item_title", searchItem.getTitle());
        document.addField("item_sell_point", searchItem.getSell_point());
        document.addField("item_price", searchItem.getPrice());
        document.addField("item_image", searchItem.getImage());
        document.addField("item_category_name", searchItem.getCategory_name());
        document.addField("item_desc", searchItem.getItem_desc());
        // 5、向索引库中添加文档。
        solrServer.add(document);
        solrServer.commit();
        // 4、返回成功,返回TaotaoResult。
        return TaotaoResult.ok();
    }

7.2.4.    Listener

public class ItemChangeListener implements MessageListener {

    @Autowired
    private SearchItemServiceImpl searchItemServiceImpl;

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = null;
            Long itemId = null;
            //取商品id
            if (message instanceof TextMessage) {
                textMessage = (TextMessage) message;
                itemId = Long.parseLong(textMessage.getText());
            }
            //向索引库添加文档
            searchItemServiceImpl.addDocument(itemId);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

7.2.5.    Spring配置监听

原文地址:https://www.cnblogs.com/arjenlee/p/9224615.html

时间: 2024-11-03 12:15:37

消息队列——ActiveMQ的相关文章

lesson5:利用jmeter来压测消息队列(activemq)

本文讲述了利用jmeter来压测消息队列,其中消息队列采用apache的activemq,jmeter本身是支持符合jms标准消息队列的压测,由于jmeter的官方sampler配置比较复杂,本文直接使用sdk的方式来压测,与生产实际使用更加接近,各位如对官方的sampler感兴趣,可以自行去配置完成. 准备工作:下载activemq 地址:http://activemq.apache.org 本文中的activemq采用的是5.9.0版本. jMetterLessons工程源码地址:https

第十一章 企业项目开发--消息队列activemq

注意:本章代码基于 第十章 企业项目开发--分布式缓存Redis(2) 代码的github地址:https://github.com/zhaojigang/ssmm0 消息队列是分布式系统中实现RPC的一种手段. 1.消息队列的基本使用流程 假设: 我们有这样一个需求,当每注册一个admin的之后,就写一条日志log数据到数据库. 分析: 在实际中,我们是不会把日志直接写入数据库的,因为日志数据通常是庞大的,而且日志的产生是频繁的,如果我们使用数据库存储日志,哪怕是使用异步存储,也是极耗性能的.

JMS消息队列ActiveMQ(点对点模式)

生产者(producer)->消息队列(message queue) package com.java1234.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session

深入浅出 消息队列 ActiveMQ(转)

一. 概述与介绍 ActiveMQ 是Apache出品,最流行的.功能强大的即时通讯和集成模式的开源服务器.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现.提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能. 二. 特性 1. 多种语言和协议编写客户端.语言: Java.C.C++.C#.Ruby.Perl.Python.PHP.应用协议:OpenWire.Stomp REST.WS N

JMS消息队列ActiveMQ(发布/订阅模式)

消费者1(Consumer)--订阅(subcribe)-->主题(Topic) package com.java1234.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.

消息队列ActiveMQ

一:为什么要使用消息队列呢? 在开发上一个APP后台时候,其中很重要的一块就是消息,通讯模块,使用的是开源的Openfire. 架构: 两台API服务器 两台Openfire服务器 若干数据库服务器集群 其中业务的很大一部分都需要发送消息,用户下了订单,用户取消订单,等等都需要服务器给用户来发送消息.使用的解决方式就是在Openfire的基础上规定了自己的消息格式.用户去操作,然后API服务器通知Openfire服务器去发送消息.Openfire服务器去接受来自API服务器的请求,然后去给用户发

Java消息队列--ActiveMq 实战

原文地址:http://www.cnblogs.com/jaycekon/p/6225058.html 1.下载安装ActiveMQ ActiveMQ官网下载地址:http://activemq.apache.org/download.html ActiveMQ 提供了Windows 和Linux.Unix 等几个版本,楼主这里选择了Linux 版本下进行开发. 下载完安装包,解压之后的目录: 从它的目录来说,还是很简单的: bin存放的是脚本文件 conf存放的是基本配置文件 data存放的是

深入浅出 消息队列 ActiveMQ

一. 概述与介绍 ActiveMQ 是Apache出品,最流行的.功能强大的即时通讯和集成模式的开源服务器.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现.提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能. 二. 特性 1. 多种语言和协议编写客户端.语言: Java.C.C++.C#.Ruby.Perl.Python.PHP.应用协议:OpenWire.Stomp REST.WS N

activemq学习总结 (转)Java消息队列--ActiveMq 实战

转:https://www.cnblogs.com/jaycekon/p/6225058.html 感谢作者 ActiveMQ官网下载地址:http://activemq.apache.org/download.html ActiveMQ 提供了Windows 和Linux.Unix 等几个版本,楼主这里选择了Linux 版本下进行开发. 下载完安装包,解压之后的目录: 从它的目录来说,还是很简单的: bin存放的是脚本文件 conf存放的是基本配置文件 data存放的是日志文件 docs存放的