本文详细介绍了axis2集成jms中间件ActiveMQ的全过程,参考部分网络资料进行整理。
1 ActiveMQ安装及配置
1.1 ActiveMQ简介
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
1.2 ActiveMQ特性
1)多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2)完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3)对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4)通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4商业服务器上
5)支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6)支持通过JDBC和journal提供高速的消息持久化
7)从设计上保证了高性能的集群,客户端-服务器,点对点
8)支持Ajax,支持与Axis的整合
9)可以很容易得调用内嵌JMS provider,进行测试
1.3 下载及安装
从官网上下载最新版本,当前最新版本为5.14,上传到服务器上后解压(/data),得到/data/apache-activemq-5.14.0。
在Windows系统中运行,可以直接解压apache-activemq-5.9.0-bin.zip,并运行bin目录下的activemq.bat文件,此时使用的是默认的服务端口:61616和默认的console端口:8161。
在linux系统中,解压后需要先进入bin目录,执行以下:
./activemq setup ——安装设置 ./activemq start ——在后台启动服务 |
./linux-x86-64/wrapper ——将服务运行打印控制台(64位) |
通过以上命令启动默认端口同上。
1.4 默认端口配置
ActiveMQ的主要配置集中在两个文件中,一个是activemq.xml,在其中对ActiveMQ的模式及对外提供服务的端口等信息进行配置。另一个是jetty.xml,通过在activemq.xml中进行引用来生效,主要提供对web console、REST等的常用环境参数进行配置。
1)默认服务端口修改
打开conf/activemq.xml文件,修改以下部分
2)默认web console端口修改
打开conf/jetty.xml文件,修改以下部分
1.5 ActiveMQ多种部署方式
单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供了master-slave、broker cluster等多种部署方式,但为满足分布式和高可用的需求,需要将两种部署方式相结合才能满足,后面将重点讲解如何将两种部署方式相结合。
1.5.1 Master-Slave部署方式
Master-Slave部署方式通过修改conf/activemq.xml文件的如下内容实现:
默认使用shared filesystem Master-Slave部署方式,配置如上图所示。此处推荐使用Replicated LevelDB Store方式。
以下为三种方式的明细介绍:
1)shared filesystem Master-Slave部署方式
主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。
多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。
2)shared database Master-Slave方式
与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库而已。
3)Replicated LevelDB Store方式(推荐使用)
这种主备方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master。被选择的master broker node开启并接受客户端连接。
其他node转入slave模式,连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的slaves。
如果master死了,得到了最新更新的slave被允许成为master。fialed node能够重新加入到网络中并连接master进入slave mode。所有需要同步的disk的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master将会存储并更新然后等待 (2-1)=1个slave存储和更新完成,才汇报success。至于为什么是2-1,熟悉Zookeeper的应该知道,有一个node要作为观擦者存在。
单一个新的master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node。这个node将会成为新的master。因此,推荐运行至少3个replica nodes,以防止一个node失败了,服务中断。
1.5.2 Broker-Cluster部署方式
前面的Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。
Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态,而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息,那么broker-B会先通过内部网络获取到broker-A上面的message,并通知自己的consumer来消费。
1)Static Broker-Cluster部署
在activemq.xml文件中静态指定Broker需要建立桥连接的其他Broker,内容如下:
2)Dynamic Broker-Cluster部署
在activemq.xml文件中不直接指定Broker需要建立桥连接的其他Broker,由activemq在启动后动态查找,内容如下:
然后在transportConnector中增加参数discoveryUri="multicast://default",修改如下:
1.5.3 混合部署
由于部署需求,若副本在同一机器上,则推荐使用Static Broker-Cluster + shared filesystem Master-Slave,若不在同一机器上,则推荐使用Replicated LevelDB Store + Static Broker-Cluster。配置方式只需将Static Broker-Cluster中的uri由static:(tcp://0.0.0.0:61617)修改为masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)即可。其他不详细赘述。
2 Axis2配置
Axis2天然支持多种Transport协议,包括HTTP、JMS、MAIL等,在其天然支持的JMS Transport协议中,Axis2默认使用ActiveMQ,因此只需将功能打开即可。但早期版本的Axis2的支持性并不好,一直到1.6版本之后才开放,本测试使用版本1.7.3,下载地址:http://axis.apache.org/axis2/java/core/download.html
2.1 容器配置
Axis2常用发布方式包括两类,一类是Axis2自带容器发布,另一类是Tomcat等容器的web服务发布。前一类可修改axis2server.bat或axis2server.sh,增加ActiveMQ相关依赖包引用,如下:
后一类则需要将ActiveMQ相关依赖包复制进入$Tomcat/webapps/axis2/WEB-INF/lib下面(或者修改web.xml)。
之后打开$Tomcat/webapps/axis2/WEB-INF/conf/axis2.xml文件,添加如下内容:
Sender和Receiver除了最外层不同,其他元素配置项相同。详细配置参见官网地址:http://axis.apache.org/axis2/java/transports/jms.html
2.2 服务端配置
修改service.xml,对需要向外开放的服务进行配置,增加内容如下:
服务发布后,若未配置transport.jms.Destination,则会自动在ActiveMQ中创建服务名对应的queue,如下图所示:
2.3 客户端调用
2.3.1 JMS调用
Jms调用与传统webservice调用最大的区别就是调用地址的变更,jms调用地址需满足如下规则:
jms-epr = "jms:/" jms-dest [ "?" param *( [ "&" param ] ) ] param = param-name "=" param-value |
其中jms-dest代表目标端的JNDI的名称,如本例中的uploadMeter:
ServiceJMSEndpointURL = "jms:/uploadMeter?" + "transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&" + "java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&"+ "java.naming.provider.url=failover:(tcp://dw212:61616,tcp://dw210:61616)&"+ "java.naming.security.principal=admin&"+ "java.naming.security.credentials=admin" |
在链接中指定了链接工厂类(java.naming.factory.initial)、activeMQ的地址(java.naming.provider.url)、和链接使用的用户名(java.naming.security.principal)和密码(java.naming.security.credentials)。
2.3.2 客户端调用代码
1.7新版本客户端中部分api进行了改动,以下就改动后提供两种方式进行调用:
1)RPC调用
public String callWebServiceRPC(String url, String nsname, String port, String[] paras) { String response = null; try { RPCServiceClient serviceClient = new RPCServiceClient(); EndpointReference targetEPR = new EndpointReference(url); Options options = serviceClient.getOptions(); //transport in TransportListener tl = new JMSListener(); TransportInDescription tin = new TransportInDescription(Constants.TRANSPORT_JMS); tin.setReceiver(tl); ConfigurationContext ctx = ConfigurationContextFactory.createDefaultConfigurationContext(); TransportOutDescription tout = new TransportOutDescription( Constants.TRANSPORT_JMS); JMSSender jmsSender = new JMSSender(); tout.setSender(jmsSender); jmsSender.init(ctx, tout); //初始化jms发送端,否则报空指针异常 options.setTransportIn(tin); options.setTransportOut(tout); options.setTo(targetEPR); // options.setReplyTo(new EndpointReference(WebTest.ServiceJMSReplyEndpointURL)); // Setting the operation QName opOpenAccount = new QName("http://webservice.dareway.com/xsd", port); OMElement result = serviceClient.invokeBlocking(opOpenAccount, paras); System.out.println(">> Using RPC Call...."); response = result.getFirstElement().getText().toString(); System.out.println(">> Returned Message: " + response); } catch (Exception e) { e.printStackTrace(); } return response; } |
2)CallBack调用
public String callWebServiceJMS(String url, String nsname, String port, String[] paras) { EndpointReference targetEPR = new EndpointReference(url); ServiceClient sender = null; try { OMElement para = this.getOMElement(nsname, port, paras); AxisCallback callback = new AxisCallback() { //新API public void onMessage(MessageContext arg0) { // TODO Auto-generated method stub OMElement omElement = arg0.getEnvelope().getBody().getFirstElement().getFirstElement(); String res = omElement == null ? "" : omElement.getText(); results = res; } public void onFault(MessageContext arg0) { // TODO Auto-generated method stub System.out.println("调用失败:" +arg0.getEnvelope().getBody().getFault().toString()); } public void onError(Exception arg0) { // TODO Auto-generated method stub arg0.printStackTrace(); } public void onComplete() { // TODO Auto-generated method stub System.out.println("调用成功 ! "); complete = true; } }; ConfigurationContext ctx = ConfigurationContextFactory .createConfigurationContextFromFileSystem("src/META-INF/axis2.xml"); // Non-Blocking Invocation sender = new ServiceClient(ctx,null); Options options = sender.getOptions(); //transport in JMSListener tl = new JMSListener(); TransportInDescription transportIn = new TransportInDescription(Constants.TRANSPORT_JMS); transportIn.setReceiver(tl); tl.init(ctx, transportIn); TransportOutDescription tout = new TransportOutDescription(Constants.TRANSPORT_JMS); JMSSender jmsSender = new JMSSender(); tout.setSender(jmsSender); jmsSender.init(ctx, tout); options.setTransportIn(transportIn); // options.setUseSeparateListener(true); options.setTransportOut(tout); options.setTo(targetEPR); // options.setReplyTo(new EndpointReference(WebTest.ServiceJMSReplyEndpointURL)); options.setTimeOutInMilliSeconds(1000 * 60 * 20); sender.sendReceiveNonBlocking(para, callback); // Wait till the callback receives the response. while (!complete) { Thread.sleep(1000); } } catch (AxisFault axisFault) { axisFault.printStackTrace(); } catch (Exception ex) { ex.printStackTrace(); } finally { try { sender.cleanup(); } catch (AxisFault axisFault) { } } return results; } |
PS:文中部分材料来源于网络,仅供参考。