一,介绍
Oozie是一个Hadoop工作流服务器,接收Client提交的作业(MapReduce作业)请求,并把该作业提交给MapReduce执行。同时,Oozie还可以实现消息通知功能,只要配置好消息服务器,Oozie Server就可以把作业的执行结果发送到消息服务器上,而Client只需要订阅其感兴趣的消息即可。具体的配置参考这篇文章:Oozie 使用ActiveMQ实现 JMS通知
由于Spring内置了JMS相关的服务,因此这里记录在Spring中如何配置消费者连接ActiveMQ,从而接收生产者Oozie发送的消息。
二,Oozie Server作为生产者的相关配置
这主要在这篇文章Oozie 使用ActiveMQ实现 JMS通知 已经提到了。
其中Oozie的配置文件 oozie-default.xml中相关配置如下:
<property> <name>oozie.jms.producer.connection.properties</name> <value>java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://l ocalhost:61616;connectionFactoryNames#ConnectionFactory</value> </property> <!-- JMSAccessorService --> <property> <name>oozie.service.JMSAccessorService.connectioncontext.impl</name> <value> org.apache.oozie.jms.DefaultConnectionContext </value> <description> Specifies the Connection Context implementation </description> </property>
Destination的相关配置如下,这里的Destination是一个Topic,即生产者发送消息的目的地,也是消费者取消息的地方。
<property> <name>oozie.service.JMSTopicService.topic.name</name> <value> default=${username} </value> <description> Topic options are ${username} or ${jobId} or a fixed string which can be specified as default or for a particular job type. For e.g To have a fixed string topic for workflows, coordinators and bundles, specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2} where job type can be WORKFLOW, COORDINATOR or BUNDLE. e.g. Following defines topic for workflow job, workflow action, coordinator job, coordinator action, bundle job and bundle action WORKFLOW=workflow, COORDINATOR=coordinator, BUNDLE=bundle For jobs with no defined topic, default topic will be ${username} </description> </property>
三,在Spring中配置消费者的连接信息
这里采用JNDI连接ActiveMQ,连接信息配置如下:
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate"> <property name="environment"> <props> <prop key="java.naming.factory.initial"> org.apache.activemq.jndi.ActiveMQInitialContextFactory </prop> <prop key="java.naming.provider.url"> tcp://192.168.121.35:61616 </prop> <prop key="java.naming.security.principal"> system </prop> <prop key="java.naming.security.credentials"> manager </prop> </props> </property> </bean>
配置连接工厂:
<bean id="jndiTopicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiTemplate" ref="jndiTemplate" /> <property name="jndiName" value="ConnectionFactory" /> </bean>
我是怎么知道连接工厂的value="ConnectionFactory"的呢?由于我大部分采用的是Oozie的默认配置,根据Oozie官网提供的一个示例程序,调试出的Oozie使用的连接工厂的。
//获得Oozie中关于JMS的相关配置信息,如Transport Connectors OozieClient oc = new OozieClient("http://192.168.121.35:11000/oozie"); JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo(); Properties jndiProperties = jmsInfo.getJNDIProperties(); Context jndiContext = new InitialContext(jndiProperties);
这段代码建立到ActiveMQ的连接上下文,调试上述代码可以看到下面的一些信息:
{java.naming.provider.url=tcp://192.168.121.35:61616, java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory, connectionFactoryNames=ConnectionFactory}
配置Topic
<bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="cdhfive"></constructor-arg> </bean>
Topic就是Destination啊。由于从oozie-default.xml中得到生产者的Topic为 ${username},而我们这里的用户名为cdhfive ,故Topic的配置如上。
配置监听器
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="topicConnectionFactory"></property> <property name="destinationResolver" ref="destinationResolver"></property> <property name="concurrentConsumers" value="2"></property> <property name="destination" ref="notifyTopic"></property> <property name="messageListener" ref="messageListener"></property> </bean>
①是监听器需要监听的Destination,即我们上面配置的Topic。②是监听器的实现bean,该bean implements javax.jms.MessageListener接口
<bean id="messageListener" class="com.schedule.tools.message.SimpleJMSReceiver" />
至此,大部分的配置已经完成了。
四,实现监听器MessageListener接口,接收消息
当有消息推送给订阅者时,javax.jms.MessageListener接口的onMessage()方法被自动调用,就可以在该方法中处理收到的消息了。
@Override public void onMessage(Message message) { String parentJobId = null; String jobId = null; String errorMessage = null; String status = null; Date startTime = null; Date endTime = null; long runTime = -1;//-1 means job run error try { // 普通用户作业和解释作业 if (message.getStringProperty(JMSHeaderConstants.APP_TYPE).equals( AppType.WORKFLOW_JOB.name())) { WorkflowJobMessage wfJobMessage = JMSMessagingUtils .getEventMessage(message); // 是普通作业 jobId = wfJobMessage.getId(); errorMessage = wfJobMessage.getErrorMessage(); status = wfJobMessage.getStatus().toString(); startTime = wfJobMessage.getStartTime(); endTime = wfJobMessage.getEndTime(); if(endTime != null){ runTime = endTime.getTime() - startTime.getTime(); System.out.println(jobId + "执行了:" + (endTime.getTime()-startTime.getTime())/1000 + "s"); } //other code.....
五,参考资料
《JAVA消息服务》电子工业出版社
https://oozie.apache.org/docs/4.0.0/DG_JMSNotifications.html