Spring整合ActiveMQ

1.管理ActiveMQ

地址  http://localhost:8161/admin/

默认用户和密码:admin=admin

  1. 运行发送者,eclipse控制台输出,如下图:

    此时,我们先看一下ActiveMQ服务器,Queues内容如下:

    我们可以看到创建了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费,我们也可以通过Browse查看是哪些消息,如下图:

    如果这些队列中的消息,被删除,消费者则无法消费。

  2. 我们继续运行一下消费者,eclipse控制台打印消息,如下: 

    此时,我们先看一下ActiveMQ服务器,Queues内容如下:

    我们可以看到HelloWorld的消息队列发生变化,多一个消息者,队列中的10条消息被消费了,点击Browse查看,已经为空了。

    点击Active Consumers,我们可以看到这个消费者的详细信息:

2.Spring整合ActiveMQ

1.maven

<!--ActiveMQ依赖 -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.13.2</version>
		</dependency>
<!-- Spring整合ActiveMQ所需依赖 -->
			<dependency>
				<groupId>org.springframework</groupId>
				<artifactId>spring-jms</artifactId>
			</dependency>
			<dependency>
				<groupId>org.springframework</groupId>
				<artifactId>spring-messaging</artifactId>
			</dependency>

2.ActiveMQ配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://172.17.65.29:61616" userName="admin" password="admin"  />

    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- 同上,同理 -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Spring JmsTemplate 的消息生产者 start-->

    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!--Spring JmsTemplate 的消息生产者 end-->

    <!-- 消息消费者 start-->

    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver1"/>
        <jms:listener destination="test.queue" ref="queueReceiver2"/>
    </jms:listener-container>

    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver1"/>
        <jms:listener destination="test.topic" ref="topicReceiver2"/>
    </jms:listener-container>

    <!-- 消息消费者 end -->
</beans>  

3.生产者

1.queue消息队列模式

package com.jay.mq.demo1.producer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * ActiveMQ的消息队列模式
 * Description: <br/>
 * 队列消息的生产者,发哦是哪个消息到队列
 * @author hetiewei
 * @date 2016年4月25日 上午10:20:20
 *
 */
@Component("queueSender")
public class QueueSender1 {
<span style="white-space:pre">	</span>
<span style="white-space:pre">	</span>/**
<span style="white-space:pre">	</span> * 通过Qualifier来注入对应的Bean
<span style="white-space:pre">	</span> */
<span style="white-space:pre">	</span>@Autowired
<span style="white-space:pre">	</span>@Qualifier("jmsQueueTemplate")
<span style="white-space:pre">	</span>private JmsTemplate jmsTemplate;
<span style="white-space:pre">	</span>
<span style="white-space:pre">	</span>/**
<span style="white-space:pre">	</span> * Description: <br/>
<span style="white-space:pre">	</span> * 发送消息到指定的队列(目标)
<span style="white-space:pre">	</span> * @author hetiewei
<span style="white-space:pre">	</span> * @date 2016年4月25日 上午10:24:57
<span style="white-space:pre">	</span> * @param queueName  队列名称
<span style="white-space:pre">	</span> * @param message    消息内容
<span style="white-space:pre">	</span> */
<span style="white-space:pre">	</span>public void send(String queueName, final String message){
<span style="white-space:pre">		</span>jmsTemplate.send(queueName, new MessageCreator() {
<span style="white-space:pre">			</span>
<span style="white-space:pre">			</span>public Message createMessage(Session session) throws JMSException {
<span style="white-space:pre">				</span>return session.createTextMessage(message);
<span style="white-space:pre">			</span>}
<span style="white-space:pre">		</span>});
<span style="white-space:pre">	</span>}
}

2. topic/sub订阅模式

package com.jay.mq.demo1.producer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * ActiveMQ的topic/sub模式
 * Description: <br/>
 * Topic生产者发送消息到Topic
 * @author hetiewei
 * @date 2016年4月25日 上午11:22:40
 *
 */
@Component
public class TopicSender1 {

	@Autowired
	@Qualifier("jmsTopicTemplate")
	private JmsTemplate jmsTemplate;

	/**
	 * 发送一条消息到指定的队列(目标)
	 * @param queueName 队列名称
	 * @param message 消息内容
	 */
	public void send(String topicName,final String message){
		jmsTemplate.send(topicName, new MessageCreator() {

			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}

}

4.消费者

1.queue消息队列模式

package com.jay.mq.demo1.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Description: <br/>
 * 消息队列监听器
 * @author hetiewei
 * @date 2016年4月25日 上午10:32:14
 *
 */
@Component
public class QueueReceiver1 implements MessageListener {

	public void onMessage(Message message) {
		try {
			System.out.println("QueueReceiver1接收到消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

package com.jay.mq.demo1.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Description: <br/>
 * 消息队列监听器
 * @author hetiewei
 * @date 2016年4月25日 上午10:32:14
 *
 */
@Component
public class QueueReceiver2 implements MessageListener {

	public void onMessage(Message message) {
		try {
			System.out.println("QueueReceiver2接收到消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

2.topic/sub订阅模式

package com.jay.mq.demo1.consumer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component
public class TopicReceiver1 implements MessageListener{

	public void onMessage(Message message) {
		try {
			System.out.println("TopicReceiver1接收到消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

package com.jay.mq.demo1.consumer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component
public class TopicReceiver2 implements MessageListener{

	public void onMessage(Message message) {
		try {
			System.out.println("TopicReceiver2接收到消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

5.Controller

package com.jay.controller.demo1;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import com.jay.mq.demo1.producer.queue.QueueSender1;
import com.jay.mq.demo1.producer.topic.TopicSender1;

/**
 * Description: <br/>
 *
 * @author hetiewei
 * @date 2016年4月25日 上午10:35:31
 *
 */
@Controller
@RequestMapping("activemq")
public class ActivemqController1 {

	@Resource
	private QueueSender1 queueSender;
	@Autowired
	private TopicSender1 topicSender;

	/**
	 * 发送消息到队列
	 * Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中
	 * @param message
	 * @return String
	 */
	@ResponseBody
	@RequestMapping("queueSender")
	public String queueSender(@RequestParam("message")String message){
		String opt="";
		try {
			queueSender.send("test.queue", message);
			opt = "suc";
		} catch (Exception e) {
			opt = e.getCause().toString();
		}
		return opt;
	}

	@ResponseBody
	@RequestMapping("topicSender")
	public String topoicSender(@RequestParam("message")String message){
		String opt = "";
		try {
			topicSender.send("test.topic", message);
			opt = "suc";
		} catch (Exception e) {
			opt = e.getCause().toString();
		}
		return opt;
	}

}

3.ActiveMQ的多种部署方式

单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供了master-slave、broker
cluster等多种部署方式,但通过分析多种部署方式之后我认为需要将两种部署方式相结合才能满足我们公司分布式和高可用的需求,所以后面就重点将解如何将两种部署方式相结合。

1、Master-Slave部署方式
1)shared filesystem Master-Slave部署方式

主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。

多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。

2)shared database Master-Slave方式

与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库而已。

3)Replicated LevelDB Store方式

这种主备方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master。被选择的master
broker node开启并接受客户端连接。

其他node转入slave模式,连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的slaves。

如果master死了,得到了最新更新的slave被允许成为master。fialed
node能够重新加入到网络中并连接master进入slave mode。所有需要同步的disk的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2.
Master将会存储并更新然后等待 (2-1)=1个slave存储和更新完成,才汇报success。至于为什么是2-1,熟悉Zookeeper的应该知道,有一个node要作为观擦者存在。

单一个新的master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node。这个node将会成为新的master。因此,推荐运行至少3个replica
nodes,以防止一个node失败了,服务中断。

2、Broker-Cluster部署方式

前面的Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。

Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态,而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息,那么broker-B会先通过内部网络获取到broker-A上面的message,并通知自己的consumer来消费。

1)static Broker-Cluster部署

在activemq.xml文件中静态指定Broker需要建立桥连接的其他Broker:

1、  首先在Broker-A节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>

</networkConnectors>

2、  修改Broker-A节点中的服务提供端口为61616:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3、  在Broker-B节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4、  修改Broker-A节点中的服务提供端口为61617:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5、分别启动Broker-A和Broker-B。

2)Dynamic Broker-Cluster部署

在activemq.xml文件中不直接指定Broker需要建立桥连接的其他Broker,由activemq在启动后动态查找:

1、  首先在Broker-A节点中添加networkConnector节点:

<networkConnectors>

<networkConnectoruri="multicast://default"

dynamicOnly="true"

networkTTL="3"

prefetchSize="1"

decreaseNetworkConsumerPriority="true" />

</networkConnectors>

2、修改Broker-A节点中的服务提供端口为61616:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>

</transportConnectors>

3、在Broker-B节点中添加networkConnector节点:

<networkConnectors>

<networkConnectoruri="multicast://default"

dynamicOnly="true"

networkTTL="3"

prefetchSize="1"

decreaseNetworkConsumerPriority="true" />

</networkConnectors>

4、修改Broker-B节点中的服务提供端口为61617:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>

</transportConnectors>

5、启动Broker-A和Broker-B

2、Master-Slave与Broker-Cluster相结合的部署方式

可以看到Master-Slave的部署方式虽然解决了高可用的问题,但不支持负载均衡,Broker-Cluster解决了负载均衡,但当其中一个Broker突然宕掉的话,那么存在于该Broker上处于Pending状态的message将会丢失,无法达到高可用的目的。

由于目前ActiveMQ官网上并没有一个明确的将两种部署方式相结合的部署方案,所以我尝试者把两者结合起来部署:

1、部署的配置修改

这里以Broker-A + Broker-B建立cluster,Broker-C作为Broker-B的slave为例:

1)首先在Broker-A节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>

</networkConnectors>

2)修改Broker-A节点中的服务提供端口为61616:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3)在Broker-B节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4)修改Broker-B节点中的服务提供端口为61617:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5)修改Broker-B节点中的持久化方式:

<persistenceAdapter>

<kahaDB directory="/localhost/kahadb"/>

</persistenceAdapter>

6)在Broker-C节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

7)修改Broker-C节点中的服务提供端口为61618:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

8)修改Broker-B节点中的持久化方式:

<persistenceAdapter>

<kahaDB directory="/localhost/kahadb"/>

</persistenceAdapter>

9)分别启动broker-A、broker-B、broker-C,因为是broker-B先启动,所以“/localhost/kahadb”目录被lock住,broker-C将一直处于挂起状态,当人为停掉broker-B之后,broker-C将获取目录“/localhost/kahadb”的控制权,重新与broker-A组成cluster提供服务。

4.ActiveMQ之Master-Slaver+负载均衡

什么叫中间件?

中间件为软件应用提供了操作系统所提供的服务之外的服务,可以把中间件描述为"软件胶水"。中间件不是操作系统的一部分,不是数据库操作系统,也不是应用软件的一部分,而是能够让软件开发者方便的处理通信、输入和输出,能够专注自己应用的部分。

消息中间件解决了应用之间的消息传递、解耦、异步的问题。

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

一般中间件都提供了横向扩展和纵向扩展,横向扩展就是我们经常说的负载均衡,纵向扩展提供了Master-Slaver;

负载均衡:提供负载均衡的中间件都对外提供服务

Master-Slaver:同时只有一个中间件对外提供服务,当Master出现挂机等问题,Slaver会自动接管

看一个整合的简图:

Master-Slave和Broker Cluster

准备:

jdk1.6,apache-activemq-5.10.0,mysql5.1,zookeeper-3.4.3

先来看看Master-Slave模式

Shared File System Master Slave

本次测试在同一台机器上:

首先更改配置conf/activemq,做如下修改:

<persistenceAdapter>
        <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
	<kahaDB directory="D:/kahaDB"/>
 </persistenceAdapter>

将activemq拷贝3份,分别:apache-activemq-5.10.0-M1, apache-activemq-5.10.0-M2,apache-activemq-5.10.0-M3,分别启动activemq命令,启动的日志分别是:

表示当前进程是Master

表示当前进程没有获取到锁,作为Slaver

测试:

下面的例子中分别提供了Producer(Sender类)和Consumer(Receiver类)

我们首先用Producer发送消息给activemq,然后停止Master,然后再用Consumer接受消息,测试结果是可以接受到数据的。

2).JDBC Master Slave

JDBC Master Slave模式和Shared File Sysytem Master Slave模式的原理是一样的,只是把共享文件系统换成了共享数据库。

修改配置文件conf/activemq

<persistenceAdapter>
        <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
	<!--<kahaDB directory="D:/kahaDB"/>-->
        <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
  </persistenceAdapter>

添加数据源:

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
	<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
	<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
	<property name="username" value="root"/>
	<property name="password" value="root"/>
	<property name="poolPreparedStatements" value="true"/>
    </bean>

注:这里使用的是mysql,所以需要mysql驱动程序: mysql-connector-java-5.1.18,讲jar包放入lib文件夹下面,驱动版本不对,会出现如下错误: Database
lock driver override not found for : [mysql_connect ...

分别拷贝到其他几个文件夹下,分别启动,启动成功后我们可以看到数据库中多了几张表:

测试方式同上;

官网手册:http://activemq.apache.org/jdbc-master-slave.html

3).Replicated
LevelDB Store

这种方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master

修改配置文件conf/activemq:

<!--<persistenceAdapter>
         <kahaDB directory="${activemq.data}/kahadb"/>
         <kahaDB directory="D:/kahaDB"/>
         <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
 </persistenceAdapter>-->
 <persistenceAdapter>
	<replicatedLevelDB
		directory="${activemq.data}/leveldb"
		replicas="3"
		bind="tcp://0.0.0.0:0"
		zkAddress="127.0.0.1:2181"
		hostname="127.0.0.1"
		sync="local_disk"
		zkPath="/activemq/leveldb-stores"/>
 </persistenceAdapter>

首先启动zookeeper,这里没有做集群处理,默认端口是2181,然后分别启动activemq,

启动之后报错:"activemq LevelDB IOException handler"。

原因:版本5.10.0存在的依赖冲突。

解决方案:

(1)移除lib目录中的pax-url-aether-1.5.2.jar包

(2)注释掉配置文件中的日志配置;

<!-- Allows accessing the server log
    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>
-->

测试方式同上;

提供java版的例子:

public class Sender {
	private static final int SEND_NUMBER = 5;

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;
		Connection connection = null;
		Session session;
		Destination destination;
		MessageProducer producer;
		connectionFactory = new ActiveMQConnectionFactory(
			ActiveMQConnection.DEFAULT_USER,
			ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.TRUE,
					Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			sendMessage(session, producer);
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
	}

	public static void sendMessage(Session session, MessageProducer producer)
			throws Exception {
		for (int i = 1; i <= SEND_NUMBER; i++) {
			TextMessage message = session.createTextMessage("发送的消息"
					+ i);
			System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
			producer.send(message);
		}
	}
}
public class Receiver {
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;
		Connection connection = null;
		Session session;
		Destination destination;
		MessageConsumer consumer;
		connectionFactory = new ActiveMQConnectionFactory(
			ActiveMQConnection.DEFAULT_USER,
			ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.FALSE,
					Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			while (true) {
				TextMessage message = (TextMessage) consumer.receive(100000);
				if (null != message) {
					System.out.println("收到消息" + message.getText());
				} else {
					break;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
	}
}

5.Broker-Cluster实现负载均衡

Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue,提供了2中部署方式:

static Broker-Cluster和Dynamic Broker-Cluster

1).static Broker-Cluster

将ActiveMq拷贝2份,分别命名:apache-activemq-5.10.0_M1,apache-activemq-5.10.0_M2,

下面就是配置activemq,xml:

M1做如下配置:

?


1

2

3

<networkConnectors>

    <networkConnector
uri=
"static:(tcp://localhost:61617)"/>

</networkConnectors>

?


1

2

3

<transportConnectors>

        <transportConnector
name=
"openwire"

uri=
"tcp://0.0.0.0:61616?maximumConnectio       
ns=1000&amp;wireFormat.maxFrameSize=104857600"
/>

 </transportConnectors>

M2做如下配置:

?


1

2

3

<networkConnectors>

    <networkConnector
uri=
"static:(tcp://localhost:61616)"/>

</networkConnectors>

?


1

2

3

<transportConnectors>

        <transportConnector
name=
"openwire"

uri=
"tcp://0.0.0.0:61617?maximumConnectio       
ns=1000&amp;wireFormat.maxFrameSize=104857600"
/>

 </transportConnectors>

通过以上配置使M1和M2这两个 broker通过网络互相连接,并共享queue,

启动M1和M2,可以看到如下启动日志:

可以看到M1和M2,network connection has been established

测试我们还是用上一篇中的Sender和Receiver类,只需要做一点点修改:

Sender类还是链接tcp://localhost:61616,发送消息到queue,

Receiver做如下修改:

?


1

2

3

connectionFactory
=
new

ActiveMQConnectionFactory(

    ActiveMQConnection.DEFAULT_USER,

    ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61617");

经测试Receiver可以接受到数据,表示M1和M2已经共享了queue

2).Dynamic
Broker-Cluster

Dynamic
Discovery集群方式在配置ActiveMQ实例时,不需要知道所有其它实例的URI地址

对activemq.xml做如下配置:

M1做如下配置:

?


1

2

3

<networkConnectors>

    <networkConnector
uri=
"multicast://default"/>

</networkConnectors>

?


1

2

3

4

<transportConnectors>

        <transportConnector
name=
"openwire"

uri=
"tcp://0.0.0.0:61616?
maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"

          discoveryUri="multicast://default"/>

 </transportConnectors>

M2做如下配置:

?


1

2

3

<networkConnectors>

    <networkConnector
uri=
"multicast://default"/>

</networkConnectors>

?


1

2

3

4

<transportConnectors>

        <transportConnector
name=
"openwire"

uri=
"tcp://0.0.0.0:61617?
maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"

          discoveryUri="multicast://default"/>

 </transportConnectors>

启动M1和M2,可以看到如下启动日志: network
connection has been established

测试同static broker-cluster,可以得到相同的结果。

官网配置说明:http://activemq.apache.org/networks-of-brokers.html

Master-Slaver保证了数据的可靠性,Broker-Cluster提供了负载均衡,所以一般正式环境中都会采用:

Master-Slaver+Broker-Cluster的模式

时间: 2024-10-05 17:37:02

Spring整合ActiveMQ的相关文章

Spring整合ActiveMQ及多个Queue消息监听的配置

消息队列(MQ)越来越火,在java开发的项目也属于比较常见的技术,MQ的相关使用也成java开发人员必备的技能.笔者公司采用的MQ是ActiveMQ,且消息都是用的点对点的模式.本文记录了实现Spring整合ActivateMQ的全过程及如何使用MQ,便于后续查阅. 一.项目的搭建 采用maven构建项目,免去了copy jar包的麻烦.因此,我们创建了一个java类型的Maven Project (1)项目结构图 先把项目结构图看一下,便于对项目的理解. (2)pom.xml 我们需要加入以

Spring整合ActiveMQ测试

第一部分:创建项目(使用maven) --注意:使用IDEA创建maven普通项目还是聚合项目,都可以不用勾选,直接点next. 然后填入坐标和模块的名字   然后点击左上角的+号,选择web.   在新打开的页面下会显示web选项,这里的路径改为\src\main\webapp ,再修改web.xml文件的路径.   IDEA不会pom.xml文件默认生成jar文件,要在pom.xml添加<packaging>war</packaging>构建时生成war文件. 第二步:导入po

spring 整合 ActiveMQ

1.1     JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 1.2  

Java消息队列-Spring整合ActiveMq

1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现. 优势:异步.可靠 消息模型:点对点,发布/订阅 JMS中的对象  然后在另一篇博客<Java消息队列-ActiveMq实战>中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解: 多种语言和协议编写客户端.语言: Java

Spring整合ActiveMQ:spring+JMS+ActiveMQ+Tomcat

一.目录结构 相关jar包 二.关键配置activmq.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi=&quo

spring 整合activemq

一.xml配置 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/s

Spring整合activeMQ消息队列

1.配置JMS <!-- Spring提供的JMS工具类,它可以进行消息发送.接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="conne

深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例

基于spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务. 环境准备 工具 JDK1.6或1.7 Spring4.1.0 ActiveMQ5.11.1 Tomcat7.x 目录结构 所需jar包 项目的配置 配置ConnectionFactory connectionFactory是Spring用于创建到JMS服务器链接

Spring整合JMS——基于ActiveMQ实现

1.1     JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 1.2