Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)

首先声明:以下内容均是在网上找别人的博客综合学习而成的,可能会发现某些代码与其他博主的相同,由于参考的文章比较多,这里对你们表示感谢,就不一一列举,如果有侵权的地方,请通知我,我可以把该文章删除。

1、jms-xml Spring配置文件

[html] view plain copy

print?

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans
  3. xmlns="http://www.springframework.org/schema/beans"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:p="http://www.springframework.org/schema/p"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
  7. <bean id = "connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  8. <property name = "brokerURL" value = "tcp://localhost:61616"/>
  9. </bean>
  10. <bean id = "topicDestination" class="org.apache.activemq.command.ActiveMQTopic"
  11. autowire="constructor">
  12. <constructor-arg value="com.spring.xkey.jms.topic"/>
  13. </bean>
  14. <bean id="sendMessage" class="com.spring.xkey.jms.SendMessage">
  15. <property name="username" value="xkey"/>
  16. <property name="password" value="1234567890"/>
  17. </bean>
  18. <bean id = "jmsMessageConverter" class="com.spring.xkey.jms.JmsMessageConverter">
  19. <property name="sendMessage" ref="sendMessage"/>
  20. </bean>
  21. <!-- 创建JMS发送信息的模板的对象 -->
  22. <bean id = "jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  23. <property name="connectionFactory" ref="connectionFactory"/>
  24. <!--property name="defaultDestination" ref="topicDestination"/-->
  25. <property name="receiveTimeout" value="6000"/>
  26. <property name="messageConverter" ref="jmsMessageConverter"/>
  27. </bean>
  28. <bean id = "jmsMessageListener" class="com.spring.xkey.jms.JmsMessageListener">
  29. </bean>
  30. <bean id = "publisher" class="com.spring.xkey.jms.Publisher">
  31. <property name="jmsTemplate" ref="jmsTemplate"/>
  32. <property name="destinations" ref="topicDestination" />
  33. <property name="sendMessage" ref="sendMessage"/>
  34. </bean>
  35. <bean id = "consumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  36. <property name="connectionFactory" ref="connectionFactory"/>
  37. <property name="destination" ref="topicDestination" />
  38. <property name="messageListener" ref="jmsMessageListener" />
  39. </bean>
  40. </beans>

2、Listener代码

[java] view plain copy

print?

  1. package com.spring.xkey.jms;
  2. import java.util.Date;
  3. import javax.jms.JMSException;
  4. import javax.jms.Message;
  5. import javax.jms.MessageListener;
  6. import org.apache.activemq.command.ActiveMQMapMessage;
  7. public class JmsMessageListener implements MessageListener {
  8. public void onMessage(Message message) {
  9. ActiveMQMapMessage msg = null;
  10. //System.out.println("ONMessage-----------------" + message.toString());
  11. try {
  12. if (message instanceof ActiveMQMapMessage) {
  13. msg = (ActiveMQMapMessage) message;
  14. String username = msg.getString("username");
  15. String password = msg.getString("password");
  16. System.out.println("Message::: "+username+", "+password);
  17. //              msg = (ActiveMQMapMessage) message;
  18. //              String sentDate = msg.getString("date");
  19. //              String reMessage = msg.getString("message");
  20. //              int sentCount = msg.getInt("count");
  21. //              System.out
  22. //                      .println("-------------New Message Arrival-----------"
  23. //                              + new Date());
  24. //              System.out.println("It‘s " + sentCount + " time From Darcy: "
  25. //                      + reMessage + "   ---Send time :" + sentDate);
  26. }
  27. } catch (JMSException e) {
  28. System.out.println("JMSException in onMessage(): " + e.toString());
  29. } catch (Throwable t) {
  30. System.out.println("Exception in onMessage():" + t.getMessage());
  31. }
  32. }
  33. }

3、Converter代码

[java] view plain copy

print?

  1. package com.spring.xkey.jms;
  2. import javax.jms.JMSException;
  3. import javax.jms.MapMessage;
  4. import javax.jms.Message;
  5. import javax.jms.Session;
  6. import org.springframework.jms.support.converter.MessageConversionException;
  7. import org.springframework.jms.support.converter.MessageConverter;
  8. public class JmsMessageConverter implements MessageConverter{
  9. private SendMessage sendMessage;
  10. public void setSendMessage(SendMessage sendMsg){
  11. this.sendMessage = sendMsg;
  12. }
  13. public Object fromMessage(Message message) throws JMSException,
  14. MessageConversionException {
  15. // TODO Auto-generated method stub
  16. MapMessage  mapmessage= (MapMessage)message;
  17. this.sendMessage.setUsername(mapmessage.getString("username"));
  18. this.sendMessage.setPassword(mapmessage.getString("password"));
  19. System.out.println("First");
  20. return sendMessage;
  21. }
  22. public Message toMessage(Object arg0, Session session) throws JMSException,
  23. MessageConversionException {
  24. // TODO Auto-generated method stub
  25. this.sendMessage = (SendMessage)arg0;
  26. MapMessage  mapmessage= (MapMessage) session.createMapMessage();
  27. mapmessage.setString("username", this.sendMessage.getUsername());
  28. mapmessage.setString("password", this.sendMessage.getPassword());
  29. System.out.println("Second");
  30. return mapmessage;
  31. }
  32. }

4、Publisher代码

[cpp] view plain copy

print?

  1. package com.spring.xkey.jms;
  2. import java.util.Scanner;
  3. import javax.jms.Destination;
  4. import org.springframework.jms.core.JmsTemplate;
  5. public class Publisher {
  6. private JmsTemplate template;
  7. private Destination[] destinations;
  8. private SendMessage sendMessage;
  9. public void chart()
  10. {
  11. boolean chart = true;
  12. int count = 0;
  13. while(chart)
  14. {
  15. count ++;
  16. Scanner cin=new Scanner(System.in);
  17. System.out.println("输入聊天内容,输入N停止聊天");
  18. String text=cin.nextLine();
  19. if(text.equals("N"))
  20. {
  21. chart = false;
  22. }
  23. System.out.println("我:"+text);
  24. sendChartMessage(count,text);
  25. }
  26. }
  27. public void sendMsgCon(){
  28. Scanner cin=new Scanner(System.in);
  29. String username = cin.nextLine();
  30. String password = cin.nextLine();
  31. this.sendMessage.setUsername(username);
  32. this.sendMessage.setPassword(password);
  33. sendConvertor(this.sendMessage);
  34. }
  35. public void sendConvertor(SendMessage sendMsg){
  36. template.convertAndSend(destinations[0],sendMsg);
  37. }
  38. protected void sendChartMessage(int count , String strMessage)
  39. {
  40. MyMessageCreator creator = new MyMessageCreator(count,strMessage);
  41. template.send(destinations[0], creator);
  42. }
  43. public JmsTemplate getJmsTemplate() {
  44. return template;
  45. }
  46. public void setJmsTemplate(JmsTemplate template) {
  47. this.template = template;
  48. }
  49. public Destination[] getDestinations() {
  50. return destinations;
  51. }
  52. public void setDestinations(Destination[] destinations) {
  53. this.destinations = destinations;
  54. }
  55. public void setSendMessage(SendMessage sendMsg){
  56. this.sendMessage = sendMsg;
  57. }
  58. public SendMessage getSendMessage(){
  59. return this.sendMessage;
  60. }
  61. }

5、SendMessage代码

[java] view plain copy

print?

  1. package com.spring.xkey.jms;
  2. public class SendMessage {
  3. private String username;
  4. private String password;
  5. public void setUsername(String user){
  6. this.username = user;
  7. }
  8. public void setPassword(String pass){
  9. this.password = pass;
  10. }
  11. public String getUsername(){
  12. return this.username;
  13. }
  14. public String getPassword(){
  15. return this.password;
  16. }
  17. }

6、Test类

[java] view plain copy

print?

    1. package com.spring.xkey.jms;
    2. import javax.jms.JMSException;
    3. import org.springframework.context.ApplicationContext;
    4. import org.springframework.context.support.ClassPathXmlApplicationContext;
    5. import org.springframework.jms.listener.DefaultMessageListenerContainer;
    6. public class Test {
    7. /**
    8. * @param args
    9. */
    10. public static void main(String[] args) {
    11. // TODO Auto-generated method stub
    12. ApplicationContext context =
    13. new ClassPathXmlApplicationContext("jms.xml");
    14. /**Sender sender = (Sender)context.getBean("sender");
    15. sender.SendInfo();
    16. Receiver receiver = (Receiver)context.getBean("receiver");
    17. try {
    18. System.out.println(receiver.receiverInfo());
    19. } catch (JMSException e) {
    20. // TODO Auto-generated catch block
    21. e.printStackTrace();
    22. }*/
    23. Publisher pub = (Publisher)context.getBean("publisher");
    24. DefaultMessageListenerContainer consumer =
    25. (DefaultMessageListenerContainer)context.getBean("consumer");
    26. consumer.start();
    27. pub.sendMsgCon();
    28. //pub.chart();
    29. }
    30. }
时间: 2024-10-07 02:19:19

Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)的相关文章

Spring JMS ActiveMQ整合(转)

转载自:http://my.oschina.net/xiaoxishan/blog/381209#comment-list ActiveMQ学习笔记(四)http://my.oschina.net/xiaoxishan/blog/380446 中 记录了如何使用原生的方式从ActiveMQ中收发消息.可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更为方便的方式,这就是Spring JMS.我们通过一个例子展开讲述.包括队列.主题消息的收发相关的Spring配置.代码.测试

简单聊聊消息队列的事务补偿机制

转自:https://my.oschina.net/u/1589819/blog/1503241 因为一直学习与尝试负责公司的推送相关业务,包括整个应用的实现,其中就采用了基于消息队列的异步事件驱动模型来做解耦异步处理,所以就要去做了解一些相关的知识点,这边稍作总结,并整理一下消息补偿机制的一套简单实现的代码设计图. 采用基于消息队列的异步事件驱动模型来解决问题的时候,一个计较棘手的问题就是事务的一致性. 案例:现在用户发起一个创建订单的请求,如果我们是单系统架构,那么修改订单表,修改库存表可能

计算机程序的思维逻辑 (61) - 内存映射文件及其应用 - 实现一个简单的消息队列

本节介绍内存映射文件,内存映射文件不是Java引入的概念,而是操作系统提供的一种功能,大部分操作系统都支持. 我们先来介绍内存映射文件的基本概念,它是什么,能解决什么问题,然后我们介绍如何在Java中使用,我们会设计和实现一个简单的.持久化的.跨程序的消息队列来演示内存映射文件的应用. 基本概念 所谓内存映射文件,就是将文件映射到内存,文件对应于内存中的一个字节数组,对文件的操作变为对这个字节数组的操作,而字节数组的操作直接映射到文件上.这种映射可以是映射文件全部区域,也可以是只映射一部分区域.

Lua 下实现一个简单的消息队列

Lua 下实现一个简单的消息队列,如下简单的几条代码就可以了. local q1 = {} local q2 = {} -- 产生消息只需要 table.insert(q1, msg) -- 分发消息需要两层循环, 可以处理 dispatch 过程中产生的新消息 while q1[1] do q1,q2 = q2,q1 for i=1,#q2 do dispatch(q2[i]) q2[i] = nil end end

Spring Boot:使用Rabbit MQ消息队列

综合概述 消息队列 消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息,对消息队列有读权限的进程则可以从消息队列中读走消息,而消息队列就是在消息的传输过程中保存消息的容器,你可以简单的把消息队列理解为类似快递柜,快递员(消息发布者)往快递柜(消息队列)投递物件(消息),接受者(消息订阅者)从快递柜(消息队列)接收物件(消息),当然消息队列往往还包含一些特定的消息传递和接收机制. 消息队列作为分布式系

MQ选型对比ActiveMQ,RabbitMQ,RocketMQ,Kafka 消息队列框架选哪个?

最近研究消息队列,发现好几个框架,搜罗一下进行对比,说一下选型说明: 1)中小型软件公司,建议选RabbitMQ.一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便.不考虑rocketmq和kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除.RocketMQ也很不错,只是没有RabbitMQ出来的早,文档和网上的资料没有RabbitMQ多,但也是很不错,RocketMQ是阿里出品,现在阿里已经把

微服务框架Spring Cloud之使用事件和消息队列实现分布式事务

不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行多次等待. 有一种方法同样可以解决分布式事务问题, 并且性能较好, 这就是我这篇文章要介绍的使用事件,本地事务以及消息队列来实现分布式事务. 我们从一个简单的实例入手. 基本所有互联网应用都

Spring整合ActiveMQ:spring+JMS+ActiveMQ+Tomcat

一.目录结构 相关jar包 二.关键配置activmq.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi=&quo

JMS - ActiveMQ的简单使用

首先需要下载ActiveMQ,下面的链接给我们列出了所有版本:http://activemq.apache.org/download-archives.html每个版本为不同的OS提供了链接: 公司电脑是windows的,用目录下的activemq.bat启动: 端口号默认是61616,可以在conf/activemq.xml中看到: <transportConnectors> <!-- DOS protection, limit concurrent connections to 10