实战Spring4+ActiveMQ整合实现消息队列(生产者+消费者)

引言:

最近公司做了一个以信息安全为主的项目,其中有一个业务需求就是,项目定时监控操作用户的行为,对于一些违规操作严重的行为,以发送邮件(FoxMail)的形式进行邮件告警,可能是多人,也可能是一个人,第一次是以单人的形式,,直接在业务层需要告警的地方发送邮件即可,可是后边需求变更了,对于某些告警邮件可能会发送多人,这其中可能就会有阻塞发邮件的可能,直到把所有邮件发送完毕后再继续做下边的业务,领导说这样会影响用户体验,发邮件的时候用户一直处于等待状态,不能干别的事情。最后研究说用消息队列,当有需要发送邮件告警的时候,就向队列中添加一个标识消息,ActiveMQ通过监听器的形式,实时监听队列里边的小时,收到消息后,判断是不是需要发送告警的标识,是的话就自行就行发送邮件!这是就研究的消息队列ActiveMQ,下边就是具体内容:

一、ActiveMQ

1.1). ActiveMQ

?ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。

1. 2). Java Message Service(JMS)

JMS支持两种消息发送和接收模型。

  • 一种称为P2P(Ponit to Point)模型(点对点一对一),即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。

  • 另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。

1.3). JMS术语
  1. Provider/MessageProvider:生产者
  2. Consumer/MessageConsumer:消费者
  3. PTP:Point To Point,点对点通信消息模型
  4. Pub/Sub:Publish/Subscribe,发布订阅消息模型
  5. Queue:队列,目标类型之一,和PTP结合
  6. Topic:主题,目标类型之一,和Pub/Sub结合
  7. ConnectionFactory:连接工厂,JMS用它创建连接
  8. Connnection:JMS Client到JMS Provider的连接
  9. Destination:消息目的地,由Session创建
  10. Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是 Session创建的
1.4).?ActiveMQ应用场景

类似送快递,快递员(producer)将快递(Message)放到指定地点(destination)后,就可以离开了,拿快递的人(customer)在接收到通知后,到指定地点(destination)去取快递(Message)就可以了。当然,取快递时可能要进行身份验证,这就涉及到创建连接(connection)时,需要指定用户名和密码了。还有就是,实际生活中,当快递员把快递放好之后,照理说应该通知客户去哪里取快递,而ActiveMq帮我们做好了一切,通知的工作Activemq会帮我们实现,而无需我们亲自编码通知消费者,生产者只需要将Message放到Mq中即可,通知消费者的工作,mq会帮我们处理

用途就是用来处理消息,也就是处理JMS在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。

在不使用消息队列的情况下,用户的请求数据直接写入数据库,高发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧,但使用队列后,用户的请求发给队列后立即返回。

1.5).?ActiveMQ下载

1.6). 启动

/apache-activemq-5.15.3/bin/win64/目录下双击activemq.bat文件,在浏览器中输入http://localhost:8161/admin/, 用户名和密码输入admin即可

二、Srping+ActiveMQ应用实例

2,1). 项目结构

2,2). 导入maven依赖,pom.xml文件
  1 <?xml version="1.0" encoding="UTF-8"?>
  2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4   <modelVersion>4.0.0</modelVersion>
  5
  6   <groupId>www.cnblogs.com.hongmoshu</groupId>
  7   <artifactId>test_actmq</artifactId>
  8   <version>0.0.1-SNAPSHOT</version>
  9   <packaging>war</packaging>
 10   <name>test_actmq Maven Webapp</name>
 11   <url>http://www.example.com</url>
 12
 13    <!-- 版本管理 -->
 14   <properties>
 15     <springframework>4.1.8.RELEASE</springframework>
 16   </properties>
 17
 18
 19    <dependencies>
 20
 21      <!-- junit单元测试 -->
 22     <dependency>
 23       <groupId>junit</groupId>
 24       <artifactId>junit</artifactId>
 25       <version>4.11</version>
 26       <scope>test</scope>
 27     </dependency>
 28
 29     <!-- JSP相关 -->
 30   <dependency>
 31     <groupId>jstl</groupId>
 32     <artifactId>jstl</artifactId>
 33     <version>1.2</version>
 34   </dependency>
 35   <dependency>
 36     <groupId>javax.servlet</groupId>
 37     <artifactId>servlet-api</artifactId>
 38     <scope>provided</scope>
 39     <version>2.5</version>
 40   </dependency>
 41
 42      <!-- spring -->
 43     <dependency>
 44       <groupId>org.springframework</groupId>
 45       <artifactId>spring-core</artifactId>
 46       <version>${springframework}</version>
 47     </dependency>
 48     <dependency>
 49       <groupId>org.springframework</groupId>
 50       <artifactId>spring-context</artifactId>
 51       <version>${springframework}</version>
 52     </dependency>
 53     <dependency>
 54       <groupId>org.springframework</groupId>
 55       <artifactId>spring-tx</artifactId>
 56       <version>${springframework}</version>
 57     </dependency>
 58     <dependency>
 59       <groupId>org.springframework</groupId>
 60       <artifactId>spring-webmvc</artifactId>
 61       <version>${springframework}</version>
 62     </dependency>
 63     <dependency>
 64       <groupId>org.springframework</groupId>
 65       <artifactId>spring-jms</artifactId>
 66       <version>${springframework}</version>
 67     </dependency>
 68
 69     <!-- xbean 如<amq:connectionFactory /> -->
 70     <dependency>
 71       <groupId>org.apache.xbean</groupId>
 72       <artifactId>xbean-spring</artifactId>
 73       <version>3.16</version>
 74     </dependency>
 75
 76     <!-- activemq -->
 77     <dependency>
 78       <groupId>org.apache.activemq</groupId>
 79       <artifactId>activemq-core</artifactId>
 80       <version>5.7.0</version>
 81     </dependency>
 82     <dependency>
 83       <groupId>org.apache.activemq</groupId>
 84       <artifactId>activemq-pool</artifactId>
 85       <version>5.12.1</version>
 86     </dependency>
 87
 88     <!-- gson -->
 89     <dependency>
 90       <groupId>com.google.code.gson</groupId>
 91       <artifactId>gson</artifactId>
 92       <version>1.7.1</version>
 93     </dependency>
 94
 95       <!-- JSON -->
 96     <dependency>
 97         <groupId>net.sf.json-lib</groupId>
 98         <artifactId>json-lib</artifactId>
 99         <version>2.4</version>
100         <classifier>jdk15</classifier>
101     </dependency>
102
103   </dependencies>
104
105   <build>
106     <finalName>test_actmq</finalName>
107
108   </build>
109 </project>
2,3). ActiveMQ的配置文件ActiveMQ.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xmlns:amq="http://activemq.apache.org/schema/core"
 5        xmlns:jms="http://www.springframework.org/schema/jms"
 6        xmlns:context="http://www.springframework.org/schema/context"
 7        xmlns:mvc="http://www.springframework.org/schema/mvc"
 8        xsi:schemaLocation="
 9         http://www.springframework.org/schema/beans
10         http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
11         http://www.springframework.org/schema/context
12         http://www.springframework.org/schema/context/spring-context-4.1.xsd
13         http://www.springframework.org/schema/mvc
14         http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
15         http://www.springframework.org/schema/jms
16         http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
17         http://activemq.apache.org/schema/core
18         http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
19 >
20
21     <context:component-scan base-package="com.svse.service" />
22     <mvc:annotation-driven />
23
24     <!-- jms.useAsyncSend=true 允许异步接收消息 -->
25     <amq:connectionFactory id="amqConnectionFactory"
26                            brokerURL="tcp://192.168.6.111:61616?jms.useAsyncSend=true"
27                            userName="admin"
28                            password="admin" />
29
30     <!-- 配置JMS连接工 厂 -->
31     <bean id="connectionFactory"
32           class="org.springframework.jms.connection.CachingConnectionFactory">
33         <constructor-arg ref="amqConnectionFactory" />
34         <property name="sessionCacheSize" value="100" />
35     </bean>
36
37     <!-- 定义消息队列名称(Queue) -->
38     <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
39         <!-- 设置消息队列的名字 -->
40         <constructor-arg>
41             <value>Jaycekon</value>
42         </constructor-arg>
43     </bean>
44
45     <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
46     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
47         <property name="connectionFactory" ref="connectionFactory" />
48         <property name="defaultDestination" ref="demoQueueDestination" />
49         <property name="receiveTimeout" value="10000" />
50         <!-- true是topic,false是queue,默认是false,此处显示写出false -->
51         <property name="pubSubDomain" value="false" />
52         <!-- 消息转换器 -->
53         <property name="messageConverter" ref="userMessageConverter"/>
54     </bean>
55
56      <!-- 类型转换器 -->
57     <bean id="userMessageConverter" class="com.svse.util.ObjectMessageConverter"/>
58
59
60     <!-- 配置消息队列监听者(Queue) -->
61      <bean id="queueMessageListener" class="com.svse.util.QueueMessageListener" />
62
63     <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
64     <bean id="queueListenerContainer"
65           class="org.springframework.jms.listener.DefaultMessageListenerContainer">
66         <property name="connectionFactory" ref="connectionFactory" />
67         <property name="destination" ref="demoQueueDestination" />
68         <property name="messageListener" ref="queueMessageListener" />
69     </bean>
70
71 </beans>
2,4). Spring的配置文件 spring-mvc.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xmlns:context="http://www.springframework.org/schema/context"
 5     xmlns:mvc="http://www.springframework.org/schema/mvc"
 6     xsi:schemaLocation="http://www.springframework.org/schema/beans
 7         http://www.springframework.org/schema/beans/spring-beans.xsd
 8         http://www.springframework.org/schema/context
 9         http://www.springframework.org/schema/context/spring-context-4.1.xsd
10         http://www.springframework.org/schema/mvc
11         http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">
12
13     <context:component-scan base-package="com.svse.controller" />
14     <mvc:annotation-driven />
15     <bean id="viewResolver" class="org.springframework.web.servlet.view.UrlBasedViewResolver">
16         <property name="viewClass"
17             value="org.springframework.web.servlet.view.JstlView" />
18         <property name="prefix" value="/WEB-INF/views/" />
19         <property name="suffix" value=".jsp" />
20     </bean>
21
22 </beans>
2,5). web.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xmlns="http://xmlns.jcp.org/xml/ns/javaee"
 4          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
 5          id="WebApp_ID" version="3.1">
 6   <display-name>mydemo</display-name>
 7
 8   <welcome-file-list>
 9     <welcome-file>index.jsp</welcome-file>
10   </welcome-file-list>
11
12   <!-- 加载spring及active的配置文件,classpath为项目src下的路径 -->
13   <context-param>
14     <param-name>contextConfigLocation</param-name>
15     <param-value>
16           classpath:spring-mvc.xml;
17           classpath:ActiveMQ.xml;
18     </param-value>
19     </context-param>
20
21
22  <listener>
23     <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
24   </listener>
25
26   <servlet>
27     <servlet-name>springMVC</servlet-name>
28     <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
29     <init-param>
30       <param-name>contextConfigLocation</param-name>
31       <param-value>classpath:spring-mvc.xml</param-value>
32     </init-param>
33     <load-on-startup>1</load-on-startup>
34   </servlet>
35   <servlet-mapping>
36     <servlet-name>springMVC</servlet-name>
37     <url-pattern>/</url-pattern>
38   </servlet-mapping>
39
40   <!-- 处理编码格式 -->
41   <filter>
42     <filter-name>characterEncodingFilter</filter-name>
43     <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
44     <init-param>
45       <param-name>encoding</param-name>
46       <param-value>UTF-8</param-value>
47     </init-param>
48     <init-param>
49       <param-name>forceEncoding</param-name>
50       <param-value>true</param-value>
51     </init-param>
52   </filter>
53   <filter-mapping>
54     <filter-name>characterEncodingFilter</filter-name>
55     <url-pattern>/*</url-pattern>
56   </filter-mapping>
57
58 </web-app>
2,6). 实体类Users对象
 1 package com.svse.entity;
 2 import java.io.Serializable;
 3
 4 public class Users implements Serializable{
 5
 6     private String userId;
 7     private String userName;
 8     private String sex;
 9     private String age;
10     private String type;
11
12
13     public Users() {
14         super();
15     }
16     public Users(String userId, String userName, String sex, String age,
17             String type) {
18         super();
19         this.userId = userId;
20         this.userName = userName;
21         this.sex = sex;
22         this.age = age;
23         this.type = type;
24     }
25     public String getUserId() {
26         return userId;
27     }
28     public void setUserId(String userId) {
29         this.userId = userId;
30     }
31     public String getUserName() {
32         return userName;
33     }
34     public void setUserName(String userName) {
35         this.userName = userName;
36     }
37     public String getSex() {
38         return sex;
39     }
40     public void setSex(String sex) {
41         this.sex = sex;
42     }
43     public String getAge() {
44         return age;
45     }
46     public void setAge(String age) {
47         this.age = age;
48     }
49     public String getType() {
50         return type;
51     }
52     public void setType(String type) {
53         this.type = type;
54     }
55     @Override
56     public String toString() {
57         return "Users [userId=" + userId + ", userName=" + userName + ", sex="
58                 + sex + ", age=" + age + ", type=" + type + "]";
59     }
60
61
62 }
2,7). 核心代码(生产者ProducerService)
 1 package com.svse.service;
 2
 3 import javax.annotation.Resource;
 4 import javax.jms.Destination;
 5 import javax.jms.JMSException;
 6 import javax.jms.Message;
 7 import javax.jms.Session;
 8
 9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12
13 import com.svse.entity.Users;
14
15 @Service
16 public class ProducerService {
17
18     @Resource(name="jmsTemplate")
19     private JmsTemplate jmsTemplate;
20
21
22     /**
23      * 向指定队列发送消息 (发送文本消息)
24      */
25     public void sendMessage(Destination destination,final String msg){
26
27         jmsTemplate.setDeliveryPersistent(true);
28
29         System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);
30         jmsTemplate.send(destination, new MessageCreator() {
31             public Message createMessage(Session session) throws JMSException {
32                 return session.createTextMessage(msg);
33             }
34         });
35     }
36
37     /**
38      * 向指定队列发送消息以对象的方式 (发送对象消息)
39      */
40     public void sendMessageNew(Destination destination,Users user){
41         System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+user);
42         jmsTemplate.convertAndSend(user);
43     }
44
45     /**
46      * 向默认队列发送消息
47      */
48     public void sendMessage(final String msg){
49         String destination = jmsTemplate.getDefaultDestinationName();
50         System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
51         jmsTemplate.send(new MessageCreator() {
52             public Message createMessage(Session session) throws JMSException {
53                 return session.createTextMessage(msg);
54             }
55         });
56     }
57 }
2,8). 核心代码(消费产者ConsumerService)
 1 package com.svse.service;
 2
 3 import javax.annotation.Resource;
 4 import javax.jms.Destination;
 5 import javax.jms.JMSException;
 6 import javax.jms.ObjectMessage;
 7 import javax.jms.TextMessage;
 8
 9 import net.sf.json.JSONObject;
10
11 import org.springframework.jms.core.JmsTemplate;
12 import org.springframework.stereotype.Service;
13
14 import com.svse.entity.Users;
15
16 @Service
17 public class ConsumerService {
18
19      @Resource(name="jmsTemplate")
20      private JmsTemplate jmsTemplate;
21      //接收文本消息
22      public TextMessage receive(Destination destination){
23             TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
24             try{
25                 JSONObject json=JSONObject.fromObject(textMessage.getText());
26                 System.out.println("name:"+json.getString("userName"));
27                 System.out.println("从队列" + destination.toString() + "收到了消息:\t"
28                         + textMessage.getText());
29             } catch (JMSException e) {
30                 e.printStackTrace();
31             }
32             return textMessage;
33         }
34      //接收对象消息
35      public ObjectMessage receiveNew(Destination destination){
36              ObjectMessage objMsg=(ObjectMessage) jmsTemplate.receive(destination);
38              try{
39                 Users users= (Users) objMsg.getObject();
44                 System.out.println("name:"+users.getUserName());
47                 System.out.println("从队列" + destination.toString() + "收到了消息:\t"
48                         + users);
49             } catch (JMSException e) {
50                 e.printStackTrace();
51             }
52             return objMsg;
53         }
54 }
2,9). 核心代码(控制器ConsumerService)
  1 package com.svse.controller.mq;
  2
  3 import java.io.IOException;
  4 import java.text.SimpleDateFormat;
  5 import java.util.Date;
  7 import javax.annotation.Resource;
  8 import javax.jms.DeliveryMode;
  9 import javax.jms.Destination;
 10 import javax.jms.JMSException;
 11 import javax.jms.ObjectMessage;
 12 import javax.jms.TextMessage;
 13 import javax.management.MBeanServerConnection;
 14 import javax.management.remote.JMXConnector;
 15 import javax.management.remote.JMXConnectorFactory;
 16 import javax.management.remote.JMXServiceURL;
 18 import org.springframework.stereotype.Controller;
 19 import org.springframework.web.bind.annotation.RequestMapping;
 20 import org.springframework.web.bind.annotation.RequestMethod;
 21 import org.springframework.web.bind.annotation.RequestParam;
 22 import org.springframework.web.servlet.ModelAndView;
 24 import com.google.gson.Gson;
 25 import com.svse.entity.Users;
 26 import com.svse.service.ConsumerService;
 27 import com.svse.service.ProducerService;
 28
 29 @Controller
 30 public class DemoController {
 35
 36      //队列名Jaycekon (ActiveMQ中设置的队列的名称)
 37     @Resource(name="demoQueueDestination")
 38     private Destination demoQueueDestination;
 39
 40     //队列消息生产者
 41     @Resource(name="producerService")
 42     private ProducerService producer;
 43
 44    //队列消息消费者
 45     @Resource(name="consumerService")
 46     private ConsumerService consumer;
 47
 48     /*
 49      * 准备发消息
 50      */
 51     @RequestMapping(value="/producer",method=RequestMethod.GET)
 52     public ModelAndView producer(){
 53         System.out.println("------------go producer");
 54
 55         Date now = new Date();
 56         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 57         String time = dateFormat.format( now );
 58         System.out.println(time);
 59
 60         ModelAndView mv = new ModelAndView();
 61         mv.addObject("time", time);
 62         mv.setViewName("producer");
 63         return mv;
 64     }
 65
 66     /*
 67      * 发消息
 68      */
 69     @RequestMapping(value="/onsend",method=RequestMethod.POST)
 70     public ModelAndView producer(@RequestParam("message") String message) {
 71         System.out.println("------------send to jms");
 72         ModelAndView mv = new ModelAndView();
 73         for(int i=0;i<5;i++){
 74             try {
 75                 Users users=new Users("10"+(i+1),"赵媛媛"+(i+1),"女","27","影视演员");
 76                 Gson gson=new Gson();
 77                 String sendMessage=gson.toJson(users);
 78                 System.out.println("发送的消息sendMessage:"+sendMessage.toString());
 79              // producer.sendMessage(demoQueueDestination,sendMessage.toString());//以文本的形式
 80               producer.sendMessageNew(demoQueueDestination, users);//以对象的方式
 81
 82             } catch (Exception e) {
 83                 e.printStackTrace();
 84             }
 85         }
 86         mv.setViewName("index");
 87         return mv;
 88     }
 89     /*
 90      * 手动接收消息
 91      */
 92     @RequestMapping(value="/receive",method=RequestMethod.GET)
 93     public ModelAndView queue_receive() throws JMSException {
 94         System.out.println("------------receive message");
 95         ModelAndView mv = new ModelAndView();
 96
 97       //TextMessage tm = consumer.receive(demoQueueDestination);//接收文本消息
 98
 99         ObjectMessage objMsg=consumer.receiveNew(demoQueueDestination);//接收对象消息
100         Users users= (Users) objMsg.getObject();
101         //mv.addObject("textMessage", tm.getText());
102         mv.addObject("textMessage", users.getUserId()+" || "+users.getUserName());
103         mv.setViewName("receive");
104         return mv;
105     }
106
107     /*
108      * ActiveMQ Manager Test
109      */
110     @RequestMapping(value="/jms",method=RequestMethod.GET)
111     public ModelAndView jmsManager() throws IOException {
112         System.out.println("------------jms manager");
113         ModelAndView mv = new ModelAndView();
114         mv.setViewName("index");
115
116         JMXServiceURL url = new JMXServiceURL("");
117         JMXConnector connector = JMXConnectorFactory.connect(url);
118         connector.connect();
119         MBeanServerConnection connection = connector.getMBeanServerConnection();
120
121         return mv;
122     }
123
124 }

三、.对象转换器MessageConverter和消息监听器MessageListener

在上边的ProducerService和ConsumerService中,不论是发送消息还是接收消息,都可以以文本TextMessage的方式和ObjectMessage的方式.如果是简单的文本消息可以以TextMessage,但是如果需要发送的内容比较多,结构比较复杂,这时候就建议用对象文本ObjectMessage的方式向队列queue中发送消息了.但是这时候就需要用到对象消息转换器MessageConverter.

3,1). 消息转换器MessageageConverte

MessageConverter的作用主要有两方面,一方面它可以把我们的非标准化Message对象转换成我们的目标Message对象,这主要是用在发送消息的时候;另一方面它又可以把我们的Message对象
转换成对应的目标对象,这主要是用在接收消息的时候。

 1 package com.svse.util;
 2
 3 import java.io.Serializable;
 4
 5 import javax.jms.JMSException;
 6 import javax.jms.Message;
 7 import javax.jms.ObjectMessage;
 8 import javax.jms.Session;
 9
10 import org.springframework.jms.support.converter.MessageConversionException;
11 import org.springframework.jms.support.converter.MessageConverter;
12
13 /**
14  *功能说明:通用的消息对象转换类
15  *@author:zsq
16  *create date:2019年7月12日 上午9:28:31
17  *修改人   修改时间  修改描述
18  *Copyright (c)2019北京智华天成科技有限公司-版权所有
19  */
20 public class ObjectMessageConverter implements MessageConverter {
21
22
23     //把一个Java对象转换成对应的JMS Message (生产消息的时候)
24     public Message toMessage(Object object, Session session)
25             throws JMSException, MessageConversionException {
26
27         return session.createObjectMessage((Serializable) object);
28     }
29
30     //把一个JMS Message转换成对应的Java对象 (消费消息的时候)
31     public Object fromMessage(Message message) throws JMSException,
32             MessageConversionException {
33         ObjectMessage objMessage = (ObjectMessage) message;
34         return objMessage.getObject();
35     }
36
37 }

注意:写了消息转化器之后还需要的ActiveMQ.xml中进行配置

3,2). 消息监听器MessageageListe

MessageageListe作用就是动态的自行监听消息队列的生产者发送的消息,不需要人工手动接收!

 1 package com.svse.util;
 2 import javax.jms.JMSException;
 3 import javax.jms.Message;
 4 import javax.jms.MessageListener;
 5 import javax.jms.ObjectMessage;
 6 import javax.jms.TextMessage;
 7
 8 import com.svse.entity.Users;
 9
10
11 public class QueueMessageListener implements MessageListener {
12
13    //添加了监听器,只要生产者发布了消息,监听器监听到有消息消费者就会自动消费(获取消息)
14     public void onMessage(Message message) {
15          //(第1种方式)没加转换器之前接收到的是文本消息
16         //TextMessage tm = (TextMessage) message;
17
18         //(第2种方式)加了转换器之后接收到的ObjectMessage对象消息
19         ObjectMessage objMsg=(ObjectMessage) message;
20          Users users;
21         try {
22             users = (Users) objMsg.getObject();
23             //System.out.println("QueueMessageListener监听到了文本消息:\t" + tm.getText());
24             System.out.println("QueueMessageListener监听到了文本消息:\t" + users);
25             //do something ...
26         } catch (JMSException e1) {
27             // TODO Auto-generated catch block
28             e1.printStackTrace();
29         }
30     }
31
32 }

同样写好监听器后也是需在ActiveMQ.xml中进行配置注册的

总结

(1)注册JmsTemplate时,pubSubDomain这个属性的值要特别注意。默认值是false,也就是说默认只是支持queue模式,不支持topic模式。但是,如果将它改为true,则不支持queue模式。因此如果项目需要同时支持queue和topic模式,那么需要注册2个JmsTemplate,同时监听容器也需要注册2个

(2)使用Queue时,生产者只要将Message发送到MQ服务器端,消费者就可以进行消费,而无需生产者程序一直运行;

(3)消息是按照先入先出的顺序,一旦有消费者将Message消费,该Message就会从MQ服务器队列中删去;

(4)有文章说,“生产者”<-->"消费者"是一对一的关系,其实并不准确,从应用中可以看出,一个生产者产生的消息,可以被多个消费者进行消费,只不过多个消费者在消费消息时是竞争的关系,先得到的先消费,一旦消费完成,该消息就会出队列,
就不能被其他消费者再消费了,即“一次性消费”。就是我们熟悉的“点对点”通信了;

原文地址:https://blog.51cto.com/14226230/2420246

时间: 2024-11-05 17:31:04

实战Spring4+ActiveMQ整合实现消息队列(生产者+消费者)的相关文章

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

#queue队列 #生产者消费者模型

1 #queue队列 #生产者消费者模型 2 3 #queue队列 #有顺序的容器 4 #程序解耦 5 #提高运行效率 6 7 #class queue.Queue(maxsize=0) #先入先出 8 #class queue.LifoQueue(maxsize=0)最后在第一 9 #class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列#VIP客户 10 11 #Queue.qsize() 12 #Queue.empty() #return

ActiveMq C#客户端 消息队列的使用(存和取)

1.准备工具 VS2013Apache.NMS.ActiveMQ-1.7.2-bin.zipapache-activemq-5.14.0-bin.zip 2.开始项目 VS2013新建一个C#控制台应用程序,项目中添加两个dll引用,一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目录下的Apache.NMS.dll,另一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug

python2.0_s12_day9之day8遗留知识(queue队列&amp;生产者消费者模型)

4.线程 1.语法 2.join 3.线程锁之Lock\Rlock\信号量 4.将线程变为守护进程 5.Event事件 * 6.queue队列 * 7.生产者消费者模型 4.6 queue队列 queue非常有用,当信息必须安全的在多个线程之间进行数据交换的时候就应该想到queue 所以,queue它能保证数据被安全的在多个线程之间进行交换,那他就是天生的线程安全. queue有那么几种: class queue.Queue(maxsize=0) # 先入先出 class queue.LifoQ

5 并发编程--队列&amp;生产者消费者模型

1.队列的介绍 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的 创建队列的类(底层就是以管道和锁定的方式实现): Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. 参数介绍: maxsize是队列中允许最大项数,省略则无大小限制. 但需要明确: 1.队列内存放的是消息而非大数据 2.队列占用的是内存空间,因而maxsize即便

并发无锁之环形队列生产者消费者问题

1.生产者消费者问题 三种关系:生产者--生产者(互斥):消费者-消费者(互斥):生产者--消费者(互斥同步) 两个角色:生产者:消费者 一种生产场所:缓冲区 2.环形队列(缓冲区) 数据结构:可以有多种,这里选用数组,逻辑上将a[0]和a[size-1]相连构成环形队列 判断空/判断满:当读指针和写指针指向同一个区域为空,或者满(但不能区分空或者满) 两种方案:1.即尾结点与首结点之间至少留有一个元素的空间. 2. 添加一个计数器(信号量就是计数器所以这里用信号量完成计数器的功能) 3.sem

10 阻塞队列 &amp; 生产者-消费者模式

原文:http://www.cnblogs.com/dolphin0520/p/3932906.html 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦.但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一

同步对象 队列 生产者消费者模式

多线程 同步对象 解决什么问题? 想要指定的一个线程先执行,再去执行其他线程 精华如下 #event = threading.Event() # event.isSet():返回event的状态值: # # event.wait():如果 event.isSet()==False将阻塞线程: # # event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度: # # event.clear():恢复event的状态值为False. impo

AMQP 与ActiveMQ,JMS消息队列之间的比较

http://blog.csdn.net/kimmking/article/details/8253549 http://www.csdn123.com/html/mycsdn20140110/8f/8f42bb0680685c547107a0079e557686.html http://blog.sina.com.cn/s/blog_999d1f4c01010dpx.html