ActiveMQ消息发送与接收

推荐文章:ActiveMQ讯息传送机制以及ACK机制

ActiveMQ发送消息

  1:创建链接工厂ConnectionFactory

  2:创建链接Connection

  3:启动session

  4:创建消息发送目的地

  5:创建生产者

  6:发送消息

消息发送类:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    private static final String USERNAME = "admin";

    private static final String PASSWORD = "admin";

    private static final String BROKEN_URL = "tcp://127.0.0.1:61616";

    private AtomicInteger count = new AtomicInteger();

    private ConnectionFactory connectionFactory;

    private Connection connection;

    private Session session;

    private Queue queue;

    private MessageProducer producer;

    public void init() {
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
             //从工厂中创建一个链接
            connection = connectionFactory.createConnection();
            //启动链接,不启动不影响消息的发送,但影响消息的接收
            connection.start();
            //创建一个事物session
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            //获取消息发送的目的地,指消息发往那个地方
            queue = session.createQueue("test");
            //获取消息发送的生产者
            producer = session.createProducer(queue);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void sendMsg(String queueName) {
        try {
            int num = count.getAndIncrement();
            TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:生产者发送消息!,count:"+num);
            producer.send(msg);

            session.commit();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}
connection.createSession方法
 /**
     * Creates a <CODE>Session</CODE> object.
     *
     * @param transacted indicates whether the session is transacted
     * @param acknowledgeMode indicates whether the consumer or the client will
     *                acknowledge any messages it receives; ignored if the
     *                session is transacted. Legal values are
     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
     * @return a newly created session
     * @throws JMSException if the <CODE>Connection</CODE> object fails to
     *                 create a session due to some internal error or lack of
     *                 support for the specific transaction and acknowledgement
     *                 mode.
     * @see Session#AUTO_ACKNOWLEDGE
     * @see Session#CLIENT_ACKNOWLEDGE
     * @see Session#DUPS_OK_ACKNOWLEDGE
     * @since 1.1
     */
    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        checkClosedOrFailed();
        ensureConnectionInfoSent();
        if(!transacted) {
            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
            }
        }
        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
    }
createSession方法里有两个参数,第一个参数表示是否使用事务,第二个参数表示消息的确认模式。消息的确认模式共有4种:1:AUTO_ACKNOWLEDGE 自动确认2:CLIENT_ACKNOWLEDGE 客户端手动确认 3:DUPS_OK_ACKNOWLEDGE 自动批量确认0:SESSION_TRANSACTED 事务提交并确认4:INDIVIDUAL_ACKNOWLEDGE 单条消息确认 为AcitveMQ自定义的ACK_MODE各种确认模式详细说明可以看文章:ActiveMQ讯息传送机制以及ACK机制从createSession方法中可以看出如果如果session不使用事务但是却使用了消息提交(SESSION_TRANSACTED)确认模式,或使用的消息确认模式不存在,将抛出异常。

ActiveMQ接收消息

  1:创建链接工厂ConnectionFactory

  2:创建链接Connection

  3:启动session

  4:创建消息发送目的地

  5:创建生产者

  6:接收消息或设置消息监听器

消息接收类:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

public class Receiver {

private static final String USERNAME = "admin";

    private static final String PASSWORD = "admin";

    private static final String BROKEN_URL = "tcp://127.0.0.1:61616";

    private AtomicInteger count = new AtomicInteger();

    private ConnectionFactory connectionFactory;

    private ActiveMQConnection connection;

    private Session session;

    private Queue queue;

    private MessageConsumer consumer;

    public void init() {
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
             //从工厂中创建一个链接
            connection = (ActiveMQConnection) connectionFactory.createConnection();

            //启动链接
            connection.start();
            //创建一个事物session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //获取消息接收的目的地,指从哪里接收消息
            queue = session.createQueue("test");
            //获取消息接收的消费者
            consumer = session.createConsumer(queue);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void receiver(String queueName) {

        try {
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
consumer.receive方法
/**
     * Receives the next message produced for this message consumer.
     * <P>
     * This call blocks indefinitely until a message is produced or until this
     * message consumer is closed.
     * <P>
     * If this <CODE>receive</CODE> is done within a transaction, the consumer
     * retains the message until the transaction commits.
     *
     * @return the next message produced for this message consumer, or null if
     *         this message consumer is concurrently closed
     */
    public Message receive() throws JMSException {
        checkClosed(); //检查unconsumedMessages是否关闭 ,消费者从unconsumedMessages对象中获取消息
        checkMessageListener(); //检查是否有其他消费者使用了监听器,同一消息消息队列中不能采用reveice和messageListener并存消费消息

        sendPullCommand(0); //如果prefetchSize为空且unconsumedMessages为空 向JMS提供者发送一个拉取命令来拉取消息,为下次消费做准备
        MessageDispatch md = dequeue(-1); //从unconsumedMessages取出一个消息 
        if (md == null) {
            return null;
        }

        beforeMessageIsConsumed(md);
        afterMessageIsConsumed(md, false);

        return createActiveMQMessage(md);
    }
prefetchSize属性如果大于0,消费者每次拉去消息时都会预先拉取一定量的消息,拉取的消息数量<=prefetchSize,prefetchSize默认指为1000,这个默认值是从connection中传过来的
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
        checkClosed();

        if (destination instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createConsumer(this, messageSelector, noLocal);
        }

        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
        int prefetch = 0;
        if (destination instanceof Topic) {
            prefetch = prefetchPolicy.getTopicPrefetch();
        } else {
            prefetch = prefetchPolicy.getQueuePrefetch();
        }
        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
    }

消息接收类代码中调用session.createConsumer其实调用的就是上面的createConsumer方法,从上面代码中可以看出connection会将自己的prefetch传递给消费者,connection中的ActiveMQPrefetchPolicy

对象属性如下:

public class ActiveMQPrefetchPolicy extends Object implements Serializable {
    public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
    public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
    public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
    public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
    public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
    public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class);

    private int queuePrefetch;
    private int queueBrowserPrefetch;
    private int topicPrefetch;
    private int durableTopicPrefetch;
    private int optimizeDurableTopicPrefetch;
    private int inputStreamPrefetch;
    private int maximumPendingMessageLimit;

    /**
     * Initialize default prefetch policies
     */
    public ActiveMQPrefetchPolicy() {
        this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
        this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
        this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
        this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
        this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
        this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
    }

这里我只截了一部分代码,可以看到队列(queue)默认的queuePrefetch为1000,queuePrefetch的最大值不能超过MAX_PREFETCH_SIZE(32767)

当然我们也可以自己设置消费者预先拉取的消息数量,方法有两种

一:在创建connection之后修改connection中的queuePrefetch;代码如下:

ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(number);
connection.setPrefetchPolicy(prefetchPolicy);

二:在创建队列(queue)的时候传入参数,回到ActiveMQMessageConsumer的创建代码中:

 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
            String name, String selector, int prefetch,
            int maximumPendingMessageCount, boolean noLocal, boolean browser,
            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
        if (dest == null) {
            throw new InvalidDestinationException("Don‘t understand null destinations");
        } else if (dest.getPhysicalName() == null) {
            throw new InvalidDestinationException("The destination object was not given a physical name.");
        } else if (dest.isTemporary()) {
            String physicalName = dest.getPhysicalName();

            if (physicalName == null) {
                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
            }

            String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();

            if (physicalName.indexOf(connectionID) < 0) {
                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
            }

            if (session.connection.isDeleted(dest)) {
                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
            }
            if (prefetch < 0) {
                throw new JMSException("Cannot have a prefetch size less than zero");
            }
        }
        if (session.connection.isMessagePrioritySupported()) {
            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
        }else {
            this.unconsumedMessages = new FifoMessageDispatchChannel();
        }

        this.session = session;
        this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
        setTransformer(session.getTransformer());

        this.info = new ConsumerInfo(consumerId);
        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
        this.info.setSubscriptionName(name);
        this.info.setPrefetchSize(prefetch);
        this.info.setCurrentPrefetchSize(prefetch);
        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
        this.info.setNoLocal(noLocal);
        this.info.setDispatchAsync(dispatchAsync);
        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
        this.info.setSelector(null);

        // Allows the options on the destination to configure the consumerInfo
        if (dest.getOptions() != null) {
            Map<String, Object> options = IntrospectionSupport.extractProperties(
                new HashMap<String, Object>(dest.getOptions()), "consumer.");
            IntrospectionSupport.setProperties(this.info, options);
            if (options.size() > 0) {
                String msg = "There are " + options.size()
                    + " consumer options that couldn‘t be set on the consumer."
                    + " Check the options are spelled correctly."
                    + " Unknown parameters=[" + options + "]."
                    + " This consumer cannot be started.";
                LOG.warn(msg);
                throw new ConfigurationException(msg);
            }
        }

        this.info.setDestination(dest);
        this.info.setBrowser(browser);
        if (selector != null && selector.trim().length() != 0) {
            // Validate the selector
            SelectorParser.parse(selector);
            this.info.setSelector(selector);
            this.selector = selector;
        } else if (info.getSelector() != null) {
            // Validate the selector
            SelectorParser.parse(this.info.getSelector());
            this.selector = this.info.getSelector();
        } else {
            this.selector = null;
        }

        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
        this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
                                   && !info.isBrowser();
        if (this.optimizeAcknowledge) {
            this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
            setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
        }

        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
        this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
        this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
        this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
        if (messageListener != null) {
            setMessageListener(messageListener);
        }
        try {
            this.session.addConsumer(this);
            this.session.syncSendPacket(info);
        } catch (JMSException e) {
            this.session.removeConsumer(this);
            throw e;
        }

        if (session.connection.isStarted()) {
            start();
        }
    }

this.info.setPrefetchSize(prefetch);

又上面代码可以看出,在创建ActiveMQMessageConsumer的过程中,程序会将connection中的queuePrefetch赋给ActiveMQMessageConsumer对象中的info对象(info为一个ConsumerInfo对象)

Map<String, Object> options = IntrospectionSupport.extractProperties(
                new HashMap<String, Object>(dest.getOptions()), "consumer.");
            IntrospectionSupport.setProperties(this.info, options);

在创建队列(queue)的过程中,我们可以传一些参数来配置消费者,这些参数的前缀必须为consumer. ,当我们传的参数与info对象中的属性匹配时,将覆盖info对象中的属性值,其传参形式如下:

queueName?param1=value1&param2=value2

所以我们如果想改变消费者预先拉取的消息数量,可以在创建对象的时候传入如下参数

queue = session.createQueue("test?consumer.prefetchSize=number");

ActiveMq接收消息--监听器

监听器代码如下:

package com.apt.study.util.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ReceiveListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
             TextMessage msg = (TextMessage) message;
             if(msg!=null) {
                 System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText());
             }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

消息接收类:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

public class Receiver {

private static final String USERNAME = "admin";

    private static final String PASSWORD = "admin";

    private static final String BROKEN_URL = "tcp://127.0.0.1:61616";

    private AtomicInteger count = new AtomicInteger();

    private ConnectionFactory connectionFactory;

    private ActiveMQConnection connection;

    private Session session;

    private Queue queue;

    private MessageConsumer consumer;

    public void init() {
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
             //从工厂中创建一个链接
            connection = (ActiveMQConnection) connectionFactory.createConnection();

            //启动链接
            connection.start();
            //创建一个事物session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            queue = session.createQueue("test");

            consumer = session.createConsumer(queue);
            //设置消息监听器
            consumer.setMessageListener(new ReceiveListener());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
consumer.setMessageListener方法:
public void setMessageListener(MessageListener listener) throws JMSException {
        checkClosed();
        if (info.getPrefetchSize() == 0) {
            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
        }
        if (listener != null) {
            boolean wasRunning = session.isRunning();
            if (wasRunning) {
                session.stop();
            }

            this.messageListener.set(listener);
            session.redispatch(this, unconsumedMessages);

            if (wasRunning) {
                session.start();
            }
        } else {
            this.messageListener.set(null);
        }
    }

从代码中可以看出,当我们使用监听器时,消费者prefetchSize必须大于0

 
 
时间: 2024-09-30 07:20:39

ActiveMQ消息发送与接收的相关文章

Chromium的IPC消息发送、接收和分发机制分析

由于Chromium采用多进程架构,因此会涉及到进程间通信问题.通过前面一文的学习,我们知道Browser进程在启动Render进程的过程中会建立一个以UNIX Socket为基础的IPC通道.有了IPC通道之后,接下来Browser进程与Render进程就以消息的形式进行通信.我们将这种消息称为IPC消息,以区别于线程消息循环中的消息.本文就分析Chromium的IPC消息发送.接收和分发机制. 老罗的新浪微博:http://weibo.com/shengyangluo,欢迎关注! Chrom

DICOM医学图像处理:DIMSE消息发送与接收“大同小异”之DCMTK fo-dicom mDCM

背景: 从DICOM网络传输一文开始,相继介绍了C-ECHO.C-FIND.C-STORE.C-MOVE等DIMSE-C服务的简单实现,博文中的代码给出的实例都是基于fo-dicom库来实现的,原因只有一个:基于C#的fo-dicom库具有高封装性.对于初学者来说实现大多数的DIMSE-C.DIMSE-N服务几乎都是"傻瓜式"操作--构造C-XXX-RQ.N-XXX-RQ然后绑定相应的OnResponseReceived处理函数即可.本博文希望在前几篇预热的基础上,对比DCMTK.fo

TeamTalk Android代码分析(业务流程篇)---消息发送和接收的整体逻辑说明

第一次纪录东西,也没有特别的顺序,想到哪里就随手画了一下,后续会继续整理- 6.2消息页面动作流程 6.2.1 消息页面初始化的总体思路 1.页面数据的填充更新直接由页面主线程从本地数据库请求 2.数据库数据的填充由后台线程异步方式从网络请求 3.前台线程每次按照18条记录读取数据库数据,后台线程按照每次18*3从网络请求数据 4.后台线程数据的请求由主线程满足一定的条件后发送总线事件,在 oneventbackgroudthread 中处理,具体条件(或的关系)如下: 1>第一次请求 2>本

Bluemix结合RabbitMq实现消息发送与接收实例

什么是RabbitMq? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求. 什么是Bluemix? BlueMix 是 IBM 基于 Cloud Foundr

cocos2dx 消息发送与接收

     cocos2dx有个自定义事件可以实现消息的发送和接收,叫EventListenerCustom.它是通过一个字符串来标识事件名称的.下面介绍下,我实现的这个消息的发送和接收.      首先,我们定义2个类,一个消息接收类,一个消息发送类.代码如下:       //消息接收 class cMsgReceiver { public: virtual void RecMsg(int iMsgId, void* pInfo, int iSize) { } }; //消息发送 class

windows消息发送与接收

Windows开发中一个重要的概念就是消息.能搞清楚消息的传递和处理,相信可以使我们对Windows程序有更深的理解. 先把消息划分为3类:发送消息(Incomingsent message).投递消息(Post message).输入消息(Input message).其中发送消息是非队列消息,而后两种是队列消息.在线程的消息队列中并不包括非队列消息,而只有队列消息才会在线程的消息队列中. 由上面的分类也可以知道为什么不能通过PostMessage函数来模拟输入动作.因为投递消息将进入到投递消

RocketMq入门之消息发送和接收

一.消息产生.发送 1 public class Producer { 2 public static void main(String[] args) throws MQClientException { 3 DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); 4 producer.setNamesrvAddr("172.18.4.114:9876"); 5 producer.setIn

linux 系统之间,网络编程,消息发送与接收

[email protected]:~/udp$ sudo apt-get update [email protected]:~/udp$ sudo apt-get install build-essential [email protected]:~/udp$ sudo apt-get install make [email protected]:~/udp$ ll -rw-rw-r-- 1 chunli chunli  279 May 15 10:36 makefile -rw-rw-r--

Spring整合activeMQ消息队列

1.配置JMS <!-- Spring提供的JMS工具类,它可以进行消息发送.接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="conne