ActiveMQ 发送和就收消息

一、添加 jar 包

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.11.2</version>
</dependency>

二、消息传递的两种形式

  1、点对点:发送的消息只能被一个消费者接收,第一个消费者接收后,消息没了

  2、发布/订阅:消息可以被多个消费者接收 。发完消息,如果没有消费者接收,这消息会自动消失。也就是说,消费者服务必须是启动的状态。( topic 消息在 ActiveMQ 服务端默认不是持久化的,可以通过配置文件配置持久化 )

三、点对点发送消息

/**
 * 点到点形式发送消息
 * @throws Exception
 */
@Test
public void testQueueProducer() throws Exception{
    //1、创建一个连接工厂,需要指定服务的 ip 和端口
    String brokerURL = "tcp://192.168.25.129:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    //2、使用工厂对象创建一个 Connection 对象
    Connection connection = connectionFactory.createConnection();
    //3、开启连接,调用 Connection 对象的 start 方法
    connection.start();
    //4、创建一个 Session 对象。
        //第一个参数:是否开启事务(一般不开启,如果开启事务,第二个参数没意义);
        //第二个参数:应答模式。自动应答或者手动应答,一般是自动应答
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5、使用 Session 对象创建一个 Destination 对象。两种形式 queue、topic。
    Queue queue = session.createQueue("test-queue");
    //6、使用 Session 对象创建一个 Producer 对象
    MessageProducer producer = session.createProducer(queue);
    //7、创建一个 Message 对象,可以使用 TextMessage。下面两种方式都可以
    /*TextMessage textMessage = new ActiveMQTextMessage();
    textMessage.setText("hello ActiveMQ");*/
    TextMessage textMessage = session.createTextMessage("hello ActiveMQ");
    //8、发布消息
    producer.send(textMessage);
    //9、关闭资源
    producer.close();
    session.close();
    connection.close();
}

四、点对点接收消息

/**
 * 点对点接收消息
 * @throws Exception
 */
@Test
public void testQueueConsumer() throws Exception{
    //1、创建一个 ConnectionFactory 对象连接 MQ 服务器
    String brokerURL = "tcp://192.168.25.129:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    //2、创建一个连接对象
    Connection connection = connectionFactory.createConnection();
    //3、开启连接
    connection.start();
    //4、使用 Connection 对象 创建一个 Session 对象
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5、创建一个 Destination 对象。queue 对象
    Queue queue = session.createQueue("test-queue");
    //6、使用 Session 对象创建一个消费者
    MessageConsumer consumer = session.createConsumer(queue);
    //7、接收消息
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            //8、打印结果
            TextMessage textMessage = (TextMessage) message;

            try {
                String text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    });

    //9、等待接收消息。( 接收到消息后才网下面执行。关闭资源 )
    System.in.read();
    //10、关闭资源
    consumer.close();
    session.close();
    connection.close();

}

 五、广播发送消息

/**
 * 广播发送消息
 * @throws Exception
 */
@Test
public void testTopicProducer() throws Exception{
    //1、创建一个连接工厂,需要指定服务的 ip 和端口
    String brokerURL = "tcp://192.168.25.129:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    //2、使用工厂对象创建一个 Connection 对象
    Connection connection = connectionFactory.createConnection();
    //3、开启连接,调用 Connection 对象的 start 方法
    connection.start();
    //4、创建一个 Session 对象。
        //第一个参数:是否开启事务(一般不开启,如果开启事务,第二个参数没意义);
        //第二个参数:应答模式。自动应答或者手动应答,一般是自动应答
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5、使用 Session 对象创建一个 Destination 对象。两种形式 queue、topic。
    Topic topic = session.createTopic("test-topic");
    //6、使用 Session 对象创建一个 Producer 对象
    MessageProducer producer = session.createProducer(topic);
    //7、创建一个 Message 对象,可以使用 TextMessage。下面两种方式都可以
    /*TextMessage textMessage = new ActiveMQTextMessage();
    textMessage.setText("hello ActiveMQ");*/
    TextMessage textMessage = session.createTextMessage("hello ActiveMQ");
    //8、发布消息
    producer.send(textMessage);
    //9、关闭资源
    producer.close();
    session.close();
    connection.close();
}

六、广播接收消息

/**
 * 广播接收消息
 * @throws Exception
 */
@Test
public void testTopicConsumer() throws Exception{
    //1、创建一个 ConnectionFactory 对象连接 MQ 服务器
    String brokerURL = "tcp://192.168.25.129:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    //2、创建一个连接对象
    Connection connection = connectionFactory.createConnection();
    //3、开启连接
    connection.start();
    //4、使用 Connection 对象 创建一个 Session 对象
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5、创建一个 Destination 对象。Topic 对象
    Topic topic = session.createTopic("test-topic");
    //6、使用 Session 对象创建一个消费者
    MessageConsumer consumer = session.createConsumer(topic);
    //7、接收消息
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            //8、打印结果
            TextMessage textMessage = (TextMessage) message;

            try {
                String text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    });
    System.out.println("topic消费者");
    //9、等待接收消息。( 接收到消息后才网下面执行。关闭资源 )
    System.in.read();
    //10、关闭资源
    consumer.close();
    session.close();
    connection.close();
}

原文地址:https://www.cnblogs.com/fangwu/p/8669036.html

时间: 2024-09-29 08:15:42

ActiveMQ 发送和就收消息的相关文章

解决Springboot整合ActiveMQ发送和接收topic消息的问题

环境搭建 1.创建maven项目(jar) 2.pom.xml添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> </parent> <dependencies> &l

ActiveMQ发消息和收消息

来自:http://blog.163.com/chengwei_1104/blog/static/53645274201382315625329/ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 下面详细的解释常用类的作用 ConnectionFactory 接口(连接工厂) 用

ActiveMQ学习笔记(六)——JMS消息类型

1.前言 ActiveMQ学习笔记(四)--通过ActiveMQ收发消息http://my.oschina.net/xiaoxishan/blog/380446 和ActiveMQ学习笔记(五)--使用Spring JMS收发消息http://my.oschina.net/xiaoxishan/blog/381209   中,发送和接受的消息类型都是TextMessage,即文本消息(如下面的代码所示).显然消息类型只有文本类型是不能满足要求的. //发送文本消息  session.create

ActiveMQ持久化到mysql实现消息永不丢失

ActiveMQ持久化到mysql实现消息永不丢失 配置 1.找到apache-activemq-5.15.2/examples/conf下面的activemq-jdbc-performance.xml 2.打开activemq-jdbc-performance.xml,在persistenceAdapter节点后面添加dataSource="#mysql-ds" 并配置你的数据库 其实可以直接更改apache-activemq-5.15.2/conf/activemq.xml的per

Spring整合ActiveMQ及多个Queue消息监听的配置

消息队列(MQ)越来越火,在java开发的项目也属于比较常见的技术,MQ的相关使用也成java开发人员必备的技能.笔者公司采用的MQ是ActiveMQ,且消息都是用的点对点的模式.本文记录了实现Spring整合ActivateMQ的全过程及如何使用MQ,便于后续查阅. 一.项目的搭建 采用maven构建项目,免去了copy jar包的麻烦.因此,我们创建了一个java类型的Maven Project (1)项目结构图 先把项目结构图看一下,便于对项目的理解. (2)pom.xml 我们需要加入以

java 发送微信客服消息

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package com.baosight.wechat.service; import net.sf.json.JSONObject;

你知道吗,msg也能发送多行的消息

熟悉命令行的人都知道msg消息发送工具,但它之能发送单行的信息,但用一些小技巧我们就可以让它一次发送多行的消息,譬如在消息中间加入批处理换行符,下面是参考代码: @echo off setlocal enabledelayedexpansion set title=批处理换行符命令脚本 title %title% echo %title% echo. rem 下面的空行不能省略 set p=!^ ! msg %username% /w "这里是 第1行!p!这里是 第2行!p!!p!这里是 第4

Java Socket发送与接收HTTP消息简单实现

在上次Java Socket现实简单的HTTP服务我 们实现了简单的HTTP服务,它可以用来模拟HTTP服务,用它可以截获HTTP请求的原始码流,让我们很清楚的了解到我们向服务发的HTTP消息的结 构,对HTTP请求消息有个清晰的认识.这一节我想写了一个客户的程序,就是用来模拟浏览器,用来向服务器发送HTTP请求,最得要的是可以用它来显示服 务器发回来的HTTP响应消息的一般结构. [java] view plaincopy import java.io.IOException; import 

Qt5.11.2 VS2015编译activemq发送程序 _ITERATOR_DEBUG_LEVEL错误和崩溃解决

1.问题描述: 运行环境是 win10 64位系统,开发环境是VS2015 ,Qt 5.11.2.开发activemq发送程序,遇到问题 (1)Qt5AxContainer.lib error LNK2038: 检测到“_ITERATOR_DEBUG_LEVEL”的不匹配项: 值“0”不匹配值“1” Qt5AxBase.lib error LNK2038: 检测到“_ITERATOR_DEBUG_LEVEL”的不匹配项: 值“0”不匹配值“1” 问题分析:使用activemq-cpp.dll之前