Oozie 生成JMS消息并向 JMS Provider发送消息过程分析

一,涉及到的工程

从官网下载源码,mvn 编译成 Eclipse工程文件:

对于JMS消息这一块,主要涉及到两个工程:

oozie-core工程有问题的原因是还需要一些其他的依赖工程未导入:

二,Oozie 生成 JMS消息 主要涉及到的一些类

oozie-core 工程中的:

oozie-client工程中的:

三,相关代码:

对于Oozie Server而言,它是消息的生产者。在oozie-default.xml/oozie-site.xml里面配置好连接参数,消息服务器....Oozie就使用这些配置进行连接,产生消息,发送消息。

JMSAccessorService.java

/**
 * This class will <ul>
 * <li> Create/Manage JMS connections using user configured JNDI properties. </li>
 * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li>
 * <li> Provide a way to create a subscriber and publisher </li>
 * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li>
 * </ul>
 */
public class JMSAccessorService implements Service {

直接看注释就知道这个类的功能了。

    /**
     * Map of JMS connection info to established JMS Connection
     */
    private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap =
            new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
    /**
     * Map of JMS connection info to topic names to MessageReceiver
     */
    private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap =
            new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();

ConcurrentHashMap线程安全的,用来保存与JMS Provider的连接信息

synchronized (this) {
                if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) {
                    try {
                        jmsProducerConnContext = getConnectionContextImpl();
                        jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
                        jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo,
  private ConnectionContext getConnectionContextImpl() {
        Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, DefaultConnectionContext.class);
        ConnectionContext connCtx = null;
        if (defaultClazz == DefaultConnectionContext.class) {
            connCtx = new DefaultConnectionContext();
        }
        else {
            connCtx = (ConnectionContext) ReflectionUtils.newInstance(defaultClazz, null);
        }
        return connCtx;
    }

创建 Producer 连接的上下文环境

DefaultConnectionContext.java  默认的连接上下文环境

public class DefaultConnectionContext implements ConnectionContext {

    protected Connection connection;
    protected String connectionFactoryName;
    private static XLog LOG = XLog.getLog(ConnectionContext.class);

    @Override
    public void createConnection(Properties props) throws NamingException, JMSException {
        Context jndiContext = new InitialContext(props);
        connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
        if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
            connectionFactoryName = "ConnectionFactory";
        }
        ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
        LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
        try {
            connection = connectionFactory.createConnection();
            connection.start();

创建生产者的方法:

    @Override
    public MessageProducer createProducer(Session session, String topicName) throws JMSException {
        Topic topic = session.createTopic(topicName);
        MessageProducer producer = session.createProducer(topic);
        return producer;
    }

它由org.apache.oozie.jms.JMSJobEventListener类中的 sendMessage()调用。

Oozie 配置中关于JMSAccessorService的配置如下:

再来看看:JMSTopicService.java

    static {
        ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
        ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
    }
    public static enum TopicType {
        USER("${username}"), JOBID("${jobId}");

        private String value;

        TopicType(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }

    }

可用的Topic名称有 ${username},也可以用jobId作为Topic名称,再看Oozie官方文档解释:

The topic is obtained by concatenating topic prefix and the substituted value for topic pattern. The topic pattern can be a constant value like workflow or coordinator which the administrator has configured or ${username}.

The getJMSTopicName API can be used if the job id is already known and will give the exact topic name to which the notifications for that job are published.

 private void parseTopicConfiguration() throws ServiceException {
        String topicName = conf.get(TOPIC_NAME, "default=" + TopicType.USER.value);
        if (topicName == null) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
        }

Topic默认是${username}

发送消息的实现类JMSJobEventListener.java  根据相应的作业事件发送作业的执行结果

/**
 * Class to send JMS notifications related to job events.
 *
 */
public class JMSJobEventListener extends JobEventListener {
    private JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
    private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
    private JMSConnectionInfo connInfo;
    public static final String JMS_CONNECTION_PROPERTIES = "oozie.jms.producer.connection.properties";
    public static final String JMS_SESSION_OPTS = "oozie.jms.producer.session.opts";
    public static final String JMS_DELIVERY_MODE = "oozie.jms.delivery.mode";
    public static final String JMS_EXPIRATION_DATE = "oozie.jms.expiration.date";

连接方式、发送消息后是否自动回复、消息的生命周期,持久消息还是非持久消息...

    public void init(Configuration conf) {
        LOG = XLog.getLog(getClass());
        String jmsProps = conf.get(JMS_CONNECTION_PROPERTIES);
        LOG.info("JMS producer connection properties [{0}]", jmsProps);
        connInfo = new JMSConnectionInfo(jmsProps);
        jmsSessionOpts = conf.getInt(JMS_SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
        jmsDeliveryMode = conf.getInt(JMS_DELIVERY_MODE, DeliveryMode.PERSISTENT);
        jmsExpirationDate = conf.getInt(JMS_EXPIRATION_DATE, 0);

    }

发送消息的过程:

1)EventHandlerService ,里面有个内部类EventWoker线程,当有相应的作业事件发生时,Listener被触发

**
 * Service class that handles the events system - creating events queue,
 * managing configured properties and managing and invoking various event
 * listeners via worker threads
 */
public class EventHandlerService implements Service {

//.....

public class EventWorker implements Runnable {

        @Override        public void run() {//.....other code        while (iter.hasNext()) {              try {                     if (msgType == MessageType.JOB) {                            invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);                        }

 private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {            switch (event.getAppType()) {                case WORKFLOW_JOB:                    jobListener.onWorkflowJobEvent((WorkflowJobEvent)event);

相应的作业监听器被触发后,创建相应的作业,获得待发送的地址Topic,并序列化消息

    @Override
    public void onWorkflowJobEvent(WorkflowJobEvent event) {
        WorkflowJobMessage wfJobMessage = MessageFactory.createWorkflowJobMessage(event);
        serializeJMSMessage(wfJobMessage, getTopic(event));
    }

序列化后,调用send进行发送

    private void serializeJMSMessage(JobMessage jobMessage, String topicName) {
        MessageSerializer serializer = MessageFactory.getMessageSerializer();
        String messageBody = serializer.getSerializedObject(jobMessage);
        sendMessage(jobMessage.getMessageProperties(), messageBody, topicName, serializer.getMessageFormat());
    }

创建连接上下文、创建会话、创建消息、设置消息的属性、创建生产者、设置传送模式和消息的生命周期、然后send消息。

    protected void sendMessage(Map<String, String> messageProperties, String messageBody, String topicName,
            String messageFormat) {
        jmsContext = jmsService.createProducerConnectionContext(connInfo);
        if (jmsContext != null) {
            try {
                Session session = jmsContext.createThreadLocalSession(jmsSessionOpts);
                TextMessage textMessage = session.createTextMessage(messageBody);
                for (Map.Entry<String, String> property : messageProperties.entrySet()) {
                    textMessage.setStringProperty(property.getKey(), property.getValue());
                }
                textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat);
                LOG.trace("Event related JMS text body [{0}]", textMessage.getText());
                LOG.trace("Event related JMS entire message [{0}]", textMessage.toString());
                MessageProducer producer = jmsContext.createProducer(session, topicName);
                producer.setDeliveryMode(jmsDeliveryMode);
                producer.setTimeToLive(jmsExpirationDate);
                producer.send(textMessage);
                producer.close();
            }

WorkflowJobMessage.java展示了一条Workflow消息长什么样:

    /**
     * Constructor for a workflow job message
     * @param eventStatus event status
     * @param workflowJobId the workflow job id
     * @param coordinatorActionId the parent coordinator action id
     * @param startTime start time of workflow
     * @param endTime end time of workflow
     * @param status status of workflow
     * @param user the user
     * @param appName appName of workflow
     * @param errorCode errorCode of the failed wf actions
     * @param errorMessage errorMessage of the failed wf action
     */
    public WorkflowJobMessage(EventStatus eventStatus, String workflowJobId,
            String coordinatorActionId, Date startTime, Date endTime, WorkflowJob.Status status, String user,
            String appName, String errorCode, String errorMessage) {
        super(eventStatus, AppType.WORKFLOW_JOB, workflowJobId, coordinatorActionId, startTime,
                endTime, user, appName);
        this.status = status;
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }

当提交的是Workflow Job,就会生成Workflow消息。

它有一个属性: @param coordinatorActionId the parent coordinator action id  (Coordinator Job里面的Action是Workflow Job)

看完了JMS消息体,再来看看消息头:

/**
 *
 * Class holding constants used in JMS selectors
 */
public final class JMSHeaderConstants {
    // JMS Application specific properties for selectors
    public static final String EVENT_STATUS = "eventStatus";
    public static final String SLA_STATUS = "slaStatus";
    public static final String APP_NAME = "appName";
    public static final String USER = "user";
    public static final String MESSAGE_TYPE = "msgType";
    public static final String APP_TYPE = "appType";

    public static final String JOBID = "jobId";// add for my specific selectors
    // JMS Header property
    public static final String MESSAGE_FORMAT = "msgFormat";
}

消息头里面的属性主要用来过滤。根据消息头里面的字段,使用JMS消息选择器对消息进行过滤。关于根据JobId进行过滤,可参考:Oozie JMS通知消息实现--根据作业ID来过滤消息

不知道有没有bug????

/**
 * Message deserializer to convert from JSON to java object
 */
public class JSONMessageDeserializer extends MessageDeserializer {

    static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.

    static {
        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

消息的序列化机制,用的是jackson-mapper-asl jar包。因为消息要从生产者发给消息服务器,就需要序列化了。

有序列化就有反序列化:

/**
 * Class to deserialize the jms message to java object
 */
public abstract class MessageDeserializer {

    /**
     * Constructs the event message from JMS message
     *
     * @param message the JMS message
     * @return EventMessage
     * @throws JMSException
     */
    @SuppressWarnings("unchecked")
    public <T extends EventMessage> T getEventMessage(Message message) throws JMSException {
        TextMessage textMessage = (TextMessage) message;
        String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE);
        String msgType = textMessage.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE);

先根据消息的属性解析出消息的类型。

        if (MessageType.valueOf(msgType) == MessageType.JOB) {
            switch (AppType.valueOf(appTypeString)) {
                case WORKFLOW_JOB:
                    WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
                    wfJobMsg.setProperties(textMessage);
                    eventMsg = (T) wfJobMsg;

再根据类型来构造对象。

时间: 2024-11-05 13:31:48

Oozie 生成JMS消息并向 JMS Provider发送消息过程分析的相关文章

消息队列入门(二)消息队列的开源实现

消息队列入门(二)消息队列的开源实现 关于AMQP AMQP 是 Advanced Message Queuing Protocol,即高级消息队列协议.AMQP不是一个具体的消息队列实现,而 是一个标准化的消息中间件协议.目标是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口.目前主流的ActiveMQ和RabbitMQ都支持AMQP协议. AMQP相关的角色和职责 Producer 消息生产者 一个给exchange发送消息的程序,发送方式大致是:它首先创建一个空消息,

微信企业号回调模式验证与发送消息

最近放假闲着无聊,研究了一下微信企业号, 打算通过企业号做一个运维报警信息发送的功能,记录自己的操作 第一步 注册企业号,网上一搜一大把的教程,这里略过  微信企业号登录地址  https://qy.weixin.qq.com/ 第二步  登录后 点左侧 应用中心 -新建应用 第三步  在第二步第一图中的自建应用下面找到刚刚新建的应用 拉到最下面有一个模式选择,点击回调模式 会看到下图界面 Token 和EncodingAESKey 点击随机获取即可,上面的url需要你有自己的服务地址  你的服

新浪微博发送消息和授权机制原理(WeiboSDK)

1.首先是在微博发送消息,对于刚开始做weibo发送消息的初学者会有一个误区,那就是会认为需要授权后才可以发送消息,其实发送消息只需要几行代码就可以实现了,非常简单,不需要先授权再发送消息,因为weibosdk已经帮我们封装好了.(此情况需要用户安装客户端) 发送消息流程为:点击发送消息按键----SDK会自动帮我们判断用户是否安装了新浪微博客户端--如果未安装弹出安装提示----如果安装直接跳转到sina微博客户端进行发送----发送成功后自动跳回原应用程序. 1)在AppDelegate中注

Java使用多线程发送消息

在后台管理用户信息的时候,经常会用到批量发送提醒消息,首先想到的有: (1).循环发送列表,逐条发送.优点是:简单,如果发送列表很少,而且没有什么耗时的操作,是比较好的一种选择,缺点是:针对大批量的发送列表,不可取,耗时,程序会出现严重的阻塞问题. (2).使用队列(BlockingQueue),开启多个线程,分为三个部分.一部分负责处理将发送列表放入队列:一部分负责从队列中读取并发送消息:第三部分负责监视队列是否为空及后续的操作. (3).以下说到的这种模式,使用Future.Callable

个人微信公众号搭建Python实现 -接收和发送消息-基本说明与实现(14.2.1)

目录 1.原理 2.接收普通消息 3.接收代码普通消息代码实现 @(接收和发送消息-基本说明与实现) 1.原理 2.接收普通消息 其他消息类似参考官方文档 3.接收代码普通消息代码实现 from flask import Flask,request,abort import xmltodict import time app = Flask(__name__) #常量 微信的token令牌 WECHAT_TOKEN = "xxxx" @app.route("/wx"

JMS消息服务器(二)——点对点消息传送模型

一.点对点模型概览 当你只需要将消息发布送给唯一的一个消息消费者是,就应该使用点对点模型.虽然可能或有多个消费者在队列中侦听统一消息,但是,只有一个且仅有一个消费者线程会接受到该消息. 在p2p模型中,生产者称为发送者,而消费者则称为接受者.点对点模型最重要的特性如下: 消息通过称为队列的一个虚拟通道来进行交换.队列是生产者发送消息的目的地和接受者消费消息的消息源. 每条消息通仅会传送给一个接受者.可能会有多个接受者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接受者消费. 消息存在先

MQ消息队列(2)—— Java消息服务接口(JMS)

一.理解JMS   1.什么是JMS?         JMS即Java消息服务(Java Message Service)应用程序接口,API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合. JMS不是消息队列,更不是某种消息队列协议.JMS是Jav

【AMQ】之JMS Mesage structure(JMS消息结构)

Δ消息体:JMS API 定义了5种消息格式也叫消息类型,可以使用不同形式发送和接收数据,并可以兼容现有的消息格式 TextMessage,MapMessage,ByteMessage,StreamMessage,Objectmessage Δ消息属性: 1.应用程序设置和添加的属性,比如: Message.setStringProperty("name","张三"); 2.JMS定义属性的名字 Enumeration names = connection.getM

Spring-boot JMS 发送消息慢的问题解决

1:在<ActiveMQ 基于zookeeper的主从(levelDB Master/Slave)搭建以及Spring-boot下使用>(http://www.cnblogs.com/yshyee/p/7277801.html)中,采用以下代码进行JMS消息发送: @Service public class Producer { @Autowired private JmsMessagingTemplate jmsTemplate; public void sendMessage(Destin