activeMq发送消息流程

1,发送消息入口

  Message message = messageBean.getMessageCreator().createMessage(session);
  producer.send(message);

2,调用ActiveMQMessageProducerSupport的send方法。该类实现了MessageProducer接口

 public void send(Message message) throws JMSException {
        this.send(this.getDestination(),
                  message,
                  this.defaultDeliveryMode,
                  this.defaultPriority,
                  this.defaultTimeToLive);
    }

    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
        checkClosed();
        if (destination == null) {
            if (info.getDestination() == null) {
                throw new UnsupportedOperationException("A destination must be specified.");
            }
            throw new InvalidDestinationException("Don't understand null destinations");
        }

        ActiveMQDestination dest;
        if (destination.equals(info.getDestination())) {
            dest = (ActiveMQDestination)destination;
        } else if (info.getDestination() == null) {
            dest = ActiveMQDestination.transform(destination);
        } else {
            throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
        }
        if (dest == null) {
            throw new JMSException("No destination specified");
        }

        if (transformer != null) {
            Message transformedMessage = transformer.producerTransform(session, this, message);
            if (transformedMessage != null) {
                message = transformedMessage;
            }
        }

        if (producerWindow != null) {
            try {
                producerWindow.waitForSpace();
            } catch (InterruptedException e) {
                throw new JMSException("Send aborted due to thread interrupt.");
            }
        }

        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);

        stats.onMessage();
    }

3调用ActiveMQSession的send接口进行消息发送

protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {

        checkClosed();
        if (destination.isTemporary() && connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        }
        synchronized (sendMutex) {
            // tell the Broker we are about to start a new transaction
            doStartTransaction();
            TransactionId txid = transactionContext.getTransactionId();
            long sequenceNumber = producer.getMessageSequence();

            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
            message.setJMSDeliveryMode(deliveryMode);
            long expiration = 0L;
            if (!producer.getDisableMessageTimestamp()) {
                long timeStamp = System.currentTimeMillis();
                message.setJMSTimestamp(timeStamp);
                if (timeToLive > 0) {
                    expiration = timeToLive + timeStamp;
                }
            }
            message.setJMSExpiration(expiration);
            message.setJMSPriority(priority);
            message.setJMSRedelivered(false);

            // transform to our own message format here
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);

            // Set the message id.
            if (msg == message) {
                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
            } else {
                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
                message.setJMSMessageID(msg.getMessageId().toString());
            }
            //clear the brokerPath in case we are re-sending this message
            msg.setBrokerPath(null);
            // destination format is provider specific so only set on transformed message
            msg.setJMSDestination(destination);

            msg.setTransactionId(txid);
            if (connection.isCopyMessageOnSend()) {
                msg = (ActiveMQMessage)msg.copy();
            }
            msg.setConnection(connection);
            msg.onSend();
            msg.setProducerId(msg.getMessageId().getProducerId());
            if (LOG.isTraceEnabled()) {
                LOG.trace(getSessionId() + " sending message: " + msg);
            }
            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                this.connection.asyncSendPacket(msg);
                if (producerWindow != null) {
                    // Since we defer lots of the marshaling till we hit the
                    // wire, this might not
                    // provide and accurate size. We may change over to doing
                    // more aggressive marshaling,
                    // to get more accurate sizes.. this is more important once
                    // users start using producer window
                    // flow control.
                    int size = msg.getSize();
                    producerWindow.increaseUsage(size);
                }
            } else {
                if (sendTimeout > 0 && onComplete==null) {
                    this.connection.syncSendPacket(msg,sendTimeout);
                }else {
                    this.connection.syncSendPacket(msg, onComplete);
                }
            }

        }
    }

4,ActiveMQConnection发送消息

    public Response syncSendPacket(Command command) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {

            try {
                Response response = (Response)this.transport.request(command);
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse)response;
                    if (er.getException() instanceof JMSException) {
                        throw (JMSException)er.getException();
                    } else {
                        if (isClosed()||closing.get()) {
                            LOG.debug("Received an exception but connection is closing");
                        }
                        JMSException jmsEx = null;
                        try {
                            jmsEx = JMSExceptionSupport.create(er.getException());
                        } catch(Throwable e) {
                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
                        }
                        //dispose of transport for security exceptions
                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
                            Transport t = this.transport;
                            if (null != t){
                                ServiceSupport.dispose(t);
                            }
                        }
                        if (jmsEx !=null) {
                            throw jmsEx;
                        }
                    }
                }
                return response;
            } catch (IOException e) {
                throw JMSExceptionSupport.create(e);
            }
        }
    }

    public Response syncSendPacket(Command command) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {

            try {
                Response response = (Response)this.transport.request(command);
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse)response;
                    if (er.getException() instanceof JMSException) {
                        throw (JMSException)er.getException();
                    } else {
                        if (isClosed()||closing.get()) {
                            LOG.debug("Received an exception but connection is closing");
                        }
                        JMSException jmsEx = null;
                        try {
                            jmsEx = JMSExceptionSupport.create(er.getException());
                        } catch(Throwable e) {
                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
                        }
                        //dispose of transport for security exceptions
                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
                            Transport t = this.transport;
                            if (null != t){
                                ServiceSupport.dispose(t);
                            }
                        }
                        if (jmsEx !=null) {
                            throw jmsEx;
                        }
                    }
                }
                return response;
            } catch (IOException e) {
                throw JMSExceptionSupport.create(e);
            }
        }
    }

5,ResponseCorrelator发送request

    public Object request(Object command) throws IOException {
        FutureResponse response = asyncRequest(command, null);
        return response.getResult();
    }
        public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
        Command command = (Command) o;
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(true);
        FutureResponse future = new FutureResponse(responseCallback);
        IOException priorError = null;
        synchronized (requestMap) {
            priorError = this.error;
            if (priorError == null) {
                requestMap.put(new Integer(command.getCommandId()), future);
            }
        }

        if (priorError != null) {
            future.set(new ExceptionResponse(priorError));
            throw priorError;
        }

        next.oneway(command);
        return future;
    }

6,调用MutexTransport来发送消息

    @Override
    public void oneway(Object command) throws IOException {
        writeLock.lock();
        try {
            next.oneway(command);
        } finally {
            writeLock.unlock();
        }
    }
    

7,调用AbstractInactivityMonitor完成消息发送准备

    public void oneway(Object o) throws IOException {
        // To prevent the inactivity monitor from sending a message while we
        // are performing a send we take a read lock.  The inactivity monitor
        // sends its Heart-beat commands under a write lock.  This means that
        // the MutexTransport is still responsible for synchronizing sends
        this.sendLock.readLock().lock();
        inSend.set(true);
        try {
            doOnewaySend(o);
        } finally {
            commandSent.set(true);
            inSend.set(false);
            this.sendLock.readLock().unlock();
        }
    }

        private void doOnewaySend(Object command) throws IOException {
        if( failed.get() ) {
            throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
        }
        if (command.getClass() == WireFormatInfo.class) {
            synchronized (this) {
                processOutboundWireFormatInfo((WireFormatInfo) command);
            }
        }
        next.oneway(command);
    }

8,调用TcpTransport,OpenWireFormat,DataOutputStream最终完成通过tcp发送消息

   public void oneway(Object command) throws IOException {
        checkStarted();
        wireFormat.marshal(command, dataOut);
        dataOut.flush();
    }
     public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {

        if (cacheEnabled) {
            runMarshallCacheEvictionSweep();
        }

        int size = 1;
        if (o != null) {

            DataStructure c = (DataStructure)o;
            byte type = c.getDataStructureType();
            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
            if (dsm == null) {
                throw new IOException("Unknown data type: " + type);
            }
            if (tightEncodingEnabled) {
                BooleanStream bs = new BooleanStream();
                size += dsm.tightMarshal1(this, c, bs);
                size += bs.marshalledSize();

                if (!sizePrefixDisabled) {
                    dataOut.writeInt(size);
                }

                dataOut.writeByte(type);
                bs.marshal(dataOut);
                dsm.tightMarshal2(this, c, dataOut, bs);

            } else {
                DataOutput looseOut = dataOut;

                if (!sizePrefixDisabled) {
                    bytesOut.restart();
                    looseOut = bytesOut;
                }

                looseOut.writeByte(type);
                dsm.looseMarshal(this, c, looseOut);

                if (!sizePrefixDisabled) {
                    ByteSequence sequence = bytesOut.toByteSequence();
                    dataOut.writeInt(sequence.getLength());
                    dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
                }

            }

        } else {
            if (!sizePrefixDisabled) {
            	dataOut.writeInt(size);
            }
            dataOut.writeByte(NULL_TYPE);
        }
    }
        public final void writeInt(int v) throws IOException {
        out.write((v >>> 24) & 0xFF);
        out.write((v >>> 16) & 0xFF);
        out.write((v >>>  8) & 0xFF);
        out.write((v >>>  0) & 0xFF);
        incCount(4);
    }

附图

备注:MessageId是通过ProducerId+sequenceNumber来生成的,sequenceNumber通过原子变量的加1完成

时间: 2024-11-07 06:05:05

activeMq发送消息流程的相关文章

jmeter向ActiveMQ发送消息_广播/订阅(Topics 队列)

问题描述:测试中需要模拟大量设备的消息上报到平台,但是实际测试中没有那么多设备,所以采取用jmeter直接往ActiveMQ模拟发送设备消息 解决思路:获取平台采取的是Queues还是Topics :并且获取队列名,需要发送的数据,然后在jmeter中模拟发送 jmeter配置参考博客:https://www.cnblogs.com/51test/p/7280879.html 原文地址:https://www.cnblogs.com/weitung/p/9404212.html

新浪微博发送消息和授权机制原理(WeiboSDK)

1.首先是在微博发送消息,对于刚开始做weibo发送消息的初学者会有一个误区,那就是会认为需要授权后才可以发送消息,其实发送消息只需要几行代码就可以实现了,非常简单,不需要先授权再发送消息,因为weibosdk已经帮我们封装好了.(此情况需要用户安装客户端) 发送消息流程为:点击发送消息按键----SDK会自动帮我们判断用户是否安装了新浪微博客户端--如果未安装弹出安装提示----如果安装直接跳转到sina微博客户端进行发送----发送成功后自动跳回原应用程序. 1)在AppDelegate中注

ActiveMQ 发送和就收消息

一.添加 jar 包 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.2</version> </dependency> 二.消息传递的两种形式 1.点对点:发送的消息只能被一个消费者接收,第一个消费者接收后,消息没了 2.发布/订阅:消息可以被多个消费

解决Springboot整合ActiveMQ发送和接收topic消息的问题

环境搭建 1.创建maven项目(jar) 2.pom.xml添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> </parent> <dependencies> &l

ActiviteMQ 接收和发送消息基本流程

发送消息的基本步骤:(1) .创建连接使用的工厂类 JMS ConnectionFactory(2) .使用管理对象 JMS ConnectionFactory 建立连接 Connection ,并启动(3) .使用连接 Connection 建立会话 Session(4) .使用会话 Session 和管理对象 Destination 创建消息生产者 MessageSender(5) .使用消息生产者 MessageSender 发送消息消息接收者从 JMS 接受消息的步骤(1) .创建连接使

ActiveMQ之消息指针

消息指针(Message cursor)是activeMQ里一个非常重要的核心类,它是提供某种优化消息存储的方法.消息中间件的实现一般都是当消费者准备好消费消息的时候,它会从持久化存储中一批一批的读取消息,并发送给消费者.消息指针维护着下一批待读取消息的相关位置信息.  消息游标: 当producer发送的持久化消息到达broker之后,broker首先会把它保存在持久存储中.接下来,如果发现当前有活跃的consumer,而且这个consumer消费消息的速度能跟上producer生产消息的速度

一步一步来做WebQQ机器人-(五)(发送消息||完结)

× 本篇主要是: 发送QQ消息(to:好友,群),以及对小黄鸡抓包利用它的语言库 本文是WebQQ流程的最后一章 最后一章内容不多但我还是啰嗦,可能对大部分人都已知晓的流程方法我也会介绍一下 前面几个demo我已经上传到对应页面的尾部,剩下的会抽时间补,外包经常加班且没外网,尽量本周弄完 目前总进度大概100% 全系列预计会有这些步骤,当然某些步骤可能会合并: 验证码 第一次登陆 第二次登陆 保持在线和接收消息 获取好友和群列表 发送消息 变成智能的(*?∀?*) 回顾基础 一般抓包模拟请求的方

ActiveMQ持久化消息

ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,所以本节分析一下几种持久化方式: 一.持久化为文件 ActiveMQ默认就支持这种方式,只要在发消息时设置消息为持久化就可以了. 打开安装目录下的配置文件: D:\ActiveMQ\apache-activemq\conf\activemq.xml在越80行会发现默认的配置项: <persistenceAdapter> <kahaDB directory="${activemq.da

ActiveMQ发消息和收消息

来自:http://blog.163.com/chengwei_1104/blog/static/53645274201382315625329/ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 下面详细的解释常用类的作用 ConnectionFactory 接口(连接工厂) 用