一,介绍
提交给Oozie的作业,作业在运行过程中的状态会发生变化如:执行成功了,或者失败了……Oozie能够监控这些作业状态的改变并且将这些消息发送到JMS消息服务器。这里,使用ActiveMQ作为JMS消息服务器。
Oozie supports publishing notifications to a JMS Provider for job status changes and SLA met and miss events.
For Oozie to send/receive messages, a JMS-compliant broker should be installed. Apache ActiveMQ is a popular JMS-compliant broker usable for this purpose.
二,配置Oozie以允许支持消息服务
需要修改oozie-site.xml并添加若干配置进去,具体可参考官网Notifications Configuration
按照官网给出的步骤进行配置即可。
由于我用的是Cloudera Hadoop,故可以在其管理界面直接进行配置。如下图:
修改Oozie配置之后,需要重启生效。
三,安装ActiveMQ接收消息
参考ActiveMQ官方网站,安装及配置。
启动ActiveMQ,过一段时间后,若有作业提交给Oozie,Oozie执行后会给ActiveMQ发消息。还未弄清楚到底在什么情况下,Oozie会向ActiveMQ发送消息???
如下图:ActiveMQ收到的消息:
topicName 即为userName,可从Oozie的配置文件oozie-default.xml中看出:
4,编写ActiveMQ Client程序从Borker中获取消息
实现javax.jms.MessageListener接口,建立连接代码如下:
OozieClient oc = new OozieClient("http://192.168.121.35:11000/oozie"); JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo(); Properties jndiProperties = jmsInfo.getJNDIProperties(); Context jndiContext = new InitialContext(jndiProperties); String connectionFactoryName = (String) jndiContext.getEnvironment() .get("connectionFactoryNames"); ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext .lookup(connectionFactoryName); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
当有新消息到达时,自动调用MessageListener的onMessage()方法获取到消息。
@Override public void onMessage(Message message) { try { if (message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE) .equals(org.apache.oozie.client.event.Event.MessageType.SLA .name())) { SLAMessage slaMessage = JMSMessagingUtils .getEventMessage(message); String id = slaMessage.getId(); String noti = slaMessage.getNotificationMessage(); System.out.println(id + " : " + noti); // Further processing } else if (message.getStringProperty(JMSHeaderConstants.APP_TYPE) .equals(AppType.WORKFLOW_JOB.name())) { WorkflowJobMessage wfJobMessage = JMSMessagingUtils .getEventMessage(message); String jobId = wfJobMessage.getId();//获得作业的ID Date startTime = wfJobMessage.getStartTime();//获得作业的启动时间 System.out.println(jobId + startTime); // Further processing }
5,测试
给Oozie提交workflow作业,在MyEclipse控制台中看到,该作业的ID和启动时间已经查询到了。
6,总结
由于原来当作业提交之后,不断地向Oozie Server发HTTP请求(每隔10秒一次)的方式来查询作业是否执行完成,然后Oozie Server以JSON格式的字符串返回该JobID相关的执行结果,这种情况有两个问题:①同步的HTTP轮询方式,效率低下,耦合严重; ②需要自己解析JSON,处理异常情况(比如,执行失败的作业Json字符串中的EndTime字段为null)
而现在引入ActiveMQ后,异步获取消息,查询作业的执行结果变得更加容易了。而且还可以使用Oozie提供的一些更加高级的特性了。比如:SLA(Service Level Agreement)