消息中间件--ActiveMQ&JMS消息服务

### 消息中间件 ###

----------

**消息中间件**

1. 消息中间件的概述

2. 消息中间件的应用场景(查看大纲文档,了解消息队列的应用场景)

* 异步处理

* 应用解耦

* 流量削峰

* 消息通信

----------

### JMS消息服务 ###

----------

**JMS的概述**

1. JMS消息服务的概述

2. JMS消息模型

* P2P模式

* Pub/Sub模式

3. 消息消费的方式

* 同步的方式---手动

* 异步的方式---listener监听

4. JMS编程模型

----------

### 消息中间件:ActiveMQ ###

----------

**ActiveMQ的下载与安装**

1. ActiveMQ的下载与安装

* 在资料中找到ActiveMQ的压缩文件,解压apache-activemq-5.14.5-bin.zip文件

* 双击运行:activemq.bat文件,启动服务

2. 测试ActiveMQ是否安装成功

* 打开浏览器,输入:http://localhost:8161

3. 点击Manage ActiveMQ broker连接,可以查看ActiveMQ中已经发布的消息等

* 用户名密码都是:admin

----------

**ActiveMQ的消息队列方式入门**(P2P模式)

1. 在父工程的pom.xml文件中引入ActiveMQ和Spring整合JMS的坐标依赖(项目中已经引入)

<!-- activemq start -->

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-all</artifactId>

<version>5.2.0</version>

</dependency>

<!-- activemq end -->

<!-- spring 与 mq整合 start -->

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-jms</artifactId>

<version>4.2.4.RELEASE</version>

</dependency>

<dependency>

<groupId>org.apache.xbean</groupId>

<artifactId>xbean-spring</artifactId>

<version>3.7</version>

</dependency>

<!-- spring 与 mq整合 end -->

2. ActiveMQ的向消息队列中发送消息的入门程序(没有使用Spring整合JMS的方式)

@Test

public void sendQueueMessage() throws JMSException {

// 1 创建连接工厂

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

// 2 使用工厂,创建连接

Connection connection = factory.createConnection();

// 3 启动连接

connection.start();

// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

// 5 创建队列队形(myQueue--队列的名字)/topic-----------session创建

Queue queue = session.createQueue("myQueue");

// 6 创建生产者-----------session创建

MessageProducer producer = session.createProducer(queue);

// 7 创建消息----文本消息-------session创建

TextMessage message = session.createTextMessage();

message.setText("helloworld!!!");

// 8 发送消息

producer.send(message);

// 9 提交事务

session.commit();

session.close();

connection.close();

}

3. ActiveMQ从消息队列中获取消息

@Test

public void receiverQueueMessage() throws JMSException {

// 1 创建连接工厂

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

// 2 使用工厂,创建连接

Connection connection = factory.createConnection();

// 3 启动连接

connection.start();

// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

// 5 创建队列队形(hello--队列的名字)/topic-----------session创建

Queue queue = session.createQueue("myQueue");

// 6 创建消费者-----------session创建

MessageConsumer consumer = session.createConsumer(queue);

// 7 接收消息----text格式

TextMessage receive = (TextMessage) consumer.receive();

String text = receive.getText();

System.out.println("接收到的消息====" + text);

// 8 提交事务

session.commit();

session.close();

connection.close();

}

4. 使用监听器的方式,从队列中消费消息

/**

*异步方式

Queue接受用Listener方式接受,多用

如果有多个监听listener,则交替执行

* @throws Exception

*/

@Test

public void receiverQueueListener() throws Exception{

// 1 创建连接工厂

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

// 2 使用工厂,创建连接

Connection connection = factory.createConnection();

// 3 启动连接

connection.start();

// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务//死循环的不能用事物

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5 创建队列队形(hello--队列的名字)/topic-----------session创建

Queue queue = session.createQueue("myQueue");

// 6 创建消费者-----------session创建

MessageConsumer consumer = session.createConsumer(queue);

//7 // 给消费者添加监听器

consumer.setMessageListener(new MessageListener() {

@Override

public void onMessage(Message msg) {

TextMessage message = (TextMessage) msg;

try {

System.out.println("Listener1111111111接收到的消息是=="+message.getText());

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

});

while(true){}

// 使用监听器的方式不能关闭,需要监听器一直工作

// session.commit();

// session.close();

// connection.close();

}

**ActiveMQ的消息订阅方式入门**(Pub/Sub模式)

/**

* Topic发送

* @throws JMSException

*/

@Test

public void sendTopicMessage() throws JMSException {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

// 创建消息订阅

Topic topic = session.createTopic("myTopic");

// 创建生产者

MessageProducer producer = session.createProducer(topic);

// 创建消息,一组可以存储key value的消息

MapMessage message = session.createMapMessage();

message.setString("username", "cgx");

message.setString("password", "123456");

// 发送消息

producer.send(message);

// 提交事务

session.commit();

session.close();

connection.close();

}

/**

* Topic接受

*

* @throws JMSException

*/

@Test

public void testReceiverMessage() throws JMSException {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

// 创建消息订阅

Topic topic = session.createTopic("myTopic");

// 创建消费者

MessageConsumer consumer = session.createConsumer(topic);

// 接收消息

MapMessage message = (MapMessage) consumer.receive();

System.out.println(message.getString("username"));

System.out.println(message.getString("password"));

session.commit();

session.close();

connection.close();

}

/**

* Topic接受Listener监听方式

*

* @throws Exception

*/

@Test

public void receiverQueueListener() throws Exception {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建消息订阅

Topic topic = session.createTopic("myTopic");

// 创建消费者

MessageConsumer consumer = session.createConsumer(topic);

// 给消费者添加监听器consumer添加监听

consumer.setMessageListener(new MessageListener() {

@Override

public void onMessage(Message msg) {

MapMessage message = (MapMessage) msg;

try {

System.out.println(message.getString("username"));

System.out.println(message.getString("password"));

} catch (JMSException e) {

e.printStackTrace();

}

}

});

while (true) {

}

}

### Spring整合ActiveMQ ###★★★★★

----------

**Spring整合ActiveMQ**★★★★★

1. 创建applicationContext-mq.xml的配置文件,导入约束★★★★★

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"

xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"

xmlns:amq="http://activemq.apache.org/schema/core"

xmlns:jms="http://www.springframework.org/schema/jms"

xsi:schemaLocation="

http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd

http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd

http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd

http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd

http://www.springframework.org/schema/data/jpa

http://www.springframework.org/schema/data/jpa/spring-jpa.xsd

http://www.springframework.org/schema/jms

http://www.springframework.org/schema/jms/spring-jms.xsd

http://activemq.apache.org/schema/core

http://activemq.apache.org/schema/core/activemq-core.xsd">

</beans>

2. 具体的配置如下★★★★★

applicationContext-mq.xml===================mq的消息发送(消息生产者)

<!-- 配置连接工厂 -->

<!-- ActiveMQ 连接工厂 -->

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->

<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->

<amq:connectionFactory id="amqConnectionFactory"

brokerURL="tcp://localhost:61616" userName="admin" password="admin" />

<!-- Spring Caching连接工厂 -->

<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->

<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->

<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>

<!-- Session缓存数量和链接数有关 -->

<property name="sessionCacheSize" value="100" />

</bean>

<!-- 定义JmsTemplate的Queue类型 -->★★★★★

<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">

<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->

<constructor-arg ref="connectionFactory" />

<!-- 非pub/sub模型(发布/订阅),即队列模式 -->

<property name="pubSubDomain" value="false" />

</bean>

<!-- 定义JmsTemplate的Topic类型 -->★★★★★

<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate" >

<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->

<constructor-arg ref="connectionFactory" />

<!-- pub/sub模型(发布/订阅) -->

<property name="pubSubDomain" value="true" />

</bean>

3. 发送消息的代码如下★★★★★

1.Queue方式:★★★★★

@Autowired

@Qualifier(value="jmsQueueTemplate")

private JmsTemplate queueTemplate;//Queue

/**

* Queue发送消息---spring框架

*/

@Test

public void sendQueueMessage() {

// 发送消息 构造参数指定目标,因为配置文件中的队列和订阅模式是通过id与false和true进行区分

queueTemplate.send("myQueue", new MessageCreator() {

@Override

public Message createMessage(Session session) throws JMSException {

// 使用session创建消息,发送

TextMessage textMessage = session.createTextMessage("测试结合spring框架发送queue消息");

return textMessage;

}

});

}

2.Topic方式:★★★★★

@Autowired

@Qualifier(value = "jmsTopicTemplate")

private JmsTemplate topicTemplate;//Topic

/**

* Topic发送消息---spring框架

*/

@Test

public void sendTopicMessage() {

topicTemplate.send("spring_topic", new MessageCreator() {

@Override

public Message createMessage(Session session) throws JMSException {

MapMessage mapMessage = session.createMapMessage();

mapMessage.setString("username", "mdzz");

return mapMessage;

}

});

}

4. 接收消息的代码如下==========不提倡手动,要用监听器异步获取

/**

* Queue接收消息---spring框架

* 同步手动:不提倡

* receive("myQueue")要写目标,不写目标的话会报找不到目标的错误NO defaultDestination

*/

@Test

public void receiverMessage() {

//接收消息textMessage类型

TextMessage textMessage = (TextMessage) queueTemplate.receive("myQueue");

try {

System.out.println(textMessage.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

**Spring配置监听器**★★★★★★★★★★★★★★★

1. 自定义监听器代码的编写----接收消息---spring框架---实现MessageListener接口★★★★★

1.Queue:★★★★★

@Component(value="queueConsumer1")

public class QueueListener implements MessageListener {

@Override

public void onMessage(Message arg0) {

// 把arg0强转

TextMessage textMessage = (TextMessage) arg0;

try {

// 输出消息

System.out.println(textMessage.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

}

2.Topic:发送一个,两个都会接受★★★★★topic特点:有几个监听几个都会同时收到

@Component

public class TopicConsumer1 implements MessageListener {

@Override

public void onMessage(Message arg0) {

MapMessage mapMessage = (MapMessage) arg0;

try {

System.out.println("TopicConsumer1===="+mapMessage.getString("username"));

} catch (JMSException e) {

e.printStackTrace();

}

}

}

@Component

public class TopicConsumer2 implements MessageListener {

2. 编写配置文件

applicationContext-mq-consumer.xml=============mq的消息接受(负责监听接受消息)

<!-- 扫描包 -->

<context:component-scan base-package="com.itcast.jms.consumer" />

<!-- ActiveMQ 连接工厂 -->

<amq:connectionFactory id="amqConnectionFactory"

brokerURL="tcp://localhost:61616" userName="admin" password="admin" />

<!-- Spring Caching连接工厂 -->

<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>

<property name="sessionCacheSize" value="100" />

</bean>

<!-- Spring JmsTemplate 的消息生产者 start-->

<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory">

<jms:listener destination="myQueue" ref="queueConsumer1"/>---------自定义监听★★★★★

</jms:listener-container>

<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory">

<jms:listener destination="spring_topic" ref="topicConsumer1"/>---------自定义监听★★★★★

<jms:listener destination="spring_topic" ref="topicConsumer2" />---------自定义监听★★★★★

</jms:listener-container>

3.不用启动项目,把spring配置文件applicationContext-mq-consumer.xml启动起来,可以用采用下面方法

新建一个test类,让他一直启动着,这样就一直加载spring的配置文件

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration("classpath:applicationContext-mq-consumer.xml")

public class SpringQueueListenerTest {

@Test

public void test(){

while(true);

}

}

4.只要发送端(发送消息---spring框架)一启动,监听器就会监听到,就会输出:测试结合spring框架发送queue消息★★★★★

总结:

消息发送
1. 创建spring容器
2. 从容器中获取JMSTemplate对象,发送消息
3. 定义Destination
4. 使用JMSTemplate对象发送消息
消息接受
1. 创建一个类实现MessageListener 接口。业务处理在此类中实现。
2.在spring容器中配置DefaultMessageListenerContainer对象,引用MessageListener 实现类对象接收消息。

项目整合ActiveMQ:

1. 消息生产者整合ActiveMQ
消息生产者只需要发送消息
需要把JMSTemplate和Destination交给spring进行管理

/**===========================activeMQ消息发送========================================*/
// 发送消息!!!
this.send("save", item.getId());
}

@Autowired
private JmsTemplate jmsTemplate;

@Autowired
private Destination destination;

/**
* 此方法就是用来发送消息的
* 考虑:1、发送什么数据?2、我需要什么数据?
* 在消息中需要:1、消息的标识:save,delete,update;2、商品的ID
*/
private void send(final String type, final Long itemId) {
// TODO Auto-generated method stub
jmsTemplate.send(destination, new MessageCreator() {

@Override
public Message createMessage(Session session) throws JMSException {
//创建消息体
TextMessage textMessage = new ActiveMQTextMessage();
//设置消息内容
Map<String, Object> map = new HashMap<>();
map.put("type", type);
map.put("itemId", itemId);
try {
ObjectMapper mapper = new ObjectMapper();
textMessage.setText(mapper.writeValueAsString(map));
} catch (Exception e) {
e.printStackTrace();
}
return textMessage;
}
});
}

/**===========================activeMQ消息发送========================================*/
2. 消息消费改造
在taotao-search-service添加
ItemMessageListener:
@Autowired
private SearchService searchService;

@Override
public void onMessage(Message message) {
//先判断此消息类型是否是TextMessage
if(message instanceof TextMessage){
//如果是,强转
TextMessage textMessage = (TextMessage)message;
try {
//获取消息:json
String json = textMessage.getText();
//杰克逊第三作用:直接解析json数据
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(json);
String type = jsonNode.get("type").asText();
Long itemId = jsonNode.get("itemId").asLong();
//根据解析出来的type,判断此type=save的时候我应该调用indexSearch方法
if("save".equals(type)){
searchService.indexItem(itemId);
}

} catch (Exception e) {
e.printStackTrace();
}
}

}

SearchServiceImpl:
@Override
public void indexItem(Long itemId) throws Exception {
Item item = this.itemMapper.selectByPrimaryKey(itemId);

SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", item.getId());
doc.addField("item_title", item.getTitle());
doc.addField("item_image", item.getImage());
doc.addField("item_cid", item.getCid());
doc.addField("item_price", item.getPrice());
doc.addField("item_status", item.getStatus());

this.cloudSolrServer.add(doc);

this.cloudSolrServer.commit();

}

原文地址:https://www.cnblogs.com/soul-wonder/p/8910220.html

时间: 2024-08-05 08:58:16

消息中间件--ActiveMQ&JMS消息服务的相关文章

三:JMS消息服务规范

一:JMS是什么?--->JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API--->用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.--->Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持.---> JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息.--->JMS(Java

【EJB四】——JMS消息服务之P2P和Pub/Sub

JMS:java消息服务,JMS客户端可以通过JMS服务进行异步消息传输.可以支持两种模型:P2P和Pub/Sub P2P 点对点消息传输模式,这种模式主要包括发送者,消息队列和接收者 特点: 1.每个消息只有一个消费者,一旦被接收(消费),此消息就不存在于消息队列中了. 2.发送者和接收者在时间上没有依赖性(当消息发送后,无论接收者有没有在接收,都不会影响消息进入消息队列) 3.接收者在成功接收消息之后,需要向队列应答成功. Pub/Sub 发布/订阅的模式.主要包括:发布者,主题,订阅者三部

JBoss EAP应用服务器部署方法和JBoss 开发JMS消息服务小例子

一.download JBoss-EAP-6.2.0GA: http://jbossas.jboss.org/downloads JBoss Enterprise Application Platform(JBoss EAP)作为Redhat公司的商业产品是一个更加关注企业级特性和稳定性的实用部署版为了与JBoss Application Server(JBoss AS) 为了使这两个产品有差异化避免用户混淆因此 RedHat公司在2013年4月将JBoss AS正式更名为WildFly改名后的

Intellij IDEA 创建消息驱动Bean - 接收JMS消息

除了同步方式的调用之外,有时还需要异步调用,用来处理不需要即时处理的信息,例如短信.邮件等,这需要使用EJB中的独特组件——消息驱动Bean(Message-Driven Bean,MDB),它提供了Java消息服务JMS(Java Messaging Service)的处理能力,由消息驱动Bean来处理JMS消息.JMS的消息由客户端程序产生,并被发布到服务器的消息队列,消息驱动Bean随之检索消息并执行其内容.这种事件或者数据的通信就称为异步形式,客户端或者服务端Bean都无须依赖对方的直接

JAVA消息服务JMS规范及原理详解

一.简介 JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持. JMS允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 二.常用术语介绍 在提到JMS时,我们通常会说到一些术语,解释如下: 消息

MQ消息队列(2)—— Java消息服务接口(JMS)

一.理解JMS   1.什么是JMS?         JMS即Java消息服务(Java Message Service)应用程序接口,API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合. JMS不是消息队列,更不是某种消息队列协议.JMS是Jav

Java消息服务-JMS

什么是JMS? JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持. 消息服务: 消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建.发送.读取消息等,用于支持应用程序开发.在Java中,当两个应用程序使用

JMS(java消息服务)整合Spring项目案例

转载自云栖社区 摘要: Sprng-jms消息服务小项目 所需的包: spring的基础包 spring-jms-xx包 spring-message–xx包 commons-collection-xx包 commons-pool2-xx包 aop切面的包: spring-aop,spring-aspect... Sprng-jms消息服务小项目 所需的包: spring的基础包 spring-jms-xx包 spring-message–xx包 commons-collection-xx包 c

JMS(java消息服务)学习一

一.JMS是个什么鬼 1.百度百科解释:JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持. 2.JMS是一组消息服务的api即接口规范即数据库的JDBC消息服务的JMS. 二.为什么要学习,使用JMS 1.在JAVA中,如果两个应用程序之间对各自都不了解,甚至这