ActiveMQ 运行案例

前言

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

一、使用介绍

环境准备

1、activemq下载传送门:https://activemq.apache.org/download.html、MyEclipse

2、启动activemq,我电脑是win64位,所以启动bin木下win64中的activemq.bat

3、需要输入用户名和密码才能进入,页面成功启动的效果。(默认用户名和密码皆为:admin)

开始测试

1、新建一个JMS java工程,导入下载的activemq文件里的jar包,例如作者目录下的activemq-all-5.11.1.jar

导入工程

2、新建消息生产者 JMSProducer.java

package com.hcg.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 *
 * @author babylon
 * 2016-5-9
 */
public class JMSProducer {

	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址
	private static final int SENDNUM= 10;	// 发送的消息数量

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageProducer messageProducer;	// 消息生产者

		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);	// 创建会话
			destination = session.createQueue("FirstQueue1");	// 创建消息队列
			messageProducer = session.createProducer(destination);		// 创建消息生产者
			// 发送消息
			sendMessage(session, messageProducer);
		    // 正式提交发送消息的操作
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}  finally {
			// 关闭连接
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}

	}

	/**
	 * 发送消息
	 * @param session
	 * @param messageProducer
	 * @throws JMSException
	 */
	public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{
		for(int i=0; i < JMSProducer.SENDNUM; i++){
			TextMessage message = session.createTextMessage("ActiveMQ  发送的消息"+i);
			System.out.println("发送消息:"+i);
			messageProducer.send(message);
		}
	}

}

3、F11运行实例,可以看见成功发送了10条消息

3、创建消费者 JMSConsumer.java

package com.hcg.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * @author babylon
 * 2016-5-9
 */
public class JMSConsumer {

	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageConsumer messageConsumer;	// 消息生产者

		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	// 创建会话,不加事务
			destination = session.createQueue("FirstQueue1");	// 创建消息队列
			messageConsumer = session.createConsumer(destination);		// 创建消息消费者
			while(true){
				TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
				if(textMessage != null){
					System.out.println("收到的消息:"+textMessage.getText());
				} else {
					break;
				}
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

4、F11运行消费者,可以看见在控制台新增了一个消费者,发送的消息都已被消费处理

5、while(true)这种方式处理消费是不合适的,下面以监听的方式处理创建消费者 JMSConsumer_listener.java。

package com.hcg.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者 - 监听方式消费
 * @author babylon
 * 2016-5-9
 */
public class JMSConsumer_listener {

	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageConsumer messageConsumer;	// 消息生产者

		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSConsumer_listener.USERNAME, JMSConsumer_listener.PASSWORD, JMSConsumer_listener.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	// 创建会话,不加事务
			destination = session.createQueue("FirstQueue1");	// 创建消息队列
			messageConsumer = session.createConsumer(destination);		// 创建消息消费者
			messageConsumer.setMessageListener(new Listener());			// 注册消息监听
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

监听对象需要实现MessageListener

package com.hcg.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听
 *
 * @author babylon
 * 2016-5-9
 */
public class Listener implements MessageListener{

	/*
	 * 收到的消息
	 */
	@Override
	public void onMessage(Message message) {
		try {
			System.out.println("收到的消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

6、运行该类,发现消费者数量增加了一个。

消息收发和订阅的方式

1、创建消息发布者

package com.hcg.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者 - 消息发布者
 *
 * @author babylon
 * 2016-5-9
 */
public class JMSProducer {

	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址
	private static final int SENDNUM = 10;	// 发送的消息数量

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageProducer messageProducer;	// 消息生产者

		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);	// 创建会话
			// destination = session.createQueue("FirstQueue1");	// 创建消息队列
			destination = session.createTopic("FirstTopic1");
			messageProducer = session.createProducer(destination);		// 创建消息生产者
			// 发送消息
			sendMessage(session, messageProducer);
		    // 正式提交发送消息的操作
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}  finally {
			// 关闭连接
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}

	}

	/**
	 * 发布的消息
	 * @param session
	 * @param messageProducer
	 * @throws JMSException
	 */
	public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{
		for(int i=0; i < JMSProducer.SENDNUM; i++){
			TextMessage message = session.createTextMessage("ActiveMQ  发送的消息"+i);
			System.out.println("发送消息:"+i);
			messageProducer.send(message);
		}
	}

}

2、创建消息订阅者1

package com.hcg.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者 - 消息订阅者1
 * @author babylon
 * 2016-5-9
 */
public class JMSConsumer {

	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageConsumer messageConsumer;	// 消息生产者

		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	// 创建会话,不加事务
			// destination = session.createQueue("FirstQueue1");	// 创建消息队列
			destination = session.createTopic("FirstTopic1");
			messageConsumer = session.createConsumer(destination);		// 创建消息消费者
			messageConsumer.setMessageListener(new Listener());			// 注册消息监听
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

3、创建消息订阅者2

package com.hcg.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者 - 消息订阅者2
 * @author babylon
 * 2016-5-9
 */
public class JMSConsumer2 {

	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageConsumer messageConsumer;	// 消息生产者

		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	// 创建会话,不加事务
			// destination = session.createQueue("FirstQueue1");	// 创建消息队列
			destination = session.createTopic("FirstTopic1");
			messageConsumer = session.createConsumer(destination);		// 创建消息消费者
			messageConsumer.setMessageListener(new Listener2());			// 注册消息监听
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

最终目录结构:

demo下载地址:https://github.com/JasonBabylon/activemq

时间: 2024-10-18 00:09:57

ActiveMQ 运行案例的相关文章

ActiveMQ入门案例以及整合Spring的简单实用

先来个ActiveMQ介绍哈: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法,是一个消息中间件. 应用场景:为了实现系统之间的通信,把系统之间的调用耦合度降低就可以使用MQ. 1) activeMQ 是Apache出品,最流行的,能力强劲的开源消息总线. 2) avtiveMQ主要特点:完全支持JMS1.1和J2EE 1.4规范:支持spring,很容易内嵌到spring中:支持ajax. 3) activeMQ的消息形式: a) 点对点形式,即生产

Robot Framework(2)——简单运行案例

1.打开RIDE 之前介绍的3种方式都可以 2.创建工程和测试套件 1>点击File-New Project 2>点击OK,如下图: 3>右键点击New Suite 4>点击OK,如下图: 3.创建案例 1>右键点击New Test Case 2>点击OK,如下图: 4.编写脚本 程序员入门的第一行代码:hello world RF中log命令,是用来打印日志的,类比Java中的System.out.println("hello world"); 如

centos/ubuntu jenkins.war 自启动脚本,Linux启动停止jar包的运行案例

#! /bin/sh # chkconfig: 2345 10 90 # description: jenkins .... # This script will be executed *after* all the other init scripts. # You can put your own initialization stuff in here if you don't # want to do the full Sys V style init stuff. #prefix=/

使用Pabot并行运行RF案例

一.问题引入 在做接口自动化时随着案例增多,特别是流程类案例增多,特别是asp.net的webform类型的项目,再加上数据库校验也比较耗时,导致RF执行案例时间越来越长,就遇到这样一个问题,705个接口测试案例(案例包含流程类案例,一个流程类案例可能包含3.4个单个案例,单个案例又都包含登录),通过Jenkins在单机上要跑13小时30分钟,而且目前还不是整个项目所有的接口测试案例,案例个数还在继续增长,一个系统假如有3.4000个接口那如果按照这个速度可能得跑一两天,这是很可怕的. 二.解决

ActiveMQ简述

概述 ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范.JMS是一组Java应用程序接口,它提供消息的创建.发送.读取等一系列服务.JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信. JMS支持两种消息发送和接收模型.一种称为P2P(

ActiveMQ消息系统研究与学习

概述 ActiveMQ是Apache所提供的一个开源的消息系统,完全采用 Java 来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范.JMS是一组Java应用程序接口,它提供消息的创建.发送.读取等一系列服务.JMS提供了一组公共应用程序接口和响应的语法,类似于Java 数据库 的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信. JMS支持两种消息发送和接收模型.一种称为

linux上安装activeMQ

1.新建一个文件夹activeMQ  mkdir /server 2.授权   chmod 777 /server 3.下载activeMQ安装包,拷贝到/activeMQ目录下apache-activemq-5.6.0-bin.tar.gz,下载地址http://activemq.apache.org/download.html 4.解压文件到运行目录/activeServertar -xzvf /server/apache-activemq-5.6.0-bin.tar.gz 5.修改acti

Activemq 安装与集群配置

1. 新建文件夹activemq/server mkdir  server 2.授权 chmod 777 server 3.下载activeMQ安装包,拷贝到/activemq/server目录下 apache-activemq-5.9.0-bin.tar.gz,下载地址: http://activemq.apache.org/download.html 4.解压文件到运行目录/activemq/server tar -xzvf  apache-activemq-5.9.0-bin.tar.gz

ActiveMQ初识及安装

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线,支持mqtt协议,作为服务器 安装 下载activemq http://activemq.apache.org/activemq-5100-release.html 解压到磁盘 安装jdk 配置javahome,选择系统变量 JAVA_HOME     值:C:\Program Files\Java\jdk1.8.0_25 Path  值:%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin; classp