ActiveMQ消息队列-单节点测试(点对点模式)

ActiveMQ发送和接收消息的过程和jdbc操作数据库很类似:首先创建Connection连接对象,再获取Session会话对象,之后通过Session会话对象创建Producer、Consumer、Message等对象,只不过ActiveMQ的Connection对象是通过ActiveMQConnectionFactory工厂产生的。以下是一些场景的测试代码。

先定义一些常量数据,这些数据在后面的例子中也有用到

// 用户名
private final static String USERNAME = ActiveMQConnection.DEFAULT_USER;
// 密码
private final static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
// 节点url
private final static String BROKERURL = "tcp://localhost:61616";
// 队列名称
private final static String DESTINATION  = "com_mq_queue_test";

1、首先测试一下最简单的这种,就是消息发送方发送一则消息,然后接收方接收消息,同时消息被消费方接收之后,采用自动应答模式(Session.AUTO_ACKNOWLEDGE),反馈给消息发送发。

发送方代码:
@Test
public void run() {
    try {
	// 创建ActiveMQConnectionFactory工厂
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	// 通过ActiveMQConnectionFactory工厂,创建Connection对象
	Connection connection = factory.createConnection();
	// 启动Connection对象
	connection.start();

	// 创建Session会话
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);
	MessageProducer sender = session.createProducer(destination);
	// 设置消息的持久化模式:如果设置成持久化,则传递消息时需要消息存储起来,然后再传递。即message->broker->message store->返回给消息发送者消息是否存储成功
	// 如果设置成非持久化,表示消息是异步传递,则消息的传递路径是message-broker->返回给消息发送者和存储数据这2个步骤异步执行,性能较持久化模式快
	// DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失;PERSISTENT保证重启后会把没有发送的数据再次发送直至发送成功
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

	// 创建消息并发送消息
	Message message = session.createMessage();
	message.setStringProperty("name", "tom");
	sender.send(message);

	// 关闭资源
	session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

// 接收方代码:
@Test
public void run() {
    try {
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	Connection connection = factory.createConnection();
	connection.start();

	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);
	MessageConsumer receiver = session.createConsumer(destination);

	Message message = receiver.receive();
	String name = message.getStringProperty("name");
	System.out.println("接收到的消息体是:" + name);
	// message.acknowledge();  Session.AUTO_ACKNOWLEDGE时,默认自动发送确认消息

	receiver.close();
        session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

创建Session的时候,connection.createSession(boolean isTranscration, int acknowledge),第一个参数表示是否使用事务、第二个参数表示采用的消息应答的模式。

为什么会有这2个参数呢?我个人的理解是这样的,首先说下事务,我们都知道事务简单来说就是保证一批操作要么全部成功,要么全部失败,不存在部分成功,部分失败。在ActiveMQ中,消息发送发也可以在一个session中,发送多个Message消息,但要保证多个消息的在一个事务中,所以就需要将isTranscration设置成true,设置好之后,需要调用 session.commit()提交事务。

而acknowledge参数表示消息应答的模式,消息发送发发出消息,接收方接收到消息后,需要反馈给发送方,然后将消息从队列中移除。acknowledge的值有AUTO_ACKNOWLEDGE、CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE三种。AUTO_ACKNOWLEDGE表示自动应答模式,消息消费方接收到消息之后,会自动应答给发送发;CLIENT_ACKNOWLEDGE表示由消息消费方发送应答通知,消费方接收到消息之后,需要调用Message的acknowledge()方法,否则队列不会移除该消息,这可能会造成垃圾数据。具体应答代码如下:

Message message = receiver.receive();
String name = message.getStringProperty("name");
System.out.println("接收到的消息体是:" + name);
// 消息确认:否则数据不会被删除
message.acknowledge();

2、测试同一个事务(只关注消息发送方即可)

@Test
public void testTranscation() {
    try {
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	Connection connection = factory.createConnection();
	connection.start();
	//  采用事务
	Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);
	MessageProducer sender = session.createProducer(destination);
	sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	// 发送多个消息
	for(int i = 0; i < 10; i++) {
	    Message message = session.createMessage();
	    message.setStringProperty("name_" + i, "yangyong_" + i);
	    sender.send(message);
	    if(i == 1) { // 模拟异常
		int count = 10 / 0;
	    }
	}
	// 需要调用commit方法
	session.commit();
	session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

代码中使用10 / 0模拟异常,测试结果表明10个消息均未发送到队列中。

3、使用数据库存储消息

如果想把消息持久化到数据库中,只需要修改activemq.xml文件,测试代码和以文件存储数据的方式一样。首先在activemq.xml文件的<broker />标签上面添加如下配置(以mysql为例,先手动建好数据库)

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

然后在<broker />里找到<persistenceAdapter />标签,注释掉该配置,换成如下配置

<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" 
                            createTablesOnStartup="true" 
                            useDatabaseLock="false"/>
</persistenceAdapter>

4、消息过滤(selector)

ActiveMQ是一个消息总线,多个应用中的消息都可以发送到一个队列里,每个应用的消息消费方可能只关注本系统产生的消息,这时需要定义一个selector,通过属性值筛选出感兴趣的消息数据。selector可以支持多种表达式,如=,>,<等等,可参考《ActiveMQ in Action》这本书。

// 测试selector消息过滤:发送方
@Test
public void testSelector() {
    try {
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	Connection connection = factory.createConnection();
	connection.start();

	Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);
	MessageProducer sender = session.createProducer(destination);
	sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

	//发送2个消息 
	TextMessage message = session.createTextMessage("测试selector消息过滤1");
	message.setStringProperty("from", "jack");
	sender.send(message);

	message = session.createTextMessage("测试selector消息过滤2");
	message.setStringProperty("from", "tom");
	sender.send(message);

	session.commit();
	session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

// 测试selector消息过滤:消费方
@Test
public void testSelector() {
    try {
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	Connection connection = factory.createConnection();
	connection.start();

	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);

	// 设置筛选条件
	String selector = "from=‘jack‘";
	// 创建带有筛选条件的MessageConsumer对象
	MessageConsumer consumer = session.createConsumer(destination, selector);

	TextMessage message = (TextMessage) consumer.receive();
	System.out.println("接收到的消息:" + message.getText());

	session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

测试结果表明消息消费方只能接收到发送发发出的第一个消息,第二个from=‘tom‘的消息不能被接收。

5、消费方接收消息后,发送消息给消息发出方

在消息被消费方接收后,可能消费方需要把处理结果反馈给发出方,然后发出方再执行一些业务逻辑,其流程为

// 消息发送发
public class QueueSender {
    public void testReplyTo() {
        try {
	   ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	    final Connection connection = factory.createConnection();
	    connection.start();

	    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	    Destination destination = session.createQueue(DESTINATION);
	    MessageProducer sender = session.createProducer(destination);
	    sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

	    TextMessage message = session.createTextMessage("测试消息消费者返回处理结果给消息生产者");
	    sender.send(message);

	    // 接受消费方返回的消息
	    MessageConsumer consumer = session.createConsumer(destination);
	    consumer.setMessageListener(new MessageListener() {
	        public void onMessage(Message message) {
		    if(message != null && message instanceof TextMessage) {
			try {
			    System.out.println("消息消费方反馈的信息->messageID:" 
			                + message.getJMSCorrelationID() + ",消息实体:" 
			                + ((TextMessage)message).getText());
			    //关闭资源
			    session.close();
		            connection.close();
		        } catch (JMSException e) {
			    e.printStackTrace();
			}
		    }
		}
	    });
        } catch(Exception e) {
	    e.printStackTrace();
        }
   }
   public static void main(String[] args) {
	new QueueSender().testReplyTo();
    }
 }
 
 //  消息接收方:
 public class  QueueReceiver {
     public void testReplyTo() {
        try {
	   ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	    final Connection connection = factory.createConnection();
	    connection.start();

	    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	    final Destination destination = session.createQueue(DESTINATION);
	    MessageConsumer consumer = session.createConsumer(destination);
	    consumer.setMessageListener(new MessageListener() {
	        public void onMessage(Message message) {
		    if(message != null && message instanceof TextMessage) {
		        try {
			    System.out.println("接收到的消息->messageID:" 
			                + message.getJMSMessageID() + ",消息实体:"
			                + ((TextMessage)message).getText());

			    // 发送消息给消息生产者
			    MessageProducer sender = session.createProducer(destination);
			    sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			    TextMessage message0 = session.createTextMessage("消费者已经处理了您发送的消息了");
		// 必须设置这些参数JMSCorrelationID:接收到的消息的ID,JMSReplyTo:消息发送到这个地址
			    message0.setJMSCorrelationID(message.getJMSMessageID());
			    message0.setJMSReplyTo(destination);
			    sender.send(message0);

			    session.close();
		            connection.close();
			} catch (JMSException e) {
			    e.printStackTrace();
			}
		    }
		}
	    });
        } catch(Exception e) {
	    e.printStackTrace();
	}
    }

    public static void main(String[] args) {
	new QueueReceiver().testReplyTo();
    }
}
时间: 2024-08-09 02:20:32

ActiveMQ消息队列-单节点测试(点对点模式)的相关文章

JAVA的设计模式之观察者模式----结合ActiveMQ消息队列说明

1----------------------观察者模式------------------------------ 观察者模式:定义对象间一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知自动更新. activeMQ消息队列 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮

JAVAEE——宜立方商城08:Zookeeper+SolrCloud集群搭建、搜索功能切换到集群版、Activemq消息队列搭建与使用

1. 学习计划 1.solr集群搭建 2.使用solrj管理solr集群 3.把搜索功能切换到集群版 4.添加商品同步索引库. a) Activemq b) 发送消息 c) 接收消息 2. 什么是SolrCloud SolrCloud(solr 云)是Solr提供的分布式搜索方案,当你需要大规模,容错,分布式索引和检索能力时使用 SolrCloud.当一个系统的索引数据量少的时候是不需要使用SolrCloud的,当索引量很大,搜索请求并发很高,这时需要使用SolrCloud来满足这些需求. So

ActiveMQ消息队列的搭建

今天来写下消息队列 一.首先介绍下什么是activeMQ? ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 主要特点: 1. 多种语言和协议编写客户端.语言: Java, C, C++, C#, Ruby, Perl, Python, PHP.应用协议: OpenWire,Sto

ActiveMQ消息队列的使用及应用

一:JMQ的两种消息模式 1.1:点对点的消息模式 1.2:订阅模式 二:点对点的实现代码 2.1:点对点的发送端 2.2:点对点的接收端 三:订阅/发布模式的实现代码 3.1:订阅模式的发送端 3.2:订阅模式的接收端 四:发送消息的数据类型 4.1:传递javabean对象 4.2:发送文件 五:ActiveMQ的应用 5.1:保证消息的成功处理 5.2:避免消息队列的并发 5.2.1:主动接收队列消息 5.2.2:使用多个接收端 5.3:消息有效期的管理 5.4:过期消息,处理失败的消息如

ActiveMQ消息队列-简介

一.ActiveMQ是什么 ActiveMQ是一个消息中间件(Message-oriented middleware,MOM),实现JMS1.1规范,支持J2EE1.4以上,支持多种语言客户端(java,C,C++,AJAX等等),支持多种协议(http,https,ip多重广播,ssl协议,stomp协议,tcp协议,udp协议等)以及良好的spring支持. 二.ActiveMQ与RPC比较 1.都能实现系统之间解耦: 2.RPC是同步通信,ActiveMQ默认是异步通信,当然它也可支持同步

SpringBoot中使用rabbitmq,activemq消息队列和rest服务的调用

1. activemq 首先引入依赖 pom.xml文件 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> 创建一个配置队列类 JMSConfiguration.java package com.wangx.boot.util; impo

Spring整合activeMQ消息队列

1.配置JMS <!-- Spring提供的JMS工具类,它可以进行消息发送.接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="conne

2015年12月10日 spring初级知识讲解(三)Spring消息之activeMQ消息队列

基础 JMS消息 一.下载ActiveMQ并安装 地址:http://activemq.apache.org/ 最新版本:5.13.0 下载完后解压缩到本地硬盘中,解压目录中activemq-core-5.13.0.jar,这就是ActiveMQ提供给我们的API. 在bin目录中,找到用于启动ActiveMQ的脚本,运行脚本后ActiveMQ就准备好了,可以使用它进行消息代理. 访问http://127.0.0.1:8161/admin/能看到如下则表示安装成功了. 二.在Spring中搭建消

ActiveMQ消息队列介绍(转)

ActiveMQ是一个开源兼容Java Message Service (JMS) 1.1面向消息的中件间. 来自Apache Software Foundation. ActiveMQ提供松耦合的应用程序架构. 先来看两个应用通过RPC通讯的紧耦合: 通过面向消息的中件间, 架构演变为: 我们看到应用程序1发送message到中件间, 应用程序2从中件间接收message. ActiveMQ提供了灵活的应用程序架构. ActiveMQ消息存储也是FIFO: 什么时候使用ActiveMQ: 1.