JMS实现-ActiveMQ,介绍,安装,使用,注意点,spring整合

[TOC]

缘由:

最近在用netty开发游戏服务器,目前有这样的一个场景,聊天服务器和逻辑服务器要进行消息交互,比如,某个玩家往某个公会提交了加入申请,这个申请动作是在逻辑服务器上完成的,但是要产生一条申请消息,由聊天服务器推送到对应的公会频道,目前这个申请消息就是通过jms发送到聊天服务器上,聊天服务器监听到后,推送到对应的公会频道.

下面主要介绍以下几点

- JMS简介

- 消息传递模型

- ActiveMQ介绍

- 安装使用

- spring整合JMS

- 代码相关


JMS简介

J Java 消息服务(Java Message Service,简称JMS)是用于访问企业消息系统的开发商中立的API。企业消息系统可以协助应用软件通过网络进行消息交互。JMS 在其中扮演的角色与JDBC 很相似,正如JDBC 提供了一套用于访问各种不同关系数据库的公共API,JMS 也提供了独立于特定厂商的企业消息系统访问方式。

使用JMS 的应用程序被称为JMS 客户端,处理消息路由与传递的消息系统被称为JMS Provider,而JMS 应用则是由多个JMS 客户端和一个JMS Provider 构成的业务系统。发送消息的JMS 客户端被称为生产者(producer),而接收消息的JMS 客户端则被称为消费者(consumer)。同一JMS 客户端既可以是生产者也可以是消费者。

JMS 的编程过程很简单,概括为:应用程序A 发送一条消息到消息服务器(也就是JMS Provider)的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应用程序A 和应用程序B 没有直接的代码关连,所以两者实现了解偶。如图

消息组成

1. 头(head)
    每条JMS 消息都必须具有消息头。头字段包含用于路由和识别消息的值。可以通过多种方式来设置消息头的值:

a. 由JMS 提供者在生成或传送消息的过程中自动设置

b. 由生产者客户机通过在创建消息生产者时指定的设置进行设置

c. 由生产者客户机逐一对各条消息进行设置

  1. 属性(property)

    消息可以包含称作属性的可选头字段。他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,其中可以包括如下信息:创建数据的进程、数据的创建时间以及每条数据的结构。JMS提供者也可以添加影响消息处理的属性,如是否应压缩消息或如何在消息生命周期结束时废弃消息。

  2. 主体(body)

    包含要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。JMS为不同类型的内容提供了他们各自的消息类型,但是所有消息都派生自Message接口。

    StreamMessage 一种主体中包含Java基元值流的消息。其填充和读取均按顺序进行。

    MapMessage 一种主体中包含一组键–值对的消息。没有定义条目顺序。

    TextMessage 一种主体中包含Java字符串的消息(例如,XML消息)。

    ObjectMessage 一种主体中包含序列化Java对象的消息。

    BytesMessage 一种主体中包含连续字节流的消息。

消息传递模型

JMS支持两种消息传递模型:点对点(point-to-point,简称PTP)和发布/订阅(publish/subscribe,简称pub/sub)。这两种消息传递模型非常相似,但有以下区别:

a. PTP消息传递模型规定了一条消息之恩能够传递费一个接收方。

b. Pub/sub消息传递模型允许一条消息传递给多个接收方

每个模型都通过扩展公用基类来实现。例如:javax.jms.Queue和Javax.jms.Topic都扩展自javax.jms.Destination类。

  1. 点对点消息传递

    通过点对点的消息传递模型,一个应用程序可以向另外一个应用程序发送消息。在此传递模型中,目标类型时队列。消息首先被传送至队列目标,然后从改对垒将消息传送至对此队列进行监听的某个消费者,如下图:

    一个队列可以关联多个队列发送方和接收方,但一条消息仅传递给一个接收方。如果多个接收方正在监听队列上的消息,JMS Provider将根据“先来者优先”的原则确定由哪个接收方接受下一条消息。如果没有接收方在监听队列,消息将保留在队列中,直至接收方连接到队列为止。这种消息传递模型是传统意义上的拉模型或轮询模型。在此列模型中,消息不会自动推动给客户端的,而是要由客户端从队列中请求获得。

  2. 发布/订阅消息传递

    通过发布/订阅消息传递模型,应用程序能够将一条消息发送到多个接收方。在此传送模型中,目标类型是主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的或送消费者。如下图:

    主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时改消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上时一个推模型。在该模型中,消息会自动广播,消费者无须通过主动请求或轮询主题的方法来获得新的消息。

    上面两种消息传递模型里,我们都需要定义消息生产者和消费者,生产者吧消息发送到JMS Provider的某个目标地址(Destination),消息从该目标地址传送至消费者。消费者可以同步或异步接收消息,一般而言,异步消息消费者的执行和伸缩性都优于同步消息接收者,体现在:

    1. 异步消息接收者创建的网络流量比较小。单向对东消息,并使之通过管道进入消息监听器。管道操作支持将多条消息聚合为一个网络调用。
    2. 异步消息接收者使用线程比较少。异步消息接收者在不活动期间不使用线程。同步消息接收者在接收调用期间内使用线程,结果线程可能会长时间保持空闲,尤其是如果该调用中指定了阻塞超时。

QUEUE和TOPIC的比较

  1. JMS Queue执行load balancer语义

    一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它讲被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另外一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。

  2. Topic实现publish和subscribe语义

    一条消息被publish时,他将发送给所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。

  3. 分别对应两种消息模式

    Point-to-Point(点对点),Publisher/Subscriber Model(发布/订阅者)

    其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久化订阅)和durable subscription(持久化订阅)两种消息处理方式。

ActiveMQ介绍

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现

特点

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring4.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试
  11. ActiveMQ速度非常快;一般要比jbossMQ快10倍。

ActiveMQ应用场景

  1. 不同语言应用集成

    ActiveMQ 中间件用Java语言编写,因此自然提供Java客户端 API。但是ActiveMQ 也为C/C++、.NET、Perl、PHP、Python、Ruby 和一些其它语言提供客户端。在你考虑如何集成不同平台不同语言编写应用的时候,ActiveMQ 拥有巨大优势。在这样的例子中,多种客户端API通过ActiveMQ 发送和接受消息成为可能,无论使用的是什么语言。此外,ActiveMQ 还提供交叉语言功能,该功能整合这种功能,无需使用远程过程调用(RPC)确实是个优势,因为消息协助应用解耦。

  2. 作为RPC的替代

    使用RPC同步调用的应用十分普遍。假设大多数客户端服务器应用使用RPC,包括ATM、大多数WEB应用、信用卡系统、销售点系统等等。尽管很多系统很成功,但是转换使用异步消息可以带来很多好处,而且也不会放弃响应保证。使用同步请求的系统在规模上有较大的限制,因为请求会被阻塞,从而导致整个系统变慢。如果使用异步消息替代,可以很容易增加额外的消息接收者,使得消息能被并发消耗,从而加快请求处理。当然,你的系统应用间应该是解耦的。

  3. 应用之间解耦

    正如之前讨论的,紧耦合架构可以导致很多问题,尤其是如果他们是分布的。松耦合架构,在另一方面,证实了更少的依赖性,能够更好地处理不可预见的改变。不仅可以在系统中改变组件而不影响整个系统,而且组件交互也相当的简单。相比使用同步的系统(调用者必须等待被调用者返回信息),异步系统(调用方发送消息后就不管,即fire-and-forget)能够给我们带来事件驱动架构(event-driven architecture EDA)。

  4. 作为事件驱动架构的主干

    解耦,异步架构的系统允许通过代理器自己配置更多的客户端,内存等(即vertical scalability)来扩大系统,而不是增加更多的代理器(即horizontal scalability)。考虑如亚马逊这样繁忙的电子商务系统。当用户购买物品,事实上系统需要很多步骤去处理,包括下单,创建发票,付款,执行订单,运输等。但是用户下单后,会立即返回“谢谢你下单”的界面。不只是没有延迟,而且用户还会受到一封邮件表明订单已经收到。在亚马逊下单的例子就是一个多步处理的例子。每一步都由单独的服务去处理。当用户下单是,有一个同步的体积表单动作,但整个处理流程并不通过浏览器同步处理。相反地,订单马上被接受和反馈。而剩下的步骤就通过异步处理。如果在处理过程中出错,用户会通过邮件收到通知。这样的异步处理能提供高负载和高可用性。

  5. 提高系统扩展性

    很多使用事件驱动设计的系统是为了获得高可扩展性,例如电子商务,政府,制造业,线上游戏等。通过异步消息分开商业处理步骤给各个应用,能够带来很多可能性。考虑设计一个应用来完成一项特殊的任务。这就是面向服务的架构(service-oriented architecture SOA)。每一个服务完成一个功能并且只有一个功能。应用就通过服务组合起来,服务间使用异步消息和最终一致性。这样的设计便可以引入一个复杂事件处理概念(complex event processing CEP)。使用CEP,部件间的交互可以被记录追踪。在异步消息系统中,可以很容易在部件间增加一层处理。

安装使用

下载安装包 http://activemq.apache.org/activemq-5133-release.html

系统 版本
windows apache-activemq-5.13.3-bin.zip
linux apache-activemq-5.13.3-bin.tar.gz

win下 解压得到如下图目录,根据操作系统进入对应的文件夹win32或者win64,双击activemq.bat运行.

ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。点击QUEUES可以看到当前的消息队列.

spring整合JMS

spring-jms介绍

  1. spring提供了一个jms集成框架,这个框架如spring 集成jdbc api一样,简化了jms api的使用。
  2. jms可以简单的分成两个功能区,消息的生产和消息的消费。JmsTemplate类用来生成消息和同步接受消息。和其它java ee的消息驱动样式一样,对异步消息,spring也提供了许多消息监听容器用来创建消息驱动的POJO(MDPs)。spring同时也提供了创建消息监听器的声明方式。
  3. org.springframework.jms.core 提供了使用JMS的核心功能,它包含JmsTemplate类,该类类似于jdbc中的jdbdTemplate,它通过对资源的创建和释放处理来简化jms开发。spring的模板类作为一种设计原则在spring框架中广泛使用,模板类对简单操作提供了帮助方法;对复杂操作,通过继承回调接口提供了重要处理过程的代理。jmsTemplate同样遵循这一设计原则,它提供了发送消息、同步消费消息、为用户提供JMS session和消息生产者的多种便利方法。
  4. org.springframework.jms.support提供了JmsException转译功能。它将checked的JmsException层次转换成uncheckedd异常的镜像层次。若抛出的异常不是javax.jms.JmsException的子类,这个异常将被封装成unchecked异常UncategorizedJmsException。
  5. org.springframework.jms.support.converter 提供了在java对象和jms消息之间转换的抽象MessageConverter。
  6. org.springframework.jms.support.destination提供了管理jms destination的多种策略,如对存放在jndi的destionation提供服务定位功能。
  7. org.springframework.jms.annotation通过使用@JmsListener提供了对注解驱动的监听端的支持。
  8. org.springframework.jms.config 支持jms命名空间的解析,同时也支持配置监听容器和生成监听端。
  9. org.springframework.jms.connection提供了适用于standonle应用的ConnectionFactory的实现。对jms的事务管理实现jmsTranscationmanager. 这允许jms 作为事务资源无缝的集成到spring事务管理机制中。

相关配置

maven依赖

<dependency>
  <groupId>javax.jms</groupId>
  <artifactId>javax.jms-api</artifactId>
  <version>2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>${activemq.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>${activemq.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-spring</artifactId>
  <version>${activemq.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>3.16</version>
</dependency>
<dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>${spring.version}</version>
</dependency>

生产者端的spring配置 - activemqContext.xml(该文件之后用import resource=”activemqContext.xml” 导入到spring配置文件中)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.7.0.xsd">

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="${activemq.url}" userName="admin" password="admin" />

    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>
    <!-- Spring JmsTemplate 的消息生产者 start -->
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>
    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
    </bean>
    <!--Spring JmsTemplate 的消息生产者 end -->
    <!--这个是队列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>UnionApplyMsgQueue</value>
        </constructor-arg>
    </bean>
</beans>  

消费者端spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.7.0.xsd">

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="${activemq.url}" userName="admin" password="admin" />
    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>
    <!--这个是队列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>UnionApplyMsgQueue</value>
        </constructor-arg>
    </bean>
    <!-- 消息消费者 start -->
    <!-- 消息监听器 -->
    <bean id="queueReceiver1" class="com.game.wego.chat.listener.ConsumerMessageListener" />
    <!-- 定义UnionApplyMsgQueue监听器 -->
    <jms:listener-container destination-type="queue"
        container-type="default" connection-factory="connectionFactory"
        acknowledge="auto">
        <jms:listener destination="UnionApplyMsgQueue" ref="queueReceiver1" />
    </jms:listener-container>
    <!-- 消息消费者 end -->
</beans>  

总结下,消费者端配置监听器,监听queue,有消息,就调用ConsumerMessageListener,这个是自己写的类,实现MessageListener接口的方法即可.生产者端配置队列目的地,JmsTemplate .注意这里我们用的是CachingConnectionFactory.ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer,节省了资源开销.

Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还得是由JMS服务厂商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我们这里使用的是ActiveMQ实现的JMS,所以在我们这里真正的可以产生Connection的就应该是由ActiveMQ提供的ConnectionFactory。

ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。当使用PooledConnectionFactory时,我们在定义一个ConnectionFactory时应该是如下定义:

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>  

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory" ref="targetConnectionFactory"/>
    <property name="maxConnections" value="10"/>
</bean>  

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>  

代码相关

  1. ActiveMQ的helloworld
package com.activemq.producter;  

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;  

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;  

/**
 * ActiveMQ发送类
 * <功能详细描述>
 *
 * @author  Administrator
 * @version  [版本号, 2014年7月27日]
 * @see  [相关类/方法]
 * @since  [产品/模块版本]
 */
public class MessageProducter
{  

    /*
     * @param args
     * @see [类、类#方法、类#成员]
     */
    public static void main(String[] args) throws JMSException
    {
     // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD,
                        "tcp://192.168.197.130:61616");
        //JMS 客户端到JMS Provider 的连接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一个发送或接收消息的线程
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息发送给谁.
        // 获取session注意参数值my-queue是Query的名字
        Destination destination = session.createQueue("my-queue");
        // MessageProducer:消息生产者
        MessageProducer producer = session.createProducer(destination);
        //设置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //发送一条消息
        sendMsg(session, producer);
        session.commit();
        connection.close();   

    }  

    /**
     * 在指定的会话上,通过指定的消息生产者发出一条消息
     *
     * @param session    消息会话
     * @param producer 消息生产者
     */
    public static void sendMsg(Session session, MessageProducer producer) throws JMSException {
            //创建一条文本消息
            TextMessage message = session.createTextMessage("Hello ActiveMQ!");
            //通过消息生产者发出消息
            producer.send(message);
            System.out.println("");
    }
}  
package com.activemq.reciever;  

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;  

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;  

public class JmsReceiver
{  

    public static void main(String[] args)
        throws JMSException
    {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://192.168.197.130:61616");
        //JMS 客户端到JMS Provider 的连接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一个发送或接收消息的线程
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息发送给谁.
        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
        Destination destination = session.createQueue("my-queue");
        // 消费者,消息接收者
        MessageConsumer consumer = session.createConsumer(destination);
        while (true)
        {
            TextMessage message = (TextMessage)consumer.receive(1000);
            if (null != message)
                System.out.println("收到消息:" + message.getText());
            else
                break;
        }
        session.close();
        connection.close();  

    }  

}  

使用了spring-jms之后,就可用用jmsTemplate来取代连接打开关闭维护等事宜,会方便很多.

2. ConsumerMessageListener监听器实现类,消费者用

public class ConsumerMessageListener implements MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class);

    @Autowired
    PlayerLogicService playerService;

    @Autowired
    private ResourceDPService resourceDP;

    @Override
    public void onMessage(Message message) {
        ObjectMessage objMsg = (ObjectMessage) message;
        try {
            RspApplyUnionMsg rspApplyMsg = (RspApplyUnionMsg) objMsg.getObject();
            logger.debug("消息内容是:" + JsonUtil.toJsonString(rspApplyMsg) + "申请加入公会");
            Player player = playerService.getOnlinePlayerInThisApp(rspApplyMsg.getPlayerId());
            if (null != player) {
                RspUnionMsg rspData = new RspUnionMsg();
                ResponseMessage resp = new ResponseMessage();
                rspData.setSpeaker(player.getName());
                rspData.setPhyle(player.getPhyle());
                rspData.setPlayerId(player.getId());
                rspData.setMsgType(2);
                resp.setId(com.game.wego.chat.message.Message.RSP_UNION_CHAT_PUSH);
                resp.setData(rspData);
                // 公会内广播
                ChannelGroup channels = GameContext.getUnionGroup(rspApplyMsg.getUnionId());
                if (null != channels) {
                    channels.writeAndFlush(resp);
                }
                resourceDP.pushMsg(rspData, rspApplyMsg.getUnionId());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

生产者实现类

@Service
public class ActivemqService {
    @SuppressWarnings("unused")
    private static final Logger logger = LoggerFactory.getLogger(ActivemqService.class);

    @Autowired
    @Qualifier("queueDestination")
    private Destination destination; 

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    public void sendMessage(final Serializable message) {
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(message);
            }
        });
    }

}

这里可以用消息转换器MessageConverter来代替代码中的new MessageCreator操作,原理不变, 看着简洁点.参见http://haohaoxuexi.iteye.com/blog/1900937

3. 不同消息类型的收发

/**
   * 向默认队列发送text消息
   */
  public void sendMessage(final String msg) {
    String destination = jmsTemplate.getDefaultDestination().toString();
    System.out.println("ProducerService向队列" + destination + "发送了消息:\t" + msg);
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(msg);
      }
    });
  }

  /**
   * 向默认队列发送map消息
   */
  public void sendMapMessage() {
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        MapMessage message = session.createMapMessage();
        message.setString("name", "小西山");
        return message;
      }
    });
  }

  /**
   * 向默认队列发送Object消息
   */
  public void sendObjectMessage() {
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        Staff staff = new Staff(1, "搬砖工"); // Staff必须实现序列化
        ObjectMessage message = session.createObjectMessage(staff);
        return message;
      }
    });
  }

  /**
   * 向默认队列发送Bytes消息
   */
  public void sendBytesMessage() {
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        String str = "BytesMessage 字节消息";
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(str.getBytes());
        return message;
      }
    });
  }

  /**
   * 向默认队列发送Stream消息
   */
  public void sendStreamMessage() {
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        String str = "StreamMessage 流消息";
        StreamMessage message = session.createStreamMessage();
        message.writeString(str);
        message.writeInt(521);
        return message;
      }
    });
  }
/**
   * 接受消息
   */
  public void receive(Destination destination) throws JMSException {
    Message message = jmsTemplate.receive(destination);
    // 如果是文本消息
    if (message instanceof TextMessage) {
      TextMessage tm = (TextMessage) message;
      System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t" + tm.getText());
    }

    // 如果是Map消息
    if (message instanceof MapMessage) {
      MapMessage mm = (MapMessage) message;
      System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t"
          + mm.getString("name"));
    }

    // 如果是Object消息
    if (message instanceof ObjectMessage) {
      ObjectMessage om = (ObjectMessage) message;
      Staff staff = (Staff) om.getObject();
      System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t" + staff);
    }

    // 如果是bytes消息
    if (message instanceof BytesMessage) {
      byte[] b = new byte[1024];
      int len = -1;
      BytesMessage bm = (BytesMessage) message;
      while ((len = bm.readBytes(b)) != -1) {
        System.out.println(new String(b, 0, len));
      }
    }

    // 如果是Stream消息
    if (message instanceof StreamMessage) {
      StreamMessage sm = (StreamMessage) message;
      System.out.println(sm.readString());
      System.out.println(sm.readInt());
    }

  }

应用对象消息时遇到的一个bug,对象消息时,对象必须实现序列化接口,为了传输需要,序列化前后要对比id.确保无误.他们去使用的总是同一个对象,包括序列id都要相同,就导致两个系统的时候,要公共依赖同一个地方的同一个对象文件.这点要注意.

时间: 2024-10-26 14:38:19

JMS实现-ActiveMQ,介绍,安装,使用,注意点,spring整合的相关文章

深入浅出JMS(三)--ActiveMQ简单的HelloWorld实例

第一篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了JMS的两种消息模型:点对点和发布订阅模型,以及消息被消费的两个方式:同步和异步,JMS编程模型的对象,最后说了JMS的优点. 第二篇博文深入浅出JMS(二)–ActiveMQ简单介绍以及安装,我们介绍了消息中间件ActiveMQ,安装,启动,以及优缺点. 这篇博文,我们使用ActiveMQ为大家实现一种点对点的消息模型.如果你对点对点模型的认识较浅,可以看一下第一篇博文的介绍. JMS其实并没有想象的那么高大上,看完这篇博文之后,你就知

Java ActiveMQ 讲解(一)理解JMS 和 ActiveMQ基本使用(转)

转自:http://www.cnblogs.com/luochengqiuse/p/4678020.html?utm_source=tuicool&utm_medium=referral 最近的项目中用到了mq,之前自己一直在码农一样的照葫芦画瓢.最近几天研究了下,把自己所有看下来的文档和了解总结一下. 一. 认识JMS 1.概述 对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的

JMS Apache ActiveMQ(消息中间件)使用攻略

首先在做任何技术预研的时候:我会搞清除它是什么?为什么要使用它? JMS 他是什么? Java Message Service是一组接口和相关语义,他定义了JMS客户如何访问企业消息产生的功能. JMS支持消息中间件的两种传递模式:点到点模式(point to point)和发布-订阅模式(pub/sub). JMS消息模型: 消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息.设置优先权和失效时间等等,并且为消息确定路由. 属性(p

ActiveMQ介绍与使用

1      ActiveMQ介绍 1.1    什么是ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 主要特点: 1. 多种语言和协议编写客户端.语言: Java, C, C++, C#, Ruby, Perl, Python, PHP.应用协议: OpenW

Spring整合JMS——基于ActiveMQ实现

1.1     JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 1.2  

JMS and ActiveMQ first lesson(转)

JMS and ActiveMQ first lesson -- jms基础概念和应用场景 2011-6-18 PM 9:30 主讲:kimmking <[email protected]> 整理:林木森 ppt下载地址: http://code.google.com/p/activemq-store-mongodb/downloads/list 下面开始: kimmking:介绍下jms和ActiveMQ.在讲JMS之前,我们聊聊相关的背景.谁知道JMS是什么意思? kimmking:对,是

JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中

JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中 一.JMS的理解JMS(Java Message Service)是jcp组织02-03年定义了jsr914规范(http://jcp.org/en/jsr/detail?id=914),它定义了消息的格式和消息传递模式:消息包括:消息头,消息扩展属性和消息体,其结构看起来与SOAP非常的相似,但一般情况下,SOAP主要关注远程服务调用,而消息则专注于信息的交换:消息分为:消息生产者,消息服务器和消息消费者.生产者与消费者之间

Spring整合JMS(一)——基于ActiveMQ实现

1.1     JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 1.2  

activemq的安装使用

近期有项目中用到消息队列,JMS规范中实现最好的开源框架就是activemq.所以选择它(当然这是我老大决定的,像我这样的刚入职场的小菜鸟考虑问题还不太全面)作为消息队列数据传输.公司有有成型的消息队列框架的实现,但是公司中的框架 实现的好繁琐,考虑的好全面,考虑到了同步消息传输和异步消息传输,持久化消息存储和非持久化消息存储,消息之间点对点传输还是pubilc/subscribe(订阅)等等.考虑的非常复杂.所以jar包依赖冲突比較严重.于是乎就选择了activemq,闲话少扯,省得蛋痛,以下