ActiveMQ学习--002--Topic消息例子程序

一、非持久的Topic消息示例

注意 此种方式消费者只能接收到 消费者启动之后,发送者发送的消息。

发送者

package com.lhy.mq.helloworld;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class NoPersistenceTopicSender {

    public static void main(String[] args) throws Exception {

        //第一步:建立ConnectionFactory工厂对象。需要填入用户名、密码、连接地址,均使用默认即可,默认端口为"tcp://localhost:61616"
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "lhy","123456",
                //ActiveMQConnectionFactory.DEFAULT_USER,
                //ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://127.0.0.1:61616");

        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createTopic("NB-NB"); //队列名称

        MessageProducer producer = session.createProducer(null);//

        // 第六步:可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)
        //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("我是消息内容  -333- "+i);
            producer.send(destination, message);

            System.err.println("生产者发送消息:"+message.getText());
        }
        session.commit();

        if(connection != null){
            connection.close();
        }
    }

}

接收者

package com.lhy.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class NoPersitenceTopicReceiver {

    public static void main(String[] args) throws Exception {
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "lhy","123456",
                        "tcp://localhost:61616");
                Connection connection = connectionFactory.createConnection();
                connection.start();
                final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createTopic("NB-NB");

                MessageConsumer consumer = session.createConsumer(destination);

                Message message = consumer.receive();
                while(message != null){
                    TextMessage textMsg = (TextMessage)message;
                    System.err.println("消费消息:"+textMsg.getText());
                    //接收下一个消息
                    message = consumer.receive(1000L);
                }

                //提交一下事务,否则不确认消息,消息不会出队列
                session.commit();
                session.close();
                connection.close();
    }
}

二、持久订阅例子程序

发送者

package com.lhy.mq.helloworld;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class PersistenceTopicSender {

    public static void main(String[] args) throws Exception {

        //第一步:建立ConnectionFactory工厂对象。需要填入用户名、密码、连接地址,均使用默认即可,默认端口为"tcp://localhost:61616"
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "lhy","123456",
                "tcp://127.0.0.1:61616");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Topic destination = session.createTopic("Persistence-Topic"); //队列名称
        MessageProducer producer = session.createProducer(null);//

        //默认为持久订阅,注意这个一定在start之前设置
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();

        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("我是消息内容  -666- "+i);
            producer.send(destination, message);

            System.err.println("生产者发送-topic-消息:"+message.getText());
        }
        session.commit();

        if(connection != null){
            connection.close();
        }
    }

}

消费者,可以有多个消费者

1, 消费者需要在Connection上设置消费者id,来识别消费者

2,需要创建TopicSubscriber 来订阅

3,设置好之后再start  这个Connection

4,一定要先运行一次消费者,来向ActiveMQ注册这个消费者,然后再运行发送消息,这样无论消费者是否在线,都会接收到消息。否则只能接收到注册之后的消息。

package com.lhy.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消费者需要先运行一次,向producer注册一下
 * @author dell
 *
 */
public class PersitenceTopicReceiver {

    public static void main(String[] args) throws Exception {
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "lhy","123456",
                        "tcp://localhost:61616");
                Connection connection = connectionFactory.createConnection();
                //设置消费者的id,向发送者先注册一下,producer就知道谁在订阅
                connection.setClientID("client2");

                final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Topic destination = session.createTopic("Persistence-Topic");
                TopicSubscriber consumer = session.createDurableSubscriber(destination, "T1");//创建一个持久订阅
                //最后start
                connection.start();

                Message message = consumer.receive();
                while(message != null){
                    TextMessage textMsg = (TextMessage)message;
                    System.err.println("消费消息:"+textMsg.getText());
                    //接收下一个消息
                    message = consumer.receive(1000L);
                }

                //提交一下事务,否则不确认消息,消息不会出队列
                session.commit();
                session.close();
                connection.close();
    }
}

分别修改消费者的clientID为 client1、client2运行,相当于2个消费者。

管控台:2个消费者,

原文地址:https://www.cnblogs.com/lihaoyang/p/8888545.html

时间: 2024-10-10 13:03:11

ActiveMQ学习--002--Topic消息例子程序的相关文章

ActiveMQ学习(四)——应用程序接口

在 Java 里有 JMS的多个实现.其中 apache 下的 ActiveMQ就是不错的选择. 用 ActiveMQ最好还是了解下 JMS JMS 公共 点对点域 发布/订阅域 ConnectionFactory QueueConnectionFactory TopicConnectionFactory Connection QueueConnection TopicConnection Destination Queue Topic Session QueueSession TopicSes

activeMQ学习(2)---------点对点、发布订阅的消息代码实现

以下是个人在学习activemq时从网上找到的资料, 总结留给自己以后复习 点对点的实现 2 @Test public void sendMessage(){ 3 try { 4 // 创建一个连接工厂 5 String url = "tcp://localhost:61616"; 6 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 7 // 设置用户名和密码,这个用户名

ActiveMQ学习笔记(五)——使用Spring JMS收发消息

ActiveMQ学习笔记(四)http://my.oschina.net/xiaoxishan/blog/380446 中记录了如何使用原生的方式从ActiveMQ中收发消息.可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更为方便的方式,这就是Spring JMS.我们通过一个例子展开讲述.包括队列.主题消息的收发相关的Spring配置.代码.测试. 本例中,消息的收发都写在了一个工程里. 1.使用maven管理依赖包 <dependencies> <depend

ActiveMQ学习第七篇:Message Dispatch(消息发送)的特性

Message Cursors ??ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者,在发送完一个批次的消息后,指针的标记位置指向下-批次待发送消息的位置,进行后续的发送操作.这是一种比较健壮和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态. ??因此,从ActiveMQ5. 0. 0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发送系统直接把持久消息发送给消费

RabbitMQ (消息队列)专题学习06 Topic

(使用Java客户端) 一.概述 在路由消息分发的学习中,对日志记录系统做了改进,使用direct exchange来替换fanout exchange进行消息分发,可以使日志系统有了直接.并且可以有选择的接收消息. 尽管使用direct exchange改进了系统,但是它仍然有局限性,就是不能根据多个标准来分发消息. 在日志系统中,我们也许想订阅的不仅仅是基于日志消息的严重程度,而且可能是基于日志消息的发送源. 这将给我们带来很多的灵活,我可能想坚挺的错误来自"cron"的消息源,而

ActiveMQ Topic消息重发

MQ学习系列: 消息队列概念与认知 ActiveMQ Topic消息重发 ActiveMQ Topic 消息重发 准备工作 windows下ActiveMQ的下载与启动 百度的教程:链接 ←这里包含基本的下载安装启动以及简单的配置账号 登录控制台主页:http://localhost:8161/admin/ 启动错误以及解决方案 activeMQ启动错误 BeanFactory not initialized https://blog.csdn.net/huang_sheng0527/artic

程序员如何学习书本上的例子

身为程序员,明白一件事:“看懂”书上的例子与实际动手做过有巨大差别,但是人的惰性会让自己懒得再去敲例子代码.这就形成了一个矛盾:一方面知道敲代码的重要性,另一方面却由于自感“理解了例子"而不想去敲,觉得即使敲了也只不过是复制了书本例子,没啥意义.问题的关键就在这里:如果你把书本上的例子原原本本地敲下来,确实是一点意义都没有,纯粹的复制而已.也许你会说,那就不要看着书的情况下敲下例子.我认为这种行为本质上还是复制,只不过披了一件”背诵"的外衣而已,毫无意义. 两全其美(不要理解为折中)的

ActiveMQ学习笔记(六)——JMS消息类型

1.前言 ActiveMQ学习笔记(四)--通过ActiveMQ收发消息http://my.oschina.net/xiaoxishan/blog/380446 和ActiveMQ学习笔记(五)--使用Spring JMS收发消息http://my.oschina.net/xiaoxishan/blog/381209   中,发送和接受的消息类型都是TextMessage,即文本消息(如下面的代码所示).显然消息类型只有文本类型是不能满足要求的. //发送文本消息  session.create

【ActiveMQ入门-5】ActiveMQ学习-消息持久性

ActiveMQ中的消息持久性     ActiveMQ很好的支持了消息的持久性(Persistence).消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据