[TOC]
缘由:
最近在用netty开发游戏服务器,目前有这样的一个场景,聊天服务器和逻辑服务器要进行消息交互,比如,某个玩家往某个公会提交了加入申请,这个申请动作是在逻辑服务器上完成的,但是要产生一条申请消息,由聊天服务器推送到对应的公会频道,目前这个申请消息就是通过jms发送到聊天服务器上,聊天服务器监听到后,推送到对应的公会频道.
下面主要介绍以下几点
- JMS简介
- 消息传递模型
- ActiveMQ介绍
- 安装使用
- spring整合JMS
- 代码相关
JMS简介
J Java 消息服务(Java Message Service,简称JMS)是用于访问企业消息系统的开发商中立的API。企业消息系统可以协助应用软件通过网络进行消息交互。JMS 在其中扮演的角色与JDBC 很相似,正如JDBC 提供了一套用于访问各种不同关系数据库的公共API,JMS 也提供了独立于特定厂商的企业消息系统访问方式。
使用JMS 的应用程序被称为JMS 客户端,处理消息路由与传递的消息系统被称为JMS Provider,而JMS 应用则是由多个JMS 客户端和一个JMS Provider 构成的业务系统。发送消息的JMS 客户端被称为生产者(producer),而接收消息的JMS 客户端则被称为消费者(consumer)。同一JMS 客户端既可以是生产者也可以是消费者。
JMS 的编程过程很简单,概括为:应用程序A 发送一条消息到消息服务器(也就是JMS Provider)的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应用程序A 和应用程序B 没有直接的代码关连,所以两者实现了解偶。如图
消息组成
1. 头(head)
每条JMS 消息都必须具有消息头。头字段包含用于路由和识别消息的值。可以通过多种方式来设置消息头的值:
a. 由JMS 提供者在生成或传送消息的过程中自动设置
b. 由生产者客户机通过在创建消息生产者时指定的设置进行设置
c. 由生产者客户机逐一对各条消息进行设置
- 属性(property)
消息可以包含称作属性的可选头字段。他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,其中可以包括如下信息:创建数据的进程、数据的创建时间以及每条数据的结构。JMS提供者也可以添加影响消息处理的属性,如是否应压缩消息或如何在消息生命周期结束时废弃消息。
- 主体(body)
包含要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。JMS为不同类型的内容提供了他们各自的消息类型,但是所有消息都派生自Message接口。
StreamMessage 一种主体中包含Java基元值流的消息。其填充和读取均按顺序进行。
MapMessage 一种主体中包含一组键–值对的消息。没有定义条目顺序。
TextMessage 一种主体中包含Java字符串的消息(例如,XML消息)。
ObjectMessage 一种主体中包含序列化Java对象的消息。
BytesMessage 一种主体中包含连续字节流的消息。
消息传递模型
JMS支持两种消息传递模型:点对点(point-to-point,简称PTP)和发布/订阅(publish/subscribe,简称pub/sub)。这两种消息传递模型非常相似,但有以下区别:
a. PTP消息传递模型规定了一条消息之恩能够传递费一个接收方。
b. Pub/sub消息传递模型允许一条消息传递给多个接收方
每个模型都通过扩展公用基类来实现。例如:javax.jms.Queue和Javax.jms.Topic都扩展自javax.jms.Destination类。
- 点对点消息传递
通过点对点的消息传递模型,一个应用程序可以向另外一个应用程序发送消息。在此传递模型中,目标类型时队列。消息首先被传送至队列目标,然后从改对垒将消息传送至对此队列进行监听的某个消费者,如下图:
一个队列可以关联多个队列发送方和接收方,但一条消息仅传递给一个接收方。如果多个接收方正在监听队列上的消息,JMS Provider将根据“先来者优先”的原则确定由哪个接收方接受下一条消息。如果没有接收方在监听队列,消息将保留在队列中,直至接收方连接到队列为止。这种消息传递模型是传统意义上的拉模型或轮询模型。在此列模型中,消息不会自动推动给客户端的,而是要由客户端从队列中请求获得。
- 发布/订阅消息传递
通过发布/订阅消息传递模型,应用程序能够将一条消息发送到多个接收方。在此传送模型中,目标类型是主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的或送消费者。如下图:
主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时改消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上时一个推模型。在该模型中,消息会自动广播,消费者无须通过主动请求或轮询主题的方法来获得新的消息。
上面两种消息传递模型里,我们都需要定义消息生产者和消费者,生产者吧消息发送到JMS Provider的某个目标地址(Destination),消息从该目标地址传送至消费者。消费者可以同步或异步接收消息,一般而言,异步消息消费者的执行和伸缩性都优于同步消息接收者,体现在:
- 异步消息接收者创建的网络流量比较小。单向对东消息,并使之通过管道进入消息监听器。管道操作支持将多条消息聚合为一个网络调用。
- 异步消息接收者使用线程比较少。异步消息接收者在不活动期间不使用线程。同步消息接收者在接收调用期间内使用线程,结果线程可能会长时间保持空闲,尤其是如果该调用中指定了阻塞超时。
QUEUE和TOPIC的比较
- JMS Queue执行load balancer语义
一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它讲被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另外一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。
- Topic实现publish和subscribe语义
一条消息被publish时,他将发送给所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。
- 分别对应两种消息模式
Point-to-Point(点对点),Publisher/Subscriber Model(发布/订阅者)
其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久化订阅)和durable subscription(持久化订阅)两种消息处理方式。
ActiveMQ介绍
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
特点
- 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
- 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring4.0的特性
- 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
- 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支持通过JDBC和journal提供高速的消息持久化
- 从设计上保证了高性能的集群,客户端-服务器,点对点
- 支持Ajax
- 支持与Axis的整合
- 可以很容易得调用内嵌JMS provider,进行测试
- ActiveMQ速度非常快;一般要比jbossMQ快10倍。
ActiveMQ应用场景
- 不同语言应用集成
ActiveMQ 中间件用Java语言编写,因此自然提供Java客户端 API。但是ActiveMQ 也为C/C++、.NET、Perl、PHP、Python、Ruby 和一些其它语言提供客户端。在你考虑如何集成不同平台不同语言编写应用的时候,ActiveMQ 拥有巨大优势。在这样的例子中,多种客户端API通过ActiveMQ 发送和接受消息成为可能,无论使用的是什么语言。此外,ActiveMQ 还提供交叉语言功能,该功能整合这种功能,无需使用远程过程调用(RPC)确实是个优势,因为消息协助应用解耦。
- 作为RPC的替代
使用RPC同步调用的应用十分普遍。假设大多数客户端服务器应用使用RPC,包括ATM、大多数WEB应用、信用卡系统、销售点系统等等。尽管很多系统很成功,但是转换使用异步消息可以带来很多好处,而且也不会放弃响应保证。使用同步请求的系统在规模上有较大的限制,因为请求会被阻塞,从而导致整个系统变慢。如果使用异步消息替代,可以很容易增加额外的消息接收者,使得消息能被并发消耗,从而加快请求处理。当然,你的系统应用间应该是解耦的。
- 应用之间解耦
正如之前讨论的,紧耦合架构可以导致很多问题,尤其是如果他们是分布的。松耦合架构,在另一方面,证实了更少的依赖性,能够更好地处理不可预见的改变。不仅可以在系统中改变组件而不影响整个系统,而且组件交互也相当的简单。相比使用同步的系统(调用者必须等待被调用者返回信息),异步系统(调用方发送消息后就不管,即fire-and-forget)能够给我们带来事件驱动架构(event-driven architecture EDA)。
- 作为事件驱动架构的主干
解耦,异步架构的系统允许通过代理器自己配置更多的客户端,内存等(即vertical scalability)来扩大系统,而不是增加更多的代理器(即horizontal scalability)。考虑如亚马逊这样繁忙的电子商务系统。当用户购买物品,事实上系统需要很多步骤去处理,包括下单,创建发票,付款,执行订单,运输等。但是用户下单后,会立即返回“谢谢你下单”的界面。不只是没有延迟,而且用户还会受到一封邮件表明订单已经收到。在亚马逊下单的例子就是一个多步处理的例子。每一步都由单独的服务去处理。当用户下单是,有一个同步的体积表单动作,但整个处理流程并不通过浏览器同步处理。相反地,订单马上被接受和反馈。而剩下的步骤就通过异步处理。如果在处理过程中出错,用户会通过邮件收到通知。这样的异步处理能提供高负载和高可用性。
- 提高系统扩展性
很多使用事件驱动设计的系统是为了获得高可扩展性,例如电子商务,政府,制造业,线上游戏等。通过异步消息分开商业处理步骤给各个应用,能够带来很多可能性。考虑设计一个应用来完成一项特殊的任务。这就是面向服务的架构(service-oriented architecture SOA)。每一个服务完成一个功能并且只有一个功能。应用就通过服务组合起来,服务间使用异步消息和最终一致性。这样的设计便可以引入一个复杂事件处理概念(complex event processing CEP)。使用CEP,部件间的交互可以被记录追踪。在异步消息系统中,可以很容易在部件间增加一层处理。
安装使用
下载安装包 http://activemq.apache.org/activemq-5133-release.html
系统 | 版本 |
---|---|
windows | apache-activemq-5.13.3-bin.zip |
linux | apache-activemq-5.13.3-bin.tar.gz |
win下 解压得到如下图目录,根据操作系统进入对应的文件夹win32或者win64,双击activemq.bat运行.
ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。点击QUEUES可以看到当前的消息队列.
spring整合JMS
spring-jms介绍
- spring提供了一个jms集成框架,这个框架如spring 集成jdbc api一样,简化了jms api的使用。
- jms可以简单的分成两个功能区,消息的生产和消息的消费。JmsTemplate类用来生成消息和同步接受消息。和其它java ee的消息驱动样式一样,对异步消息,spring也提供了许多消息监听容器用来创建消息驱动的POJO(MDPs)。spring同时也提供了创建消息监听器的声明方式。
- org.springframework.jms.core 提供了使用JMS的核心功能,它包含JmsTemplate类,该类类似于jdbc中的jdbdTemplate,它通过对资源的创建和释放处理来简化jms开发。spring的模板类作为一种设计原则在spring框架中广泛使用,模板类对简单操作提供了帮助方法;对复杂操作,通过继承回调接口提供了重要处理过程的代理。jmsTemplate同样遵循这一设计原则,它提供了发送消息、同步消费消息、为用户提供JMS session和消息生产者的多种便利方法。
- org.springframework.jms.support提供了JmsException转译功能。它将checked的JmsException层次转换成uncheckedd异常的镜像层次。若抛出的异常不是javax.jms.JmsException的子类,这个异常将被封装成unchecked异常UncategorizedJmsException。
- org.springframework.jms.support.converter 提供了在java对象和jms消息之间转换的抽象MessageConverter。
- org.springframework.jms.support.destination提供了管理jms destination的多种策略,如对存放在jndi的destionation提供服务定位功能。
- org.springframework.jms.annotation通过使用@JmsListener提供了对注解驱动的监听端的支持。
- org.springframework.jms.config 支持jms命名空间的解析,同时也支持配置监听容器和生成监听端。
- org.springframework.jms.connection提供了适用于standonle应用的ConnectionFactory的实现。对jms的事务管理实现jmsTranscationmanager. 这允许jms 作为事务资源无缝的集成到spring事务管理机制中。
相关配置
maven依赖
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
生产者端的spring配置 - activemqContext.xml(该文件之后用import resource=”activemqContext.xml” 导入到spring配置文件中)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.7.0.xsd">
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="${activemq.url}" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- Spring JmsTemplate 的消息生产者 start -->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean>
<!--Spring JmsTemplate 的消息生产者 end -->
<!--这个是队列目的地 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>UnionApplyMsgQueue</value>
</constructor-arg>
</bean>
</beans>
消费者端spring配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.7.0.xsd">
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="${activemq.url}" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!--这个是队列目的地 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>UnionApplyMsgQueue</value>
</constructor-arg>
</bean>
<!-- 消息消费者 start -->
<!-- 消息监听器 -->
<bean id="queueReceiver1" class="com.game.wego.chat.listener.ConsumerMessageListener" />
<!-- 定义UnionApplyMsgQueue监听器 -->
<jms:listener-container destination-type="queue"
container-type="default" connection-factory="connectionFactory"
acknowledge="auto">
<jms:listener destination="UnionApplyMsgQueue" ref="queueReceiver1" />
</jms:listener-container>
<!-- 消息消费者 end -->
</beans>
总结下,消费者端配置监听器,监听queue,有消息,就调用ConsumerMessageListener,这个是自己写的类,实现MessageListener接口的方法即可.生产者端配置队列目的地,JmsTemplate .注意这里我们用的是CachingConnectionFactory.ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer,节省了资源开销.
Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还得是由JMS服务厂商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我们这里使用的是ActiveMQ实现的JMS,所以在我们这里真正的可以产生Connection的就应该是由ActiveMQ提供的ConnectionFactory。
ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。当使用PooledConnectionFactory时,我们在定义一个ConnectionFactory时应该是如下定义:
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="10"/>
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
代码相关
- ActiveMQ的helloworld
package com.activemq.producter;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* ActiveMQ发送类
* <功能详细描述>
*
* @author Administrator
* @version [版本号, 2014年7月27日]
* @see [相关类/方法]
* @since [产品/模块版本]
*/
public class MessageProducter
{
/*
* @param args
* @see [类、类#方法、类#成员]
*/
public static void main(String[] args) throws JMSException
{
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://192.168.197.130:61616");
//JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字
Destination destination = session.createQueue("my-queue");
// MessageProducer:消息生产者
MessageProducer producer = session.createProducer(destination);
//设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//发送一条消息
sendMsg(session, producer);
session.commit();
connection.close();
}
/**
* 在指定的会话上,通过指定的消息生产者发出一条消息
*
* @param session 消息会话
* @param producer 消息生产者
*/
public static void sendMsg(Session session, MessageProducer producer) throws JMSException {
//创建一条文本消息
TextMessage message = session.createTextMessage("Hello ActiveMQ!");
//通过消息生产者发出消息
producer.send(message);
System.out.println("");
}
}
package com.activemq.reciever;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsReceiver
{
public static void main(String[] args)
throws JMSException
{
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://192.168.197.130:61616");
//JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
Destination destination = session.createQueue("my-queue");
// 消费者,消息接收者
MessageConsumer consumer = session.createConsumer(destination);
while (true)
{
TextMessage message = (TextMessage)consumer.receive(1000);
if (null != message)
System.out.println("收到消息:" + message.getText());
else
break;
}
session.close();
connection.close();
}
}
使用了spring-jms之后,就可用用jmsTemplate来取代连接打开关闭维护等事宜,会方便很多.
2. ConsumerMessageListener监听器实现类,消费者用
public class ConsumerMessageListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class);
@Autowired
PlayerLogicService playerService;
@Autowired
private ResourceDPService resourceDP;
@Override
public void onMessage(Message message) {
ObjectMessage objMsg = (ObjectMessage) message;
try {
RspApplyUnionMsg rspApplyMsg = (RspApplyUnionMsg) objMsg.getObject();
logger.debug("消息内容是:" + JsonUtil.toJsonString(rspApplyMsg) + "申请加入公会");
Player player = playerService.getOnlinePlayerInThisApp(rspApplyMsg.getPlayerId());
if (null != player) {
RspUnionMsg rspData = new RspUnionMsg();
ResponseMessage resp = new ResponseMessage();
rspData.setSpeaker(player.getName());
rspData.setPhyle(player.getPhyle());
rspData.setPlayerId(player.getId());
rspData.setMsgType(2);
resp.setId(com.game.wego.chat.message.Message.RSP_UNION_CHAT_PUSH);
resp.setData(rspData);
// 公会内广播
ChannelGroup channels = GameContext.getUnionGroup(rspApplyMsg.getUnionId());
if (null != channels) {
channels.writeAndFlush(resp);
}
resourceDP.pushMsg(rspData, rspApplyMsg.getUnionId());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
生产者实现类
@Service
public class ActivemqService {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(ActivemqService.class);
@Autowired
@Qualifier("queueDestination")
private Destination destination;
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;
public void sendMessage(final Serializable message) {
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(message);
}
});
}
}
这里可以用消息转换器MessageConverter来代替代码中的new MessageCreator操作,原理不变, 看着简洁点.参见http://haohaoxuexi.iteye.com/blog/1900937
3. 不同消息类型的收发
/**
* 向默认队列发送text消息
*/
public void sendMessage(final String msg) {
String destination = jmsTemplate.getDefaultDestination().toString();
System.out.println("ProducerService向队列" + destination + "发送了消息:\t" + msg);
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
/**
* 向默认队列发送map消息
*/
public void sendMapMessage() {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage message = session.createMapMessage();
message.setString("name", "小西山");
return message;
}
});
}
/**
* 向默认队列发送Object消息
*/
public void sendObjectMessage() {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Staff staff = new Staff(1, "搬砖工"); // Staff必须实现序列化
ObjectMessage message = session.createObjectMessage(staff);
return message;
}
});
}
/**
* 向默认队列发送Bytes消息
*/
public void sendBytesMessage() {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
String str = "BytesMessage 字节消息";
BytesMessage message = session.createBytesMessage();
message.writeBytes(str.getBytes());
return message;
}
});
}
/**
* 向默认队列发送Stream消息
*/
public void sendStreamMessage() {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
String str = "StreamMessage 流消息";
StreamMessage message = session.createStreamMessage();
message.writeString(str);
message.writeInt(521);
return message;
}
});
}
/**
* 接受消息
*/
public void receive(Destination destination) throws JMSException {
Message message = jmsTemplate.receive(destination);
// 如果是文本消息
if (message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t" + tm.getText());
}
// 如果是Map消息
if (message instanceof MapMessage) {
MapMessage mm = (MapMessage) message;
System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t"
+ mm.getString("name"));
}
// 如果是Object消息
if (message instanceof ObjectMessage) {
ObjectMessage om = (ObjectMessage) message;
Staff staff = (Staff) om.getObject();
System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t" + staff);
}
// 如果是bytes消息
if (message instanceof BytesMessage) {
byte[] b = new byte[1024];
int len = -1;
BytesMessage bm = (BytesMessage) message;
while ((len = bm.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
}
// 如果是Stream消息
if (message instanceof StreamMessage) {
StreamMessage sm = (StreamMessage) message;
System.out.println(sm.readString());
System.out.println(sm.readInt());
}
}
应用对象消息时遇到的一个bug,对象消息时,对象必须实现序列化接口,为了传输需要,序列化前后要对比id.确保无误.他们去使用的总是同一个对象,包括序列id都要相同,就导致两个系统的时候,要公共依赖同一个地方的同一个对象文件.这点要注意.