最近在研究ofbiz jms,用了activemq玩了一下,ofbiz使用JNDI与其它JMS进行消息接收与发送,总结一下。
准备
ofbiz12.04 版本
activemq 5.5 版本
jar包
在base/lib下引入包
activemq-all-5.5.0.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
服务引擎配置
<span style="font-size:14px;"><jms-service name="serviceMessenger" send-mode="all"> <server jndi-server-name="default" jndi-name="topicConnectionFactory" topic-queue="OFBTopic" type="topic" listen="true"/> </jms-service></span>
jndi.properties配置
<span style="font-size:14px;">java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory java.naming.provider.url=tcp://127.0.0.1:61616 topic.OFBTopic=OFBTopic connectionFactoryNames=connectionFactory, queueConnectionFactory, topicConnectionFactory</span>
只试一个topic消息模式,如果想用queue队列,配置
<span style="font-size:14px;">queue.OFBQueue= OFBQueue</span>
测试
先启动activemq ,再启动ofbiz
ofbiz启动之后看到:
<span style="font-size:14px;">2015-01-15 11:47:56,970 ([email protected]) [ JmsListenerFactory.java:89 :INFO ] JMS Listener Factory Thread Finished; All listeners connected.</span>
如果报监听失败,自己查查什么问题。
之后再看activemq控制台
消息发送和接收
咱们是用ofbiz给出的测试例子:
services_test.xml
<span style="font-size:14px;"><service name="testJMSTopic" engine="jms" location="serviceMessenger" invoke="testScv"> <description>Test JMS Topic service</description> <attribute name="message" type="String" mode="IN"/> </service></span>
运行此服务,看下效果:
<span style="font-size:14px;">2015-01-15 12:09:10,754 (ActiveMQ Session Task-1) [ ModelService.java:469:INFO ] Set default value [999.9999] for parameter [defaultValue] 2015-01-15 12:09:10,755 (ActiveMQ Session Task-1) [ ServiceEcaRule.java:137:INFO ] For Service ECA [testScv] on [invoke] got false for condition: [message][equals][auto][true][String] ---- SVC-CONTEXT: message => 测试TOPIC消息 ---- SVC-CONTEXT: locale => zh_CN ---- SVC-CONTEXT: timeZone => sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null] ---- SVC-CONTEXT: userLogin => [GenericEntity:UserLogin][createdStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][createdTxStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][currentPassword,{SHA}47ca69ebb4bdc9ae0adec130880165d2cc05db1a(java.lang.String)][enabled,Y(java.lang.String)][hasLoggedOut,N(java.lang.String)][lastTimeZone,Asia/Macao(java.lang.String)][lastUpdatedStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][lastUpdatedTxStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][partyId,admin(java.lang.String)][successiveFailedLogins,0(java.lang.Long)][userLoginId,admin(java.lang.String)] ---- SVC-CONTEXT: defaultValue => 999.9999 -----SERVICE TEST----- : 测试TOPIC消息 ----- SVC: entity-default -----</span>
看下activemq控制台
入列消息一条,接收消息一条。
说明一下为什么会这样
我们运行服务发送了一条广播消息,这就是为什么Messages Enqueued有一条数据,那么为什么Messages Dequeued也有一条消息,因为ofbiz是服务端同时也是客户端,我们设置了监听不是。。
ofbiz作为服务端
我们只要看一下JMS的服务引擎就明白了,不多解释
ofbiz作为客户端
这块是怎么玩呢?
首先我们在服务引擎配置文件设置了监听为true,
其次,我们看下在加载服务引擎之后,在根据服务引擎配置文件加载JmsListenerFactory.java去加载JMS的监听。
最后,贴出来一段代码
JmsTopicListener.java
<span style="font-size:14px;"> public synchronized void load() throws GenericServiceException { try { InitialContext jndi = JNDIContextFactory.getInitialContext(jndiServer); TopicConnectionFactory factory = (TopicConnectionFactory) jndi.lookup(jndiName); if (factory != null) { con = factory.createTopicConnection(userName, password); con.setExceptionListener(this); session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topic = (Topic) jndi.lookup(topicName); if (topic != null) { TopicSubscriber subscriber = session.createSubscriber(topic); subscriber.setMessageListener(this); con.start(); this.setConnected(true); if (Debug.infoOn()) Debug.logInfo("Listening to topic [" + topicName + "] on [" + jndiServer + "]...", module); } else { throw new GenericServiceException("Topic lookup failed."); } } else { throw new GenericServiceException("Factory (broker) lookup failed."); } } catch (NamingException ne) { throw new GenericServiceException("JNDI lookup problems; listener not running.", ne); } catch (JMSException je) { throw new GenericServiceException("JMS internal error; listener not running.", je); } catch (GeneralException ge) { throw new GenericServiceException("Problems with InitialContext; listener not running.", ge); } }</span>
当时,还有一个疑问,那有监听之后ofbiz做了什么事情去处理监听到消息事件之后的事情?
同样,贴出一段代码,大家就都明白了:
AbstractJmsListener.java
<span style="font-size:14px;">/** * Receives the MapMessage and processes the service. * @see javax.jms.MessageListener#onMessage(Message) */ public void onMessage(Message message) { MapMessage mapMessage = null; if (Debug.verboseOn()) Debug.logVerbose("JMS Message Received --> " + message, module); if (message instanceof MapMessage) { mapMessage = (MapMessage) message; } else { Debug.logError("Received message is not a MapMessage!", module); return; } runService(mapMessage); }</span>
<span style="font-size:14px;">/** * Runs the service defined in the MapMessage * @param message * @return Map */ protected Map<String, Object> runService(MapMessage message) { Map<String, ? extends Object> context = null; String serviceName = null; String xmlContext = null; try { serviceName = message.getString("serviceName"); xmlContext = message.getString("serviceContext"); if (serviceName == null || xmlContext == null) { Debug.logError("Message received is not an OFB service message. Ignored!", module); return null; } Object o = XmlSerializer.deserialize(xmlContext, dispatcher.getDelegator()); if (Debug.verboseOn()) Debug.logVerbose("De-Serialized Context --> " + o, module); if (ObjectType.instanceOf(o, "java.util.Map")) context = UtilGenerics.checkMap(o); } catch (JMSException je) { Debug.logError(je, "Problems reading message.", module); } catch (Exception e) { Debug.logError(e, "Problems deserializing the service context.", module); } try { ModelService model = dispatcher.getDispatchContext().getModelService(serviceName); if (!model.export) { Debug.logWarning("Attempt to invoke a non-exported service: " + serviceName, module); return null; } } catch (GenericServiceException e) { Debug.logError(e, "Unable to get ModelService for service : " + serviceName, module); } if (Debug.verboseOn()) Debug.logVerbose("Running service: " + serviceName, module); Map<String, Object> result = null; if (context != null) { try { result = dispatcher.runSync(serviceName, context); } catch (GenericServiceException gse) { Debug.logError(gse, "Problems with service invocation.", module); } } return result; }</span>
注意
可以同时配置队列和广播两种模式消息,配置同上并参考ACTIVE MQ官网对JNDI支持
参考
ofbiz jms:https://cwiki.apache.org/confluence/display/OFBIZ/Distributed+Entity+Cache+Clear+(DCC)+Mechanism
activemq jndi:http://activemq.apache.org/jndi-support.html