ActiveMQ 消息服务(三)

想象场景:

有一条任务,需要在特定环境下进行。用ActiveMQ 来讲分两步,第一:发送者发布一条消息;第二:接收者接收到这条消息后需要干某些事情。

本文依然直接贴出demo代码!

1、项目结构图:

2、activeMQ的jar包依赖,部分pom.xml文件代码:

<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
			<version>5.7.0</version>
		</dependency>
		<!-- activemq-spring -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-spring</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>3.0.7.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.1</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.6.1</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.16</version>
		</dependency>
	</dependencies>

3、日志属性文件log4j.properties:

log4j.rootLogger=DEBUG,INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss,SSS} [%c]-[%p] %m%n

4、消息接收配置属性文件receive.properties:

jsm_url=tcp://localhost:61616
jms_name=com.andy.demo.test
jsm_type=topic
fliter=
test_key=com.andy.demo.util.activeMQ.DoSomethingImpl
max_caches=100

5、消息发送配置属性文件send.properties:

jsm_url=tcp://localhost:61616
jms_name=com.andy.demo.test
jsm_type=topic
max_caches=100
persist=persist

6、场景中说到的,当我们收到消息后需要处理一些事情

本例中将需要处理的事情摘出来,分成需要处理事情的接口以及实现类两部分:

(一)接口IDoSomething.java:

package com.andy.demo.activeMQ.work;

import javax.jms.Message;

// 处理一些事情 的接口
public interface IDoSomething {
	// 干点实事1
	public void doSomeThing01(Message message);

	// 干点实事2
	public void doSomeThing02(Message message);
}

(二)接口实现类DoSomethingImpl.java:

package com.andy.demo.activeMQ.work.impl;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

import com.andy.demo.activeMQ.work.IDoSomething;

// 处理一些事情 的接口实现类
public class DoSomethingImpl implements IDoSomething {

	@Override
	public void doSomeThing01(Message message) {
		// TODO Auto-generated method stub
		if (message instanceof TextMessage) {
			TextMessage msg = (TextMessage) message;
			try {
				System.out.println("doSomeThing01 处理的消息内容:" + msg.getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

	@Override
	public void doSomeThing02(Message message) {
		// TODO Auto-generated method stub
		if (message instanceof TextMessage) {
			TextMessage msg = (TextMessage) message;
			try {
				System.out.println("doSomeThing02 处理的消息内容:" + msg.getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

}

7、消息生产者或者叫消息发送者Sender.java:

package com.andy.demo.activeMQ;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @package :com.andy.demo.util.activeMQ<br>
 * @file :Sender.java<br>
 * @describe :消息发送者<br>
 * @author :wanglongjie<br>
 * @createDate :2015年11月6日下午1:21:33<br>
 * @updater :<br>
 * @updateDate :<br>
 * @updateContent :<br>
 */
public class Sender {
	private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

	private ActiveMQConnectionFactory factory;
	private Connection conn;
	private Session session;
	private Destination destination;
	private MessageProducer producer;

	private String url;
	private String jmsname;
	private boolean isTopic;
	private boolean isPersist;
	private boolean isConnection;
	private BlockingQueue<String> queue;

	private String msg;

	public Sender(String url, String jmsname, boolean isTopic,
			boolean isPersist, int maxcaches) {
		super();
		System.out.println("Sender.Sender(): 通过构造函数实例化对象......");
		this.url = url;
		this.jmsname = jmsname;
		this.isTopic = isTopic;
		this.isPersist = isPersist;
		this.queue = new LinkedBlockingQueue<String>(maxcaches);
	}

	public static Sender getSenderCase(String url, String jmsname,
			boolean isTopic, boolean isPersist, int maxcaches) {
		System.out.println("Sender.getSenderCase(): 通过静态方法实例化对象......");
		return new Sender(url, jmsname, isTopic, isPersist, maxcaches);
	}

	public void addMessage(String msg) throws InterruptedException {
		System.out.println("Sender.addMessage(): 向队列添加消息......");
		queue.put(msg);
	}

	private void sendMsg(String msg) throws InterruptedException, JMSException {
		System.out.println("Sender.sendMsg(): 向服务器发送消息......");
		Thread.sleep(5 * 1000);
		producer.send(session.createTextMessage(msg));
	}

	public void send() {
		System.out.println("Sender.send(): 从队列中取出消息......");
		while (!queue.isEmpty()) {
			try {
				msg = queue.take();
				initActiveMQ();
				sendMsg(msg);
			} catch (Exception e) {
				// TODO: handle exception
				LOGGER.error(e.getMessage());
				close();
			}
		}
		close();
	}

	public void sendMessage(String msg) throws JMSException,
			InterruptedException {
		System.out.println("Sender.sendMessage(): 发送消息主方法开始运行......");
		initActiveMQ();
		sendMsg(msg);
		close();
	}

	// 初始化 activeMQ
	private void initActiveMQ() throws JMSException {
		System.out.println("Sender.initActiveMQ(): 初始化 activeMQ......");
		if (isConnection) {
			return;
		}
		factory = new ActiveMQConnectionFactory(url);
		conn = factory.createConnection();
		session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

		destination = isTopic ? session.createTopic(jmsname) : session
				.createQueue(jmsname);
		producer = session.createProducer(destination);
		producer.setDeliveryMode(isPersist ? DeliveryMode.PERSISTENT
				: DeliveryMode.NON_PERSISTENT);

		isConnection = true;
	}

	// 关闭释放资源
	private void close() {
		System.out.println("Sender.close(): 关闭释放资源......");
		try {
			producer.close();
			session.close();
			conn.close();
		} catch (Exception e) {
			// TODO: handle exception
			LOGGER.error(e.getMessage());
		}
		isConnection = false;
	}
}

8、消息订阅者或者叫消息接收者Receiver.java:

package com.andy.demo.activeMQ;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.andy.demo.activeMQ.work.IDoSomething;

/**
 * @package :com.andy.demo.util.activeMQ<br>
 * @file :Receive.java<br>
 * @describe :消息接收者<br>
 * @author :wanglongjie<br>
 * @createDate :2015年11月6日下午1:20:27<br>
 * @updater :<br>
 * @updateDate :<br>
 * @updateContent :<br>
 */
public class Receiver extends Thread implements MessageListener,
		ExceptionListener, Runnable {
	private static final Logger LOGGER = LoggerFactory
			.getLogger(Receiver.class);

	private ActiveMQConnectionFactory factory;
	private Connection conn;
	private Session session;
	private Destination destination;
	private MessageConsumer consumer;

	private String url;
	private String jmsname;
	private boolean isTopic;
	private String filter;
	private BlockingQueue<Message> queue;

	private IDoSomething doSomething;

	public Receiver(String url, String jmsname, boolean isTopic, String filter,
			IDoSomething doSomething, int maxcaches) {
		System.out.println("Receiver.Receiver(): 构造函数实例化对象......");
		this.url = url;
		this.jmsname = jmsname;
		this.isTopic = isTopic;
		this.filter = filter;
		this.doSomething = doSomething;
		queue = new LinkedBlockingQueue<Message>(maxcaches);
	}

	public static Receiver getReceiverCase(String url, String jmsname,
			boolean isTopic, String filter, IDoSomething doSomething,
			int maxcaches) throws JMSException {
		System.out.println("Receiver.getReceiverCase(): 静态方法实例化对象......");
		Receiver receiver = new Receiver(url, jmsname, isTopic, filter,
				doSomething, maxcaches);
		receiver.initActiveMQ();
		receiver.start();
		return receiver;
	}

	// 初始化 activeMQ 参数
	private void initActiveMQ() throws JMSException {
		System.out.println("Receiver.initActiveMQ():初始化activeMQ.......");
		factory = new ActiveMQConnectionFactory(url);
		conn = factory.createConnection();
		conn.start();
		session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

		destination = isTopic ? session.createTopic(jmsname) : session
				.createQueue(jmsname);
		consumer = (isNull(filter)) ? session.createConsumer(destination)
				: session.createConsumer(destination, filter);
		consumer.setMessageListener(this);
		conn.setExceptionListener(this);
	}

	public void close() {
		System.out.println("Receiver.close(): 关闭释放资源......");
		try {
			session.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			LOGGER.error(e.getMessage());
		}
		try {
			conn.stop();
			conn.close();
		} catch (Exception e) {
			// TODO: handle exception
			LOGGER.error(e.getMessage());
		}
	}

	// 判断是否为空
	private boolean isNull(String param) {
		return param == null || param.equals("");
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
	 */
	@Override
	public void onException(JMSException exception) {
		// TODO Auto-generated method stub
		System.out.println("Receiver.onException():activeMQ 异常监听......");
		while (true) {
			try {
				initActiveMQ();
				break;
			} catch (Exception e) {
				// TODO: handle exception
				LOGGER.error(e.getMessage());
				try {
					Thread.sleep(10 * 1000);
				} catch (InterruptedException e1) {
					// TODO Auto-generated catch block
					LOGGER.error(e1.getMessage());
				}
			}
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
	 */
	@Override
	public void onMessage(Message message) {
		// TODO Auto-generated method stub
		System.out.println("Receiver.onMessage(): activeMQ 消息接收监听......");
		try {
			if (isTopic) {
				queue.put(message);
			} else {
				doSomethingWork(message);
			}
		} catch (Exception e) {
			// TODO: handle exception
			LOGGER.error(e.getMessage());
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see java.lang.Thread#run()
	 */
	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println("Receiver.run(): Runnble接口监听......"
				+ Thread.currentThread().getName());
		while (true) {
			Message message = null;
			try {
				message = queue.take();
				doSomethingWork(message);
			} catch (Exception e) {
				// TODO: handle exception
				LOGGER.error(e.getMessage());
			}
		}
	}

	// 具体任务
	public void doSomethingWork(Message message) {
		System.out.println("Receiver.doSomethingWork(): 开始干实事了......");
		doSomething.doSomeThing01(message);
		doSomething.doSomeThing02(message);
	}

}

9、消息发送者和消息接收者的封装类ActiveMQUtils.java:

package com.andy.demo.activeMQ;

import java.io.InputStream;
import java.util.Properties;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.andy.demo.activeMQ.work.IDoSomething;

/**
 * @package :com.andy.demo.util.activeMQ<br>
 * @file :ActiveMQUtils.java<br>
 * @describe :<br>
 * @author :wanglongjie<br>
 * @createDate :2015年11月6日下午1:22:13<br>
 * @updater :<br>
 * @updateDate :<br>
 * @updateContent :<br>
 */
public class ActiveMQUtils {
	private static final Logger LOGGER = LoggerFactory
			.getLogger(ActiveMQUtils.class);

	public static final String jms_url = null;
	public static final String jms_name = null;
	public static final String jms_filter = null;
	public static final String jms_type = "topic";
	public static final String test_key = null;
	public static final String persist = "persist";

	/**
	 *
	 * @method :getReceiverBean<br>
	 * @describe :获取 消息接收者实例<br>
	 * @author :wanglongjie<br>
	 * @createDate :2015年11月6日下午4:03:31 <br>
	 * @param properties
	 * @param doSomething
	 * @return Receiver
	 * @throws JMSException
	 *
	 */
	public static Receiver getReceiverBean(String properties,
			IDoSomething doSomething) throws JMSException {
		Properties p = loadProperties(properties);
		String url = p.getProperty("jsm_url");
		String jmsname = p.getProperty("jms_name");
		boolean isTopic = p.getProperty("jsm_type", "topic").equals("topic");
		String filter = p.getProperty("fliter");
		int maxcaches = Integer.parseInt(p.getProperty("max_caches", "1000"));
		Receiver receiver = Receiver.getReceiverCase(url, jmsname, isTopic,
				filter, doSomething, maxcaches);
		return receiver;
	}

	/**
	 *
	 * @method :getSenderCase<br>
	 * @describe :获取 消息发送者实例<br>
	 * @author :wanglongjie<br>
	 * @createDate :2015年11月6日下午4:03:48 <br>
	 * @param properties
	 * @return Sender
	 */
	public static Sender getSenderCase(String properties) {
		Properties p = loadProperties(properties);
		String url = p.getProperty("jsm_url");
		String jmsname = p.getProperty("jms_name");
		boolean isTopic = p.getProperty("jsm_type", "topic").equals("topic");
		boolean isPersist = p.getProperty("persist", "persist").equals(
				"persist");
		int maxcaches = Integer.parseInt(p.getProperty("max_caches", "1000"));

		Sender sender = Sender.getSenderCase(url, jmsname, isTopic, isPersist,
				maxcaches);
		return sender;
	}

	/**
	 *
	 * @method :loadProperties<br>
	 * @describe :加载 属性文件<br>
	 * @author :wanglongjie<br>
	 * @createDate :2015年11月6日下午1:36:44 <br>
	 * @param properties
	 * @return Properties
	 */
	private static Properties loadProperties(String properties) {
		InputStream in = null;
		try {
			in = ActiveMQUtils.class.getResourceAsStream(properties);
			Properties p = new Properties();
			p.load(in);
			return p;
		} catch (Exception e) {
			// TODO: handle exception
			LOGGER.error(e.getMessage());
		} finally {
			try {
				in.close();
			} catch (Exception e2) {
				// TODO: handle exception
				LOGGER.error(e2.toString());
			}
		}
		return null;
	}
}

10、测试类,分发送消息测试、接收消息测试两部分:

(一)消息发送测试类SenderAPPTest.java:

package com.andy.demo.activeMQ.test;

import java.util.Date;

import javax.jms.JMSException;

import com.andy.demo.activeMQ.ActiveMQUtils;
import com.andy.demo.activeMQ.Sender;

// 发送消息测试类
public class SenderAPPTest {
	public static void main(String[] args) {
		String properties = "/send.properties";
		Sender sender = ActiveMQUtils.getSenderCase(properties);
		int sum = 5;
		for (int i = 0; i < sum; i++) {
			String msg = new Date(System.currentTimeMillis()) + " ; Hello, I am andy And this is a activeMQ test[" + (i + 1 ) + "]!";
			try {
				sender.sendMessage(msg);
			} catch (JMSException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			try {
				Thread.sleep(10 * 1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

(二)接收消息测试类ReceiverAPPTest.java:

package com.andy.demo.activeMQ.test;

import javax.jms.JMSException;

import com.andy.demo.activeMQ.ActiveMQUtils;
import com.andy.demo.activeMQ.Receiver;
import com.andy.demo.activeMQ.work.impl.DoSomethingImpl;

// 接收消息测试类
public class ReceiverAPPTest {
	public static void main(String[] args) {
		String properties = "/receive.properties";
		DoSomethingImpl doSomething = new DoSomethingImpl();
		int num = 5;
		Receiver[] receivers = new Receiver[num];
		try {
			for (int i = 0; i < num; i++) {
				receivers[i] = ActiveMQUtils.getReceiverBean(properties,
						doSomething);
			}
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

11、本例的源码已贴完,再说一下JMS中topic和queue的区别:

(一)topic:

在JMS中,Topic实现publish和subscribe语义。一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。

所以把配置文件中的jsm_type属性写成topic测试时,必须先启动ReceiverAPPTest.java接收者,然后在启动SenderAPPTest.java 发送者。也就是说在topic情况下,是先有接收者存在的情况下才能接收到发送者发送的消息。

(二)queue:

MS Queue执行load balancer语义。一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡

所以把配置文件中的jms_type属性写成queue测试时,先启动发送者或者接收者都可以。

12、更加详细的ActiveMQ的通信方法可以参考上一篇博客。

时间: 2024-11-07 00:50:58

ActiveMQ 消息服务(三)的相关文章

ActiveMQ 消息服务(二)

本文介绍ActiveMQ的几种通信方法: 本文转自:http://shmilyaw-hotmail-com.iteye.com/blog/1897635  这篇文章总结的太好了,不得不转啊! 简介 在前面一篇文章里讨论过几种应用系统集成的方式,发现实际上面向消息队列的集成方案算是一个总体比较合理的选择.这里,我们先针对具体的一个消息队列 Activemq的基本通信方式进行探讨.activemq是JMS消息通信规范的一个实现.总的来说,消息规范里面定义最常见的几种消息通信模式主要有 发布-订阅.点

Docker学习之搭建ActiveMQ消息服务

前言 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 在生产项目中,很多时候需要消息中间件来进行分布式系统间的通信.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能.本篇主要介绍ActiveMQ 相关概念以及安装说明,后面会着重介绍 SpringBoot 集成实现秒

消息中间件--ActiveMQ&amp;JMS消息服务

### 消息中间件 ### ---------- **消息中间件** 1. 消息中间件的概述 2. 消息中间件的应用场景(查看大纲文档,了解消息队列的应用场景) * 异步处理 * 应用解耦 * 流量削峰 * 消息通信 ---------- ### JMS消息服务 ### ---------- **JMS的概述** 1. JMS消息服务的概述 2. JMS消息模型 * P2P模式 * Pub/Sub模式 3. 消息消费的方式 * 同步的方式---手动 * 异步的方式---listener监听 4.

2015年12月10日 spring初级知识讲解(三)Spring消息之activeMQ消息队列

基础 JMS消息 一.下载ActiveMQ并安装 地址:http://activemq.apache.org/ 最新版本:5.13.0 下载完后解压缩到本地硬盘中,解压目录中activemq-core-5.13.0.jar,这就是ActiveMQ提供给我们的API. 在bin目录中,找到用于启动ActiveMQ的脚本,运行脚本后ActiveMQ就准备好了,可以使用它进行消息代理. 访问http://127.0.0.1:8161/admin/能看到如下则表示安装成功了. 二.在Spring中搭建消

三:JMS消息服务规范

一:JMS是什么?--->JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API--->用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.--->Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持.---> JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息.--->JMS(Java

JAVA的设计模式之观察者模式----结合ActiveMQ消息队列说明

1----------------------观察者模式------------------------------ 观察者模式:定义对象间一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知自动更新. activeMQ消息队列 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮

ActiveMQ消息系统研究与学习

概述 ActiveMQ是Apache所提供的一个开源的消息系统,完全采用 Java 来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范.JMS是一组Java应用程序接口,它提供消息的创建.发送.读取等一系列服务.JMS提供了一组公共应用程序接口和响应的语法,类似于Java 数据库 的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信. JMS支持两种消息发送和接收模型.一种称为

JAVA消息服务JMS规范及原理详解

一.简介 JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持. JMS允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 二.常用术语介绍 在提到JMS时,我们通常会说到一些术语,解释如下: 消息

JMS学习四(ActiveMQ消息过滤)

消息的过期.消息的选择器和消息的优先级. 一.消息的过期 允许消息过期 .默认情况下,消息永不会过期.如果消息在特定周期内失去意义,那么可以设置过期时间. 有两种方法设置消息的过期时间,时间单位为毫秒: 1.使用消息生产者的setTimeToLive 方法为所有的消息设置过期时间.2.使用消息生产者的send 方法为每一条消息设置过期时间. 消息过期时间,send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值.如果 timeToLive 值等于零,则 JMSExpiratio