JMS消息队列ActiveMQ(点对点模式)

生产者(producer)->消息队列(message queue)

package com.java1234.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * @author Administrator
 *
 */
public class JMSProducer {

	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
	private static final int SENDNUM=10; // 发送的消息数量

	public static void main(String[] args) {

		ConnectionFactory connectionFactory; // 连接工厂
		Connection connection = null; // 连接
		Session session; // 会话 接受或者发送消息的线程
		Destination destination; // 消息的目的地
		MessageProducer messageProducer; // 消息生产者

		// 实例化连接工厂
		connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

		try {
			connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
			connection.start(); // 启动连接
			session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
			destination=session.createQueue("FirstQueue1"); // 创建消息队列
			messageProducer=session.createProducer(destination); // 创建消息生产者
			sendMessage(session, messageProducer); // 发送消息
			session.commit();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally{
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * 发送消息
	 * @param session
	 * @param messageProducer
	 * @throws Exception
	 */
	public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
		for(int i=0;i<JMSProducer.SENDNUM;i++){
			TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
			System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);
			messageProducer.send(message);
		}
	}
}

消费者(Consumer)--(监听)->消息队列

package com.java1234.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * @author Administrator
 *
 */
public class JMSConsumer2 {

	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址

	public static void main(String[] args) {
		ConnectionFactory connectionFactory; // 连接工厂
		Connection connection = null; // 连接
		Session session; // 会话 接受或者发送消息的线程
		Destination destination; // 消息的目的地
		MessageConsumer messageConsumer; // 消息的消费者

		// 实例化连接工厂
		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);

		try {
			connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
			connection.start(); // 启动连接
			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
			destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
			messageConsumer=session.createConsumer(destination); // 创建消息消费者
			messageConsumer.setMessageListener(new Listener()); // 注册消息监听
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
	}
}

监听(Listener)消息队列

package com.java1234.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		// TODO Auto-generated method stub
		try {
			System.out.println("收到的消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
时间: 2024-12-23 16:46:36

JMS消息队列ActiveMQ(点对点模式)的相关文章

JMS消息队列ActiveMQ(发布/订阅模式)

消费者1(Consumer)--订阅(subcribe)-->主题(Topic) package com.java1234.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.

消息队列中点对点与发布订阅区别(转)

背景知识 JMS一个在 Java标准化组织(JCP)内开发的标准(代号JSR 914).2001年6月25日,Java消息服务发布JMS 1.0.2b,2002年3月18日Java消息服务发布 1.1. Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. 点对点与发布订阅最初是由JMS定义的.这两种模式主要区别或解决的问题就是发送到队列的消息能否重

消息队列中点对点与发布订阅区别

背景知识 JMS一个在 Java标准化组织(JCP)内开发的标准(代号JSR 914).2001年6月25日,Java消息服务发布JMS 1.0.2b,2002年3月18日Java消息服务发布 1.1. Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. 点对点与发布订阅最初是由JMS定义的.这两种模式主要区别或解决的问题就是发送到队列的消息能否重

Flume 读取JMS 消息队列消息,并将消息写入HDFS

利用Apache Flume 读取JMS 消息队列消息.并将消息写入HDFS,flume agent配置例如以下: flume-agent.conf #name the  components on this agent agentHdfs.sources  = jms_source agentHdfs.sinks =  hdfs_sink agentHdfs.channels  = mem_channel #  Describe/configure the source agentHdfs.s

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件: 由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行. ActiveMQ是用来干什么的? 用来处理消息,也就是处理JMS的.消息队列在大型电子商务类网站,如京东.淘宝.去哪儿等网站有着深入的应用, 队列的主要作用是消除高并发访问高峰,加快网站的响应速度. 在不使用消息队列的情况下,用户的请求数据直接写入数据库,高发的情况下,会对数据库造成巨大的压力, 同时也使

Java消息队列ActiveMQ (一)--JMS基本概念

摘要:The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Java EE) to create, send, receive, and read messages. It enables distributed communication that is loosely

消息队列-ActiveMQ

1 业务需求描述 举例描述: 再警情通报的业务时通过发送消息界面可以选择 警情联络,和船情通报两种消息 发送方式可分为 一对一发送:部门对部门.个人对个人 一对多发送:部门对多部门.个人对多人 2 功能实现设计 基于上述需求描述,在消息传输功能实现上选用activemq进行警情联络消息传输功能的实现.1. 基础概念 ActiveMQ:是Apache出品,最流行的,能力强劲的开源消息总线.是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现. JMS(Java消息服务)

深入浅出 消息队列 ActiveMQ(转)

一. 概述与介绍 ActiveMQ 是Apache出品,最流行的.功能强大的即时通讯和集成模式的开源服务器.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现.提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能. 二. 特性 1. 多种语言和协议编写客户端.语言: Java.C.C++.C#.Ruby.Perl.Python.PHP.应用协议:OpenWire.Stomp REST.WS N

深入浅出 消息队列 ActiveMQ

一. 概述与介绍 ActiveMQ 是Apache出品,最流行的.功能强大的即时通讯和集成模式的开源服务器.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现.提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能. 二. 特性 1. 多种语言和协议编写客户端.语言: Java.C.C++.C#.Ruby.Perl.Python.PHP.应用协议:OpenWire.Stomp REST.WS N