Oozie 使用ActiveMQ实现 JMS通知

一,介绍

提交给Oozie的作业,作业在运行过程中的状态会发生变化如:执行成功了,或者失败了……Oozie能够监控这些作业状态的改变并且将这些消息发送到JMS消息服务器。这里,使用ActiveMQ作为JMS消息服务器。

Oozie supports publishing notifications to a JMS Provider for job status changes and SLA met and miss events.

For Oozie to send/receive messages, a JMS-compliant broker should be installed. Apache ActiveMQ is a popular JMS-compliant broker usable for this purpose.

二,配置Oozie以允许支持消息服务

需要修改oozie-site.xml并添加若干配置进去,具体可参考官网Notifications Configuration

按照官网给出的步骤进行配置即可。

由于我用的是Cloudera Hadoop,故可以在其管理界面直接进行配置。如下图:

修改Oozie配置之后,需要重启生效。

三,安装ActiveMQ接收消息

参考ActiveMQ官方网站,安装及配置。

启动ActiveMQ,过一段时间后,若有作业提交给Oozie,Oozie执行后会给ActiveMQ发消息。还未弄清楚到底在什么情况下,Oozie会向ActiveMQ发送消息???

如下图:ActiveMQ收到的消息:

topicName 即为userName,可从Oozie的配置文件oozie-default.xml中看出:

4,编写ActiveMQ Client程序从Borker中获取消息

实现javax.jms.MessageListener接口,建立连接代码如下:

        OozieClient oc = new OozieClient("http://192.168.121.35:11000/oozie");
        JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo();

        Properties jndiProperties = jmsInfo.getJNDIProperties();
        Context jndiContext = new InitialContext(jndiProperties);

        String connectionFactoryName = (String) jndiContext.getEnvironment()
                .get("connectionFactoryNames");
        ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext
                .lookup(connectionFactoryName);
        Connection connection = connectionFactory.createConnection();

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

当有新消息到达时,自动调用MessageListener的onMessage()方法获取到消息。

    @Override
    public void onMessage(Message message) {
        try {
            if (message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)
                    .equals(org.apache.oozie.client.event.Event.MessageType.SLA
                            .name())) {
                SLAMessage slaMessage = JMSMessagingUtils
                        .getEventMessage(message);
                String id = slaMessage.getId();
                String noti = slaMessage.getNotificationMessage();
                System.out.println(id + " : " + noti);
                // Further processing
            } else if (message.getStringProperty(JMSHeaderConstants.APP_TYPE)
                    .equals(AppType.WORKFLOW_JOB.name())) {
                WorkflowJobMessage wfJobMessage = JMSMessagingUtils
                        .getEventMessage(message);
                String jobId = wfJobMessage.getId();//获得作业的ID
                Date startTime = wfJobMessage.getStartTime();//获得作业的启动时间
                System.out.println(jobId + startTime);
                // Further processing
            }

5,测试

给Oozie提交workflow作业,在MyEclipse控制台中看到,该作业的ID和启动时间已经查询到了。

6,总结

由于原来当作业提交之后,不断地向Oozie Server发HTTP请求(每隔10秒一次)的方式来查询作业是否执行完成,然后Oozie Server以JSON格式的字符串返回该JobID相关的执行结果,这种情况有两个问题:①同步的HTTP轮询方式,效率低下,耦合严重; ②需要自己解析JSON,处理异常情况(比如,执行失败的作业Json字符串中的EndTime字段为null)

而现在引入ActiveMQ后,异步获取消息,查询作业的执行结果变得更加容易了。而且还可以使用Oozie提供的一些更加高级的特性了。比如:SLA(Service Level Agreement)

时间: 2024-10-09 11:27:07

Oozie 使用ActiveMQ实现 JMS通知的相关文章

ActiveMQ:JMS开源框架入门介绍

介绍基本的JMS概念与开源的JMS框架ActiveMQ应用,内容涵盖一下几点: 基本的JMS概念 JMS的消息模式 介绍ActiveMQ 一个基于ActiveMQ的JMS例子程序 一:JMS基本概念 1. JMS的目标 为企业级的应用提供一种智能的消息系统,JMS定义了一整套的企业级的消息概念与工具,尽可能最小化的Java语言概念去构建最大化企业消息应用.统一已经存在的企业级消息系统功能. 2. 提供者 JMS提供者是指那些完全完成JMS功能与管理功能的JMS消息厂商,理论上JMS提供者完成.

框架面试题(maven、ZooKeeper、Dubbo、Nginx、Redis、Lucene、Solr、ActiveMQ、JMS

什么是 Maven? Maven 使用项目对象模型(POM)的概念,可以通过一小段描述信息来管理项目的构建, 报告和文档的软件项目管理工具. Maven 除了以程序构建能力为特色之外,还提供高级项目管理工具.由于 Maven 的 缺省构建规则有较高的可重用性,所以常常用两三行 Maven 构建脚本就可以构建简单的项 目.由于 Maven 的面向项目的方法,许多 Apache Jakarta 项目发布时使用 Maven,而 且公司项目采用 Maven 的比例在持续增长. Maven 的出现,解决了

基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送

写了一个简单的JMS例子,之所以使用JNDI 是出于通用性考虑,该例子使用JMS规范提供的通用接口,没有使用具体JMS提供者的接口,这样可以保证我们编写的程序适用于任何一种JMS实现(ActiveMQ.HornetQ...). 什么是JNDI JNDI(Java Naming and Directory Interface)是一个标准规范,类似于JDBC,JMS等规范,为开发人员提供了查找和访问各种命名和目录服务的通用.统一的接口.J2EE 规范要求所有 J2EE 容器都要提供 JNDI 规范的

使用activeMQ实现jms

一:jms介绍         jms说白了就是java message service,是J2EE规范的一部分,跟jdbc差不多,sun只提供了接口,由各个厂商(provider)来进行具体的实现,然后使用者使用他们的jar包进行开发使用即可.        另外在jms的API中,jms传递消息有两种方式,一种是点对点的Queue,还有一个是发布订阅的Topic方式.区别在于:        对于Queue模式,一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者

Java消息队列ActiveMQ (一)--JMS基本概念

摘要:The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Java EE) to create, send, receive, and read messages. It enables distributed communication that is loosely

AMQP 与ActiveMQ,JMS消息队列之间的比较

http://blog.csdn.net/kimmking/article/details/8253549 http://www.csdn123.com/html/mycsdn20140110/8f/8f42bb0680685c547107a0079e557686.html http://blog.sina.com.cn/s/blog_999d1f4c01010dpx.html

消费者端的Spring JMS 连接ActiveMQ接收生产者Oozie Server发送的Oozie作业执行结果

一,介绍 Oozie是一个Hadoop工作流服务器,接收Client提交的作业(MapReduce作业)请求,并把该作业提交给MapReduce执行.同时,Oozie还可以实现消息通知功能,只要配置好消息服务器,Oozie Server就可以把作业的执行结果发送到消息服务器上,而Client只需要订阅其感兴趣的消息即可.具体的配置参考这篇文章:Oozie 使用ActiveMQ实现 JMS通知 由于Spring内置了JMS相关的服务,因此这里记录在Spring中如何配置消费者连接ActiveMQ,

Oozie 生成JMS消息并向 JMS Provider发送消息过程分析

一,涉及到的工程 从官网下载源码,mvn 编译成 Eclipse工程文件: 对于JMS消息这一块,主要涉及到两个工程: oozie-core工程有问题的原因是还需要一些其他的依赖工程未导入: 二,Oozie 生成 JMS消息 主要涉及到的一些类 oozie-core 工程中的: oozie-client工程中的: 三,相关代码: 对于Oozie Server而言,它是消息的生产者.在oozie-default.xml/oozie-site.xml里面配置好连接参数,消息服务器....Oozie就

Spring jms 与 ActiveMq初识

Spring JMS 与 ActiveMQ初识 1.1 Spring jms 与 ActiveMQ简介 jms 的全称是 Java Message Service,其主要作用是在生产者与消费者之间进行消息的传递:实际业务场景下,当A系统完成某项业务操作后,需要通知B系统或者其他任意系统 A系统操作完成的状态,以及操作中涉及到的相关信息,比如 当会员卡发放系统完成给用户绑定一张会员卡的操作之后,可以发出一条消息,消息内容是 uid或phone为XXX的用户,绑定了一张XX类型(普通卡.贵宾卡等)的