ofbiz jms activemq

最近在研究ofbiz jms,用了activemq玩了一下,ofbiz使用JNDI与其它JMS进行消息接收与发送,总结一下。

准备

ofbiz12.04 版本

activemq 5.5 版本

jar包

在base/lib下引入包

activemq-all-5.5.0.jar

geronimo-j2ee-management_1.1_spec-1.0.1.jar

服务引擎配置

<span style="font-size:14px;"><jms-service name="serviceMessenger" send-mode="all">
            <server jndi-server-name="default"
                    jndi-name="topicConnectionFactory"
                    topic-queue="OFBTopic"
                    type="topic"
                    listen="true"/>
        </jms-service></span>

jndi.properties配置

<span style="font-size:14px;">java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://127.0.0.1:61616
topic.OFBTopic=OFBTopic
connectionFactoryNames=connectionFactory, queueConnectionFactory, topicConnectionFactory</span>

只试一个topic消息模式,如果想用queue队列,配置

<span style="font-size:14px;">queue.OFBQueue= OFBQueue</span>

测试

先启动activemq ,再启动ofbiz

ofbiz启动之后看到:

<span style="font-size:14px;">2015-01-15 11:47:56,970 ([email protected]) [ JmsListenerFactory.java:89 :INFO ] JMS Listener Factory Thread Finished; All listeners connected.</span>

如果报监听失败,自己查查什么问题。

之后再看activemq控制台

消息发送和接收

咱们是用ofbiz给出的测试例子:

services_test.xml

<span style="font-size:14px;"><service name="testJMSTopic" engine="jms" location="serviceMessenger" invoke="testScv">
        <description>Test JMS Topic service</description>
        <attribute name="message" type="String" mode="IN"/>
    </service></span>

运行此服务,看下效果:

<span style="font-size:14px;">2015-01-15 12:09:10,754 (ActiveMQ Session Task-1) [       ModelService.java:469:INFO ] Set default value [999.9999] for parameter [defaultValue]
2015-01-15 12:09:10,755 (ActiveMQ Session Task-1) [     ServiceEcaRule.java:137:INFO ] For Service ECA [testScv] on [invoke] got false for condition: [message][equals][auto][true][String]
---- SVC-CONTEXT: message => 测试TOPIC消息
---- SVC-CONTEXT: locale => zh_CN
---- SVC-CONTEXT: timeZone => sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]
---- SVC-CONTEXT: userLogin => [GenericEntity:UserLogin][createdStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][createdTxStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][currentPassword,{SHA}47ca69ebb4bdc9ae0adec130880165d2cc05db1a(java.lang.String)][enabled,Y(java.lang.String)][hasLoggedOut,N(java.lang.String)][lastTimeZone,Asia/Macao(java.lang.String)][lastUpdatedStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][lastUpdatedTxStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][partyId,admin(java.lang.String)][successiveFailedLogins,0(java.lang.Long)][userLoginId,admin(java.lang.String)]
---- SVC-CONTEXT: defaultValue => 999.9999
-----SERVICE TEST----- : 测试TOPIC消息
----- SVC: entity-default -----</span>

看下activemq控制台

入列消息一条,接收消息一条。

说明一下为什么会这样

我们运行服务发送了一条广播消息,这就是为什么Messages Enqueued有一条数据,那么为什么Messages Dequeued也有一条消息,因为ofbiz是服务端同时也是客户端,我们设置了监听不是。。

ofbiz作为服务端

我们只要看一下JMS的服务引擎就明白了,不多解释

ofbiz作为客户端

这块是怎么玩呢?

首先我们在服务引擎配置文件设置了监听为true,

其次,我们看下在加载服务引擎之后,在根据服务引擎配置文件加载JmsListenerFactory.java去加载JMS的监听。

最后,贴出来一段代码

JmsTopicListener.java

<span style="font-size:14px;"> public synchronized void load() throws GenericServiceException {
        try {
            InitialContext jndi = JNDIContextFactory.getInitialContext(jndiServer);
            TopicConnectionFactory factory = (TopicConnectionFactory) jndi.lookup(jndiName);

            if (factory != null) {
                con = factory.createTopicConnection(userName, password);
                con.setExceptionListener(this);
                session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                topic = (Topic) jndi.lookup(topicName);
                if (topic != null) {
                    TopicSubscriber subscriber = session.createSubscriber(topic);
                    subscriber.setMessageListener(this);
                    con.start();
                    this.setConnected(true);
                    if (Debug.infoOn()) Debug.logInfo("Listening to topic [" + topicName + "] on [" + jndiServer + "]...", module);
                } else {
                    throw new GenericServiceException("Topic lookup failed.");
                }
            } else {
                throw new GenericServiceException("Factory (broker) lookup failed.");
            }
        } catch (NamingException ne) {
            throw new GenericServiceException("JNDI lookup problems; listener not running.", ne);
        } catch (JMSException je) {
            throw new GenericServiceException("JMS internal error; listener not running.", je);
        } catch (GeneralException ge) {
            throw new GenericServiceException("Problems with InitialContext; listener not running.", ge);
        }
    }</span>

当时,还有一个疑问,那有监听之后ofbiz做了什么事情去处理监听到消息事件之后的事情?

同样,贴出一段代码,大家就都明白了:

AbstractJmsListener.java

<span style="font-size:14px;">/**
     * Receives the MapMessage and processes the service.
     * @see javax.jms.MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        MapMessage mapMessage = null;

        if (Debug.verboseOn()) Debug.logVerbose("JMS Message Received --> " + message, module);

        if (message instanceof MapMessage) {
            mapMessage = (MapMessage) message;
        } else {
            Debug.logError("Received message is not a MapMessage!", module);
            return;
        }
        runService(mapMessage);
    }</span>
<span style="font-size:14px;">/**
     * Runs the service defined in the MapMessage
     * @param message
     * @return Map
     */
    protected Map<String, Object> runService(MapMessage message) {
        Map<String, ? extends Object> context = null;
        String serviceName = null;
        String xmlContext = null;

        try {
            serviceName = message.getString("serviceName");
            xmlContext = message.getString("serviceContext");
            if (serviceName == null || xmlContext == null) {
                Debug.logError("Message received is not an OFB service message. Ignored!", module);
                return null;
            }

            Object o = XmlSerializer.deserialize(xmlContext, dispatcher.getDelegator());

            if (Debug.verboseOn()) Debug.logVerbose("De-Serialized Context --> " + o, module);
            if (ObjectType.instanceOf(o, "java.util.Map"))
                context = UtilGenerics.checkMap(o);
        } catch (JMSException je) {
            Debug.logError(je, "Problems reading message.", module);
        } catch (Exception e) {
            Debug.logError(e, "Problems deserializing the service context.", module);
        }

        try {
            ModelService model = dispatcher.getDispatchContext().getModelService(serviceName);
            if (!model.export) {
                Debug.logWarning("Attempt to invoke a non-exported service: " + serviceName, module);
                return null;
            }
        } catch (GenericServiceException e) {
            Debug.logError(e, "Unable to get ModelService for service : " + serviceName, module);
        }

        if (Debug.verboseOn()) Debug.logVerbose("Running service: " + serviceName, module);

        Map<String, Object> result = null;
        if (context != null) {
            try {
                result = dispatcher.runSync(serviceName, context);
            } catch (GenericServiceException gse) {
                Debug.logError(gse, "Problems with service invocation.", module);
            }
        }
        return result;
    }</span>

注意

可以同时配置队列和广播两种模式消息,配置同上并参考ACTIVE MQ官网对JNDI支持

参考

ofbiz  jms:https://cwiki.apache.org/confluence/display/OFBIZ/Distributed+Entity+Cache+Clear+(DCC)+Mechanism

activemq jndi:http://activemq.apache.org/jndi-support.html

时间: 2024-11-09 01:01:22

ofbiz jms activemq的相关文章

spring boot整合JMS(ActiveMQ实现)

一.安装ActiveMQ 具体的安装步骤,请参考我的另一篇博文: http://blog.csdn.net/liuchuanhong1/article/details/52057711 二.新建spring boot工程,并加入JMS(ActiveMQ)依赖 三.工程结构 pom依赖如下: [html] view plain copy <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

JMS - ActiveMQ集成Spring

下面是ActiveMQ官网提供的文档.http://activemq.apache.org/spring-support.html 下面是我添加的一些dependency: <!-- jms activemq --> <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0</versio

Spring整合ActiveMQ:spring+JMS+ActiveMQ+Tomcat

一.目录结构 相关jar包 二.关键配置activmq.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi=&quo

JMS ActiveMQ案例

创建一个web工程 导入ActiveMQ依赖的jar包  activemq-all-5.9.jar 写一个生产者(send)servlet package com.sun.jms;import java.io.IOException;import java.io.PrintWriter; import javax.jms.DeliveryMode;import javax.jms.Queue;import javax.jms.QueueConnection;import javax.jms.Qu

JMS - ActiveMQ的简单使用

首先需要下载ActiveMQ,下面的链接给我们列出了所有版本:http://activemq.apache.org/download-archives.html每个版本为不同的OS提供了链接: 公司电脑是windows的,用目录下的activemq.bat启动: 端口号默认是61616,可以在conf/activemq.xml中看到: <transportConnectors> <!-- DOS protection, limit concurrent connections to 10

JMS ActiveMQ研究文档

1. 背景 当前,CORBA.DCOM.RMI等RPC中间件技术已广泛应用于各个领域.但是面对规模和复杂度都越来越高的分布式系统,这些技术也显示出其局限性:(1)同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果后才能继续执行:(2)客户和服务对象的生命周期紧密耦合:客户进程和服务对象进程 都必须正常运行:如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常:(3)点对点通信:客户的一次调用只发送给某个单独的目标对象. 面向消息的中间件(Message Oriente

Spring JMS ActiveMQ整合(转)

转载自:http://my.oschina.net/xiaoxishan/blog/381209#comment-list ActiveMQ学习笔记(四)http://my.oschina.net/xiaoxishan/blog/380446 中 记录了如何使用原生的方式从ActiveMQ中收发消息.可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更为方便的方式,这就是Spring JMS.我们通过一个例子展开讲述.包括队列.主题消息的收发相关的Spring配置.代码.测试

Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)

首先声明:以下内容均是在网上找别人的博客综合学习而成的,可能会发现某些代码与其他博主的相同,由于参考的文章比较多,这里对你们表示感谢,就不一一列举,如果有侵权的地方,请通知我,我可以把该文章删除. 1.jms-xml Spring配置文件 [html] view plain copy print? <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springf

JMS Activemq实战例子demo

上一篇已经讲了JMS的基本概念,这一篇来上手练一练,如果对JMS基本概念还不熟悉,欢迎参靠JMS基本概. 这篇文章所使用的代码已经不是我刚入手时的代码,已经经过我重构过的代码,便于理解,并且加了很多中文注释,希望对大家有所帮助. 在基本概念一篇中已经讲到,JMS有两种消息模型,一种是点对点,另一种的发布/订阅模式.本篇文章就基于这两种消息模型来写例子. 点对点模型 先看一下生产者代码: [java] view plain copy package com.darren.activemq.queu