消费者端的Spring JMS 连接ActiveMQ接收生产者Oozie Server发送的Oozie作业执行结果

一,介绍

Oozie是一个Hadoop工作流服务器,接收Client提交的作业(MapReduce作业)请求,并把该作业提交给MapReduce执行。同时,Oozie还可以实现消息通知功能,只要配置好消息服务器,Oozie Server就可以把作业的执行结果发送到消息服务器上,而Client只需要订阅其感兴趣的消息即可。具体的配置参考这篇文章:Oozie 使用ActiveMQ实现 JMS通知

由于Spring内置了JMS相关的服务,因此这里记录在Spring中如何配置消费者连接ActiveMQ,从而接收生产者Oozie发送的消息。

二,Oozie Server作为生产者的相关配置

这主要在这篇文章Oozie 使用ActiveMQ实现 JMS通知 已经提到了。

其中Oozie的配置文件 oozie-default.xml中相关配置如下:

 <property>
        <name>oozie.jms.producer.connection.properties</name>
        <value>java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://l
ocalhost:61616;connectionFactoryNames#ConnectionFactory</value>
 </property>

<!-- JMSAccessorService -->    <property>        <name>oozie.service.JMSAccessorService.connectioncontext.impl</name>        <value>        org.apache.oozie.jms.DefaultConnectionContext        </value>        <description>        Specifies the Connection Context implementation        </description>    </property>

Destination的相关配置如下,这里的Destination是一个Topic,即生产者发送消息的目的地,也是消费者取消息的地方。

  <property>
        <name>oozie.service.JMSTopicService.topic.name</name>
        <value>
        default=${username}
        </value>
        <description>
        Topic options are ${username} or ${jobId} or a fixed string which can be specified as default or for a
        particular job type.
        For e.g To have a fixed string topic for workflows, coordinators and bundles,
        specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
        where job type can be WORKFLOW, COORDINATOR or BUNDLE.
        e.g. Following defines topic for workflow job, workflow action, coordinator job, coordinator action,
        bundle job and bundle action
        WORKFLOW=workflow,
        COORDINATOR=coordinator,
        BUNDLE=bundle
        For jobs with no defined topic, default topic will be ${username}
        </description>
    </property>

三,在Spring中配置消费者的连接信息

这里采用JNDI连接ActiveMQ,连接信息配置如下:

<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
        <property name="environment">
            <props>
                <prop key="java.naming.factory.initial">
                    org.apache.activemq.jndi.ActiveMQInitialContextFactory
                </prop>
                <prop key="java.naming.provider.url">
                    tcp://192.168.121.35:61616
                </prop>
                <prop key="java.naming.security.principal">
                    system
                </prop>
                <prop key="java.naming.security.credentials">
                    manager
                </prop>
            </props>
        </property>
    </bean>

配置连接工厂:

    <bean id="jndiTopicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiTemplate" ref="jndiTemplate" />
        <property name="jndiName" value="ConnectionFactory" />
    </bean>

我是怎么知道连接工厂的value="ConnectionFactory"的呢?由于我大部分采用的是Oozie的默认配置,根据Oozie官网提供的一个示例程序,调试出的Oozie使用的连接工厂的。

//获得Oozie中关于JMS的相关配置信息,如Transport Connectors
        OozieClient oc = new OozieClient("http://192.168.121.35:11000/oozie");
        JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo();

        Properties jndiProperties = jmsInfo.getJNDIProperties();
        Context jndiContext = new InitialContext(jndiProperties);

这段代码建立到ActiveMQ的连接上下文,调试上述代码可以看到下面的一些信息:

{java.naming.provider.url=tcp://192.168.121.35:61616,
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory,
connectionFactoryNames=ConnectionFactory}

配置Topic

    <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="cdhfive"></constructor-arg>
    </bean>

Topic就是Destination啊。由于从oozie-default.xml中得到生产者的Topic为 ${username},而我们这里的用户名为cdhfive ,故Topic的配置如上。

配置监听器

<bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="topicConnectionFactory"></property>
        <property name="destinationResolver" ref="destinationResolver"></property>
        <property name="concurrentConsumers" value="2"></property>
        <property name="destination" ref="notifyTopic"></property>
        <property name="messageListener" ref="messageListener"></property>
    </bean>

①是监听器需要监听的Destination,即我们上面配置的Topic。②是监听器的实现bean,该bean implements javax.jms.MessageListener接口

    <bean id="messageListener" class="com.schedule.tools.message.SimpleJMSReceiver" />

至此,大部分的配置已经完成了。

四,实现监听器MessageListener接口,接收消息

当有消息推送给订阅者时,javax.jms.MessageListener接口的onMessage()方法被自动调用,就可以在该方法中处理收到的消息了。

@Override
    public void onMessage(Message message) {
        String parentJobId = null;
        String jobId = null;
        String errorMessage = null;
        String status = null;
        Date startTime = null;
        Date endTime = null;
        long runTime = -1;//-1 means job run error

        try {
            // 普通用户作业和解释作业

            if (message.getStringProperty(JMSHeaderConstants.APP_TYPE).equals(
                    AppType.WORKFLOW_JOB.name())) {

                WorkflowJobMessage wfJobMessage = JMSMessagingUtils
                        .getEventMessage(message);

                // 是普通作业
                jobId = wfJobMessage.getId();
                errorMessage = wfJobMessage.getErrorMessage();
                status = wfJobMessage.getStatus().toString();
                startTime = wfJobMessage.getStartTime();
                endTime = wfJobMessage.getEndTime();

                if(endTime != null){
                    runTime = endTime.getTime() - startTime.getTime();
                    System.out.println(jobId + "执行了:" + (endTime.getTime()-startTime.getTime())/1000 + "s");
                }

       //other code.....

五,参考资料

《JAVA消息服务》电子工业出版社

https://oozie.apache.org/docs/4.0.0/DG_JMSNotifications.html

时间: 2024-07-29 23:52:20

消费者端的Spring JMS 连接ActiveMQ接收生产者Oozie Server发送的Oozie作业执行结果的相关文章

Spring jms 与 ActiveMq初识

Spring JMS 与 ActiveMQ初识 1.1 Spring jms 与 ActiveMQ简介 jms 的全称是 Java Message Service,其主要作用是在生产者与消费者之间进行消息的传递:实际业务场景下,当A系统完成某项业务操作后,需要通知B系统或者其他任意系统 A系统操作完成的状态,以及操作中涉及到的相关信息,比如 当会员卡发放系统完成给用户绑定一张会员卡的操作之后,可以发出一条消息,消息内容是 uid或phone为XXX的用户,绑定了一张XX类型(普通卡.贵宾卡等)的

spring jms结合activemq

一.下载activemq 这个例子使用的版本是5.9.0 链接:http://download.csdn.net/download/hpw90333/6652367 先下载了5.14.2运行后出错: javax.jms.JMSException: Cannot send, channel has already failed: tcp://127.0.0.1:61616 Transport Connection to: tcp://127.0.0.1:50941 failed: java.io.

Spring Boot连接MySQL报错“Internal Server Error”的解决办法

报错信息如下: {timestamp: "2018-06-14T03:48:23.436+0000", status: 500, error: "Internal Server Error",-} error : "Internal Server Error" message : "Could not open JDBC Connection for transaction; nested exception is java.sql.S

Spring 实现远程访问详解——jms和activemq

前几章我们分别利用spring rmi.httpinvoker.httpclient.webservice技术实现不同服务器间的远程访问.本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问. 一.  简介 1.      什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器.同时Apache ActiveMq是速度快,支持多种跨语言客户端和协议,同时配有易于使用的企业集成模式和优秀

spring boot整合activeMQ

spring boot整合activeMQ spring boot整合MQ以后,对于消息的发送和接收操作更加便捷.本文将通过四个案例,分别讲解spring boot整合MQ: spring boot整合MQ发送queue消息 spring boot整合MQ发送topic消息 spring boot整合MQ以后如何让queue和topic消息共存 spring boot整合MQ以后topic消息如何持久化 下面分别进行讲解: 一. spring boot 整合MQ发送queue消息 搭建测试工程,

【Active入门-2】ActiveMQ学习-生产者与消费者

1个生产者,1个消费者,使用Queue: 方式1: 生产者将消息发送到Queue中,退出: 然后运行消费者: . 可以看到,可以接收到消息. 方式2: 先运行消费者程序: 然后运行生产者: 消费者见下图: 1个生产者,2个消费者,使用Queue 先运行消费者1: 在运行消费者2: 接下来运行生产者: 下面是消费者消费情况: : 总结: 1. 使用Queue时,生产者只要将Message发送到MQ服务器端,消费者就可以进行消费,而无需生产者程序一直运行: 2. 消息是按照先入先出的顺序,一旦有消费

JMS实现-ActiveMQ,介绍,安装,使用,注意点,spring整合

[TOC] 缘由: 最近在用netty开发游戏服务器,目前有这样的一个场景,聊天服务器和逻辑服务器要进行消息交互,比如,某个玩家往某个公会提交了加入申请,这个申请动作是在逻辑服务器上完成的,但是要产生一条申请消息,由聊天服务器推送到对应的公会频道,目前这个申请消息就是通过jms发送到聊天服务器上,聊天服务器监听到后,推送到对应的公会频道. 下面主要介绍以下几点 - JMS简介 - 消息传递模型 - ActiveMQ介绍 - 安装使用 - spring整合JMS - 代码相关 JMS简介 J Ja

Spring整合ActiveMQ:spring+JMS+ActiveMQ+Tomcat

一.目录结构 相关jar包 二.关键配置activmq.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi=&quo

Spring JMS ActiveMQ整合(转)

转载自:http://my.oschina.net/xiaoxishan/blog/381209#comment-list ActiveMQ学习笔记(四)http://my.oschina.net/xiaoxishan/blog/380446 中 记录了如何使用原生的方式从ActiveMQ中收发消息.可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更为方便的方式,这就是Spring JMS.我们通过一个例子展开讲述.包括队列.主题消息的收发相关的Spring配置.代码.测试