ActiveMQ使用示例之Topic

非持久的Topic消息示例 

对于非持久的Topic消息的发送基本跟前面发送队列信息是一样的,只是把创建Destination的地方,由创建队列替换成创建Topic,例如:

Destination destination = session.createTopic("MyTopic");

对于非持久的Topic消息的接收
1:必须要接收方在线,然后客户端再发送信息,接收方才能接收到消息
2:同样把创建Destination的地方,由创建队列替换成创建Topic,例如:
Destination destination = session.createTopic("MyTopic");
3:由于不知道客户端发送多少信息,因此改成while循环的方式了,例如:

Message message = consumer.receive();
while(message!=null) {
  TextMessage txtMsg = (TextMessage)message;
  System.out.println("收到消 息:" + txtMsg.getText());
  message = consumer.receive(1000L);
} 

生产者代码:

public class NoPersistenceSender {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = "tcp://192.168.0.101:61616";
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化连接工厂(连接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceSender.USERNAME, NoPersistenceSender.PASSWORD, NoPersistenceSender.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //创建一个名称为MyTopic的消息队列(生产者生成的消息放在哪)
            destination = session.createTopic("MyTopic");
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /**
     * 发送消息
     *
     * @param session
     * @param messageProducer 消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < NoPersistenceSender.SENDNUM; i++) {
            //创建一条文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
            System.out.println("发送消息:Activemq 发送消息" + i);
            //通过消息生产者发出消息
            messageProducer.send(message);
        }

    }
}

消费者代码:

public class NoPersistenceReceiver {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = "tcp://192.168.0.101:61616";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //实例化连接工厂(连接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceReceiver.USERNAME, NoPersistenceReceiver.PASSWORD, NoPersistenceReceiver.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //生产者将消息发送到MyTopic,所以消费者要到MyTopic去取
            destination = session.createTopic("MyTopic");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            Message message = messageConsumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消息:" + txtMsg.getText());
                message = messageConsumer.receive(1000L);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

如果运行生产者的时候没有启动消费者,也就是先运行生产者后运行消费者,那么运行效果是这样的

消费者阻塞

查看一下控制台

队列中有消息,但是无法消费~

在消费者运行的情况下再运行生产者

看下控制台

持久的Topic消息示例

生产者:

1:要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在连接之前设定
2:一定要设置完成后,再start 这个 connection

public class PersistenceSender {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = "tcp://192.168.0.101:61616";
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化连接工厂(连接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(PersistenceSender.USERNAME, PersistenceSender.PASSWORD, PersistenceSender.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //创建session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //创建一个名称为MyTopic的消息队列(生产者生成的消息放在哪)
            destination = session.createTopic("MyTopic");
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //启动连接
            connection.start();
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /**
     * 发送消息
     *
     * @param session
     * @param messageProducer 消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < PersistenceSender.SENDNUM; i++) {
            //创建一条文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
            System.out.println("发送消息:Activemq 发送消息" + i);
            //通过消息生产者发出消息
            messageProducer.send(message);
        }

    }
}

消费者:

1:需要在连接上设置消费者id,用来识别消费者

2:需要创建TopicSubscriber来订阅

3:要设置好了过后再start 这个connection

4:一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。

public class PersistenceReceiver {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = "tcp://192.168.0.101:61616";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Topic topic;//消息的目的地

        //实例化连接工厂(连接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(PersistenceReceiver.USERNAME, PersistenceReceiver.PASSWORD, PersistenceReceiver.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            connection.setClientID("winner_0715");
            //创建session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //生产者将消息发送到MyTopic,所以消费者要到MyTopic去取
            topic = session.createTopic("MyTopic");
            //创建消息消费者
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "t1");

            //启动连接
            connection.start();

            Message message = consumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消 息:" + txtMsg.getText());
                //没这句有错
                message = consumer.receive(1000L);
            }
            session.commit();
            session.close();
            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}

控制台:

关于持久化和非持久化消息

持久化消息
这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成 功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

非持久化消息
保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。 有两种方法指定传送模式:
1.使用setDeliveryMode 方法,这样所有的消息都采用此传送模式; 如:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法为每一条消息设置传送模式

时间: 2024-09-28 19:26:16

ActiveMQ使用示例之Topic的相关文章

解决Springboot整合ActiveMQ发送和接收topic消息的问题

环境搭建 1.创建maven项目(jar) 2.pom.xml添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> </parent> <dependencies> &l

【ActiveMQ入门-5】ActiveMQ学习-Queue与Topic的比较

Queue与Topic的比较 1.JMS Queue执行load balancer语义: 一条消息仅能被一个consumer收到. 如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用. 如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿. 一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡. 如何理解:如果一个consumer收到一条m

【分布式系列之ActiveMq】ActiveMq入门示例

前言 github地址:https://github.com/AndyFlower/web-back/tree/master/ActiveMq01 下载ActiveMQ :http://activemq.apache.org/download.html 放到自己的目录,大致目录如下: bin存放的是脚本文件 conf存放的是基本配置文件 data存放的是日志文件 docs存放的是说明文档 examples存放的是简单的实例 lib存放的是activemq所需jar包 webapps用于存放项目的

ActiveMQ简单示例

我们使用的是ActiveMQ 5.11.1 Release的Windows版,官网最新版是ActiveMQ 5.12.0 Release,大家可以自行下载,下载地址. 需要注意的是,开发时候,要将apache-activemq-5.11.1-bin.zip解压缩后里面的activemq-all-5.11.1.jar包加入到classpath下面,这个包包含了所有jms接口api的实现. 1.建立一个java项目,导入jar包. 2.编写代码 生产者代码: package com.tgb.acti

Java连接ActiveMQ代码示例(Producer和Consumer)

import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JMSProducer { //默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认连接密码 private sta

activeMQ中queue 与 topic 区别

1.queue是点对点模式,一条消息对应一个消费者,topic是一对多模式,一条消息可能有一个或多个消费者 2.queue模式消息再发送后消费者可以在之后的任意时间消费,topic模式如果没有订阅者消息就是废消息,会被丢弃. 3.queue模式生产者与消费者之间没有时间相关性,topic模式下生产者和消费者之间有一定的时间相关性,消费者只能接收到订阅之后的生产者发送的消息. 原文地址:https://www.cnblogs.com/wl889490/p/12663236.html

ActiveMQ简述

概述 ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范.JMS是一组Java应用程序接口,它提供消息的创建.发送.读取等一系列服务.JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信. JMS支持两种消息发送和接收模型.一种称为P2P(

ActiveMQ消息系统研究与学习

概述 ActiveMQ是Apache所提供的一个开源的消息系统,完全采用 Java 来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范.JMS是一组Java应用程序接口,它提供消息的创建.发送.读取等一系列服务.JMS提供了一组公共应用程序接口和响应的语法,类似于Java 数据库 的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信. JMS支持两种消息发送和接收模型.一种称为

activemq 实战 一

This chapter covers ? Introduction to the use case for each of the book examples ? Use of Maven for compiling and running the examples ? How to use the example applications with ActiveMQ 本章内容包括: 介绍本书中所有实例的用例 使用Maven编译和运行书中实例 如何使用ActiveMQ的示例应用程序 Activ