消息中间件activeMQ

Activemq使用教程

解压activmq进入bin\win64 启动activemq.bat

启动成功

浏览器访问http://127.0.0.1:8161

创建maven工程

在pom.xml中添加依赖

 <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.8</version>
  </dependency>
</dependencies>

创建创建者和发布者

Producer 代码

package com.td.active;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class producer {
    public static void main(String[] args) throws JMSException {
        //1、创建工厂连接对象,需要指定ip和端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2、使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用连接对象创建会话(session)对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
        Queue queue = session.createQueue("test-queue");
        //6、使用会话对象创建生产者对象
        MessageProducer producer = session.createProducer(queue);
        //7、使用会话对象创建一个消息对象
        TextMessage textMessage = session.createTextMessage("hello!test-queue");
        //8、发送消息
        producer.send(textMessage);
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

consumer 代码

public class consumer {
    public static void main(String[] args) throws JMSException, IOException {
        //1、创建工厂连接对象,需要制定ip和端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2、使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用连接对象创建会话(session)对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
        Queue queue = session.createQueue("test-queue");
        //6、使用会话对象创建生产者对象
        MessageConsumer consumer = session.createConsumer(queue);
        //7、向consumer对象中设置一个messageListener对象,用来接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // TODO Auto-generated method stub
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        //8、程序等待接收用户消息
        System.in.read();
        //9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

开启生产者生产消息

如果出现以下路径错误

生产者启动成功生成一条消息在浏览器中可以看到

启动消费者

ActiveMQ整合spring及项目中运用

activeMQ与spring看一整合到一起使用,除了添加ActiveMQ相关的jar包外,还需要添加spring的jar包:

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
</dependency>

然后编写applicationContext-activemq.xml文件,

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
    <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>
    <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
    </bean>
    <!-- 配置生产者 -->
    <!-- Spring使用JMS工具类,可以用来发送和接收消息 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这里是配置的spring用来管理connectionfactory的connectionfactory -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <!-- 配置destination -->
    <!-- 队列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>
    <!-- 话题目的地 -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-add-topic"/>
    </bean>
</beans>

在使用的类中注入模板来使用

@Autowired
private JmsTemplate jmsTemplate;

@Resource(name="itemAddTopic")
private Destination destination;

发送消息的示例

发送消息列表

public void  addUser(){
    //第一个参数目的地  可以是队列的名称spring-queue 也可以是ip
    //第二个参数是发送消息的对象
    jmsTemplate.send("spring-queue", new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage("要发送的消息");
        }
    });
}

发送主题

try {
    Topic topic = jmsTemplate.getConnectionFactory().createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createTopic("item-add-topic");
    jmsTemplate.send(topic, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage("要发送的消息");
        }
    });
} catch (JMSException e) {
    e.printStackTrace();
}

消费者项目

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
    <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>
    <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
    </bean>
    <!-- 配置destination -->
    <!-- 队列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>
    <!-- 话题目的地 -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-add-topic"/>
    </bean>
    <!-- 配置监听器 -->
    <bean id="myListener" class="com.td.active.MyListener"/>
    <bean id="itemAddListener" class="com.td.active.ItemAddListener"/>
    <!-- 配置系统监听器 消息列表 -->
    <!--    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="myListener"/>
        </bean> -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="itemAddTopic"/>
        <property name="messageListener" ref="itemAddListener"/>
    </bean>
</beans>

通过配置监听器实现接收消息

列表监听器

public class MyListener implements MessageListener {

    @Override
    public void onMessage(Message message) {

        try {
            TextMessage testMessage = (TextMessage) message;
            String text = testMessage.getText();
            System.out.println("接收到消息 = " + text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

主题监听器

public class ItemAddListener implements MessageListener {
    @Override
    public void onMessage(Message message) {

        try {
            TextMessage testMessage = (TextMessage) message;
            String text = testMessage.getText();
            System.out.println("接收到消息 = " + text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

原文地址:https://www.cnblogs.com/lldsgj/p/10765129.html

时间: 2024-10-08 20:41:06

消息中间件activeMQ的相关文章

消息中间件ActiveMQ及Spring整合JMS的介绍

一 .消息中间件的基本介绍 1.1 消息中间件 1.1.1 什么是消息中间件 消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成.通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信.对于消息中间件,常见的角色大致也就有Producer(生产者).Consumer(消费者) 常见的消息中间件产品: (1)ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.

消息中间件activemq的使用场景介绍(结合springboot的示例)

一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景.本篇使用ActiveMQ+SpringBoot来模拟这四个场景. 2.

消息中间件--ActiveMQ&amp;JMS消息服务

### 消息中间件 ### ---------- **消息中间件** 1. 消息中间件的概述 2. 消息中间件的应用场景(查看大纲文档,了解消息队列的应用场景) * 异步处理 * 应用解耦 * 流量削峰 * 消息通信 ---------- ### JMS消息服务 ### ---------- **JMS的概述** 1. JMS消息服务的概述 2. JMS消息模型 * P2P模式 * Pub/Sub模式 3. 消息消费的方式 * 同步的方式---手动 * 异步的方式---listener监听 4.

学习dubbo(8):消息中间件activemq -简介

消息中间定义 消息中间件是在分布式系统中完成消息的发送和接收的基础软件 消息中间件的作用 消息中间件可利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成 .通过提供消息传递 和消息排队模型,可以在分布式环境下扩展进程间的通信. 通过 消息中间件,应用程序或组件之间可以进行可靠的异步通讯,从而降低系统之间的耦合度,提高系统的可扩展性和可用性 应用场景 通过使用消息中间件对Dubbo服务间的调用进行解耦 JMS消息模型 1.点对点或队列模型 JMS 点对点队列模型

消息中间件-ActiveMQ入门实例

1.下载ActiveMQ: http://activemq.apache.org/download-archives.html 2.运行ActiveMQ 解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\win64\activemq.bat运行ActiveMQ程序.注意:期间若遇到无法启动的情况,请尝试将计算机名改为全英文,不能有其他字符!启动以后可浏览器打开一下网址登录,以便后面查看后面程序运行以后的信息!包括消息队列

架构设计:系统间通信(22)——提高ActiveMQ工作性能(上)

接上文<架构设计:系统间通信(21)--ActiveMQ的安装与使用> 3.ActiveMQ性能优化思路 上篇文章中的两节内容,主要介绍消息中间件ActiveMQ的安装和基本使用.从上篇文章给出的安装配置和示例代码来看,我们既没有修改ActivieMQ服务节点的任何配置,也没有采用任何的集群方案.这种情况只适合各位读者熟悉ActiveMQ的工作原理和基本操作,但是如果要将ActivieMQ应用在生产环境下,上文中介绍的运行方式远远没有挖掘出它的潜在性能. 根据这个系列文章所陈述的中心思想,系统

深入浅出JMS(三)--ActiveMQ简单的HelloWorld实例

第一篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了JMS的两种消息模型:点对点和发布订阅模型,以及消息被消费的两个方式:同步和异步,JMS编程模型的对象,最后说了JMS的优点. 第二篇博文深入浅出JMS(二)–ActiveMQ简单介绍以及安装,我们介绍了消息中间件ActiveMQ,安装,启动,以及优缺点. 这篇博文,我们使用ActiveMQ为大家实现一种点对点的消息模型.如果你对点对点模型的认识较浅,可以看一下第一篇博文的介绍. JMS其实并没有想象的那么高大上,看完这篇博文之后,你就知

ActiveMQ从入门到精通(一)

这是关于消息中间件ActiveMQ的一个系列专题文章,将涵盖JMS.ActiveMQ的初步入门及API详细使用.两种经典的消息模式(PTP and Pub/Sub).与Spring整合.ActiveMQ集群.监控与配置优化等.话不多说,我们来一起瞧一瞧! JMS 首先来说较早以前,也就是没有JMS的那个时候,很多应用系统存在一些缺陷: 1.通信的同步性 client端发起调用后,必须等待server处理完成并返回结果后才能继续执行 2.client 和 server 的生命周期耦合太高 clie

JMS中间件--ActiveMQ

java系统之间的消息通讯使用最多的是基于RMI的RPC和基于JMS的RPC,这两种的消息传输方式虽然都能够起到通讯的作用,但是在笔者看来,二者之间的差别还是非常大的.首先RMI是同步传输,而JMS是异步传输,另外二者的使用场景也是大不相同.在系统集成平台这个项目中让我能够有机会更加深入的认识这两种消息通讯机制.基础系统与考试系统之间的数据传输我们采用的是将ejb发布成webservice,然后再通过esb将客户端和webservice进行连接,这种方式在本质上是ejb之间的相互调用,属于RMI