activeMQ使用入门(点对点消息)

首先创建一个maven工程,在pom文件中增加相关的依赖包,如下:

<dependency>
  		<groupId>javax.jms</groupId>
  		<artifactId>jms-api</artifactId>
  		<version>1.1-rev-1</version>
  	</dependency>
  	<dependency>
  		<groupId>org.apache.activemq</groupId>
  		<artifactId>activemq-core</artifactId>
  		<version>5.7.0</version>
  	</dependency>

创建测试类:

发送消息类:

SendMessage

package com.jason.testmq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SendMessage {
	private static final String url = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "choice.queue";
	//private String expectedBody = "<hello>world!two</hello>";
	//private String expectedBody = "stop";

	public void sendMessage() throws JMSException {
		Connection connection = null;
		try {
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					url);
			connection = (Connection) connectionFactory.createConnection();
			connection.start();
			Session session = (Session) connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue(QUEUE_NAME);
			MessageProducer producer = session.createProducer(destination);
//			TextMessage message = session.createTextMessage(expectedBody);
//			message.setStringProperty("headname", "remoteB");
			JmsTestMessage testMessage = new JmsTestMessage();
			testMessage.setId("1234567");
			testMessage.setMsg("stop");
			testMessage.setStatus(1);
			ObjectMessage message = session.createObjectMessage(testMessage);
			producer.send(message);
			producer.close();
			session.close();
			connection.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		SendMessage sndMsg = new SendMessage();
		try {
			sndMsg.sendMessage();
		} catch (Exception ex) {
			System.out.println(ex.toString());
		}
	}
}

接收消息类:

ReceiveMessage

/**
 * 
 */
package com.jason.testmq;

import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ReceiveMessage {
	private static final String url = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "choice.queue";

	public void receiveMessage() {
		Connection connection = null;
		try {

			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					url);
			connection = connectionFactory.createConnection();
			connection.start();
			Session session = connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue(QUEUE_NAME);
			MessageConsumer consumer = session.createConsumer(destination);
			consumeMessagesAndClose(connection, session, consumer);
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}

	protected void consumeMessagesAndClose(Connection connection,
			Session session, MessageConsumer consumer) throws JMSException {
		for (int i = 0; i < 1;) {
			Message message = consumer.receive(1000);
			if (message != null) {
				i++;
				onMessage(message);
			}
		}
		System.out.println("Closing connection");
		consumer.close();
		session.close();
		connection.close();
	}

	public void onMessage(Message message) {
//		try {
//			if (message instanceof TextMessage) {
//				TextMessage txtMsg = (TextMessage) message;
//				String msg = txtMsg.getText();
//				System.out.println("Received: " + msg);
//			}
//		} catch (Exception e) {
//			e.printStackTrace();
//		}
		try {
			if (message instanceof ObjectMessage) {
				ObjectMessage objMsg = (ObjectMessage)message;
				Serializable obj = objMsg.getObject();
				if (obj instanceof JmsTestMessage) {
					JmsTestMessage testMessage = (JmsTestMessage)obj;
					System.out.println("Received new msg id is " + testMessage.getId() + ",msg is " + testMessage.getMsg() + ",status is " + testMessage.getStatus());
				} else {
					System.out.println("it is not JmsTestMessage");
				}

			} else {
				System.out.println("other type message with type is " + message.getJMSType());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String args[]) {
		ReceiveMessage rm = new ReceiveMessage();
		rm.receiveMessage();
	}

}

以注册监听的方式接收消息

ReceiveMessageWithListener

/**
 * 
 */
package com.jason.testmq;

import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ReceiveMessageWithListener implements MessageListener {
	private static final String url = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "choice.queue";

	private boolean stop = false;

	public void receiveMessage() {
		(new Thread(new ReceiveMessageRunnable())).start();
	}

	public void onMessage(Message message) {
//		try {
//			if (message instanceof TextMessage) {
//				TextMessage txtMsg = (TextMessage) message;
//				String msg = txtMsg.getText();
//				System.out.println("Received: " + msg);
//				if ("stop".equals(msg)) {
//					this.stop = true;
//				}
//			}
//		} catch (Exception e) {
//			e.printStackTrace();
//		}
		try {
			if (message instanceof ObjectMessage) {
				ObjectMessage objMsg = (ObjectMessage)message;
				Serializable obj = objMsg.getObject();
				if (obj instanceof JmsTestMessage) {
					JmsTestMessage testMessage = (JmsTestMessage)obj;
					System.out.println("Received new msg id is " + testMessage.getId() + ",msg is " + testMessage.getMsg() + ",status is " + testMessage.getStatus());
					if ("stop".equals(testMessage.getMsg())) {
						this.stop = true;
					}
				} else {
					System.out.println("it is not JmsTestMessage");
				}

			} else {
				System.out.println("other type message with type is " + message.getJMSType());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	public static void main(String args[]) {
		ReceiveMessageWithListener rm = new ReceiveMessageWithListener();
		rm.receiveMessage();
	}

	private class ReceiveMessageRunnable implements Runnable {

		public void run() {
			Connection connection = null;
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					url);
			try {
				connection = connectionFactory.createConnection();
				Session session = connection.createSession(false,
						Session.AUTO_ACKNOWLEDGE);
				Destination destination = session.createQueue(QUEUE_NAME);
				MessageConsumer consumer = session.createConsumer(destination);
				consumer.setMessageListener(ReceiveMessageWithListener.this);
				connection.start();
				while (!ReceiveMessageWithListener.this.stop) {
					try {
						Thread.sleep(10000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				System.out.println("Closing connection");
				consumer.close();
				session.close();
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}

		}

	}

}

在队列中传递的消息类

JmsTestMessage

/**
 * 
 */
package com.jason.testmq;

import java.io.Serializable;

/**
 * @author jasonzhang
 *
 */
public class JmsTestMessage implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private String id;
	private String msg;
	private int status;
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getMsg() {
		return msg;
	}
	public void setMsg(String msg) {
		this.msg = msg;
	}
	public int getStatus() {
		return status;
	}
	public void setStatus(int status) {
		this.status = status;
	}
}

参考链接:

Java消息服务(JMS)学习小结

学习jms(一)——基本实例

实战activeMQ

时间: 2024-10-06 15:26:28

activeMQ使用入门(点对点消息)的相关文章

JMS消息服务器(二)——点对点消息传送模型

一.点对点模型概览 当你只需要将消息发布送给唯一的一个消息消费者是,就应该使用点对点模型.虽然可能或有多个消费者在队列中侦听统一消息,但是,只有一个且仅有一个消费者线程会接受到该消息. 在p2p模型中,生产者称为发送者,而消费者则称为接受者.点对点模型最重要的特性如下: 消息通过称为队列的一个虚拟通道来进行交换.队列是生产者发送消息的目的地和接受者消费消息的消息源. 每条消息通仅会传送给一个接受者.可能会有多个接受者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接受者消费. 消息存在先

ActiveMQ从入门到精通(二)

接上一篇<ActiveMQ从入门到精通(一)>,本篇主要讨论的话题是:消息的顺序消费.JMS Selectors.消息的同步/异步接受方式.Message.P2P/PubSub.持久化订阅.持久化消息到MySQL以及与Spring整合等知识. 消息的顺序消费 在上一篇文章中,我们已经明确知道了ActiveMQ并不能保证消费的顺序性,即便我们使用了消息优先级.而在实际开发中,有些场景又是需要对消息进行顺序消费的,比如:用户从下单.到支付.再到发货等.如果使用ActiveMQ该如何保证消费的顺序性

点对点消息传送模型

点对点(p2p)消息传送模型 在p2p模型中,生产者成为发送者,而消费者成为接收者.点对点最重要的特性如下: 消息通过成为队列的一个虚拟通道来进行交换. 每条消息仅会传送给一个接收者.可能会有多个接收者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接收者所消费. 消息存在先后顺序.一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者.当消息已被消费时,就会从队列中将它们删除.(除非使用了消息优先级外) 生产者和消费者之间没有耦合.接收者和发送者可以在运行时动态添加,这使得

ActiveMQ结合Spring收发消息

直接使用 ActiveMQ 的方式需要重复写很多代码,且不利于管理,Spring 提供了一种更加简便的方式----Spring JMS ,通过它可以更加方便地使用 ActiveMQ. Maven 依赖结合Spring使用ActiveMQ的依赖如下: ActiveMQ.xml 文件 <?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.o

学习ActiveMQ(二):点对点(队列)模式消息演示

一:介绍 点对点的消息发送方式主要建立在 消息(Message ),队列(Queue),发送者(Sender),消费者(receiver)上,Queue 存贮消息,Sender 发送消息,receive接收消息.具体点就是Sender Client通过Queue发送message ,而 receiver Client从Queue中接收消息.消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行. 二:通过jms编码接口之间的关

基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送

写了一个简单的JMS例子,之所以使用JNDI 是出于通用性考虑,该例子使用JMS规范提供的通用接口,没有使用具体JMS提供者的接口,这样可以保证我们编写的程序适用于任何一种JMS实现(ActiveMQ.HornetQ...). 什么是JNDI JNDI(Java Naming and Directory Interface)是一个标准规范,类似于JDBC,JMS等规范,为开发人员提供了查找和访问各种命名和目录服务的通用.统一的接口.J2EE 规范要求所有 J2EE 容器都要提供 JNDI 规范的

ActiveMQ从入门到精通(一)

这是关于消息中间件ActiveMQ的一个系列专题文章,将涵盖JMS.ActiveMQ的初步入门及API详细使用.两种经典的消息模式(PTP and Pub/Sub).与Spring整合.ActiveMQ集群.监控与配置优化等.话不多说,我们来一起瞧一瞧! JMS 首先来说较早以前,也就是没有JMS的那个时候,很多应用系统存在一些缺陷: 1.通信的同步性 client端发起调用后,必须等待server处理完成并返回结果后才能继续执行 2.client 和 server 的生命周期耦合太高 clie

ActiveMQ的(点对点&amp;发布/订阅通信模式)和(持久化方式)

ActiveMQ的持久化 消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试.消

ActiveMQ笔记之点对点队列(Point-to-Point)

1. 点对点通信 点对点是一种一对一通信方式,更像是有一个队列,一个人往队列里放消息,另一个人从队列中取消息,其最大的特点是一个消息只会被消费一次,即使有多个消费者同时消费,他们消费的也是不同的消息. 2. 简单实现 添加依赖 添加Maven依赖: <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.apache.ac