ActiveMQ之队列和主题发布订阅实例

JMS 消息模型

  JMS消息服务应用程序结构支持两种模型:点对点模型,发布者/订阅者模型。  

  (1)点对点模型(Queue)

    一个生产者向一个特定的队列发布消息,一个消费者从这个队列中依次读取消息。

    模型特点:只有一个消费者获得消息。

  (2)发布者/订阅者模型(Topic)

    0个或多个订阅者可以接受特定主题的消息。

    模型特点:多个消费者可获得消息。

    Topic和Queue的最大区别在于Topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而Queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

JMS消息格式

  • MapMessage -- key-value键值对
  • TextMessage -- 字符串对象
  • ObjcetMessage -- 一个序列化的Java对象
  • ByteMessage -- 一个未解释字节的数据流
  • StreamMessage -- Java原始值的数据流

增加maven依赖

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.10.0</version>
</dependency>

点对点

生产者

package com.zns.activemq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory factory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        // 消息创建者
        MessageProducer messageProducer;
        try {
            factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息)
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建队列(返回一个消息目的地)
            destination = session.createQueue("Queue1");
            // 创建消息生产者
            messageProducer = session.createProducer(destination);
            // 创建TextMessage消息实体
            TextMessage message = session.createTextMessage("hello,world!");
            messageProducer.send(message);
            session.commit();
            System.out.println("生产了消息...");
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者

package com.zns.activemq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory connectionFactory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(消费者就不需要开启事务了)
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建队列(返回一个消息目的地)
            destination = session.createQueue("Queue1");
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            //注册消息监听
            consumer.setMessageListener(new MyListerner());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消息监听器

package com.zns.activemq.queue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyListerner implements MessageListener{
    public void onMessage(Message message) {
        try {
            System.out.println(((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

发布订阅

生产者

package com.zns.activemq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory factory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        // 消息创建者
        MessageProducer messageProducer;
        try {
            factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息)
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建Topic
            destination = session.createTopic("Topic1");
            // 创建消息生产者
            messageProducer = session.createProducer(destination);
            // 创建TextMessage消息实体
            TextMessage message = session.createTextMessage("hello,world!");
            messageProducer.send(message);
            session.commit();
            System.out.println("生产了消息...");
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者

package com.zns.activemq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer1 {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory connectionFactory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(消费者就不需要开启事务了)
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建Topic
            destination = session.createTopic("Topic1");
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            //注册消息监听
            consumer.setMessageListener(new Listerner1());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package com.zns.activemq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer2 {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory connectionFactory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(消费者就不需要开启事务了)
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建Topic
            destination = session.createTopic("Topic1");
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            //注册消息监听
            consumer.setMessageListener(new Listerner2());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消息监听器

package com.zns.activemq.topic;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Listerner1 implements MessageListener{
    public void onMessage(Message message) {
        try {
             System.out.println("订阅者1接收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package com.zns.activemq.topic;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Listerner2 implements MessageListener{
    public void onMessage(Message message) {
        try {
             System.out.println("订阅者2接收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

原文地址:https://www.cnblogs.com/zengnansheng/p/10389731.html

时间: 2024-11-09 02:20:52

ActiveMQ之队列和主题发布订阅实例的相关文章

JavaScript发布/订阅实例

原文链接: Pub/Sub JavaScript Object原文日期: 2014年6月11日翻译日期: 2014年6月13日 翻译人员: 铁锚 高效AJAX网站的三大杀器: 事件代理, 浏览历史管理, 以及高效应用级 发布/订阅通信机制. 本博客的原文站点 同时使用了这三种技术,本文中作者将分享其中最简单的一个: 该网站使用的 一个微型 发布/订阅模块. 如果你不了解 发布/订阅 模式,那么可以将其类比为 你发表了一篇博文,所有人都可以订阅你的博客, 也类似于广播电台的工作方式: 有一个站台进

ActiveMQ入门系列三:发布/订阅模式

在上一篇<ActiveMQ入门系列二:入门代码实例(点对点模式)>中提到了ActiveMQ中的两种模式:点对点模式(PTP)和发布/订阅模式(Pub & Sub),详细介绍了点对点模式并用代码实例进行说明,今天就介绍下发布/订阅模式. 一.理论基础 发布/订阅模式的工作示意图: 消息生产者将消息(发布)到topic中,可以同时有多个消息消费者(订阅)消费该消息. 和点对点方式不同,发布到topic的消息会被所有订阅者消费. 当生产者发布消息,不管是否有消费者,都不会保存消息. 一定要先

MQTT的学习研究(十一) IBM MQTT 简单发布订阅实例

package com.etrip.push; import com.ibm.mqtt.MqttAdvancedCallback; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttSimpleCallback; /** * Android推送方案分析(MQTT/XMPP/GCM) 方案1. 使用GCM服务(Google Cloud Messaging) 简介:Go

MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例

使用IBM MQTTv3实现相关的发布订阅功能 MQTTv3的发布消息的实现: Java代码   package com.etrip.mqttv3; import com.ibm.micro.client.mqttv3.MqttClient; import com.ibm.micro.client.mqttv3.MqttDeliveryToken; import com.ibm.micro.client.mqttv3.MqttMessage; import com.ibm.micro.clien

ROS主题发布订阅

节点是一个可执行程序,它连接到了ROS的网络系统中.我们将会创建一个发布者,也就是说话者节点,它将会持续的广播一个信息. 改变目录到之前所建立的那个包下: cd ~/catkin_ws/src/beginner_tutorials 在beginner_tutorials包下面建立一个src文件夹: mkdir -p ~/catkin_ws/src/beginner_tutorials/src 创建文件src/talker.cpp: gedit src/talker.cpp 将下面的内容复制进去:

ROS主题发布订阅控制真实的机器人下位机

先模拟控制小乌龟 新建cmd_node.ccpp文件: #include"ros/ros.h" #include"geometry_msgs/Twist.h" //包含geometry_msgs::Twist消息头文件 #include <stdlib.h> #include<stdlib.h> int main(int argc, char **argv) { ros::init(argc, argv, "cmd_node&quo

rabbitMq及安装、fanout交换机-分发(发布/订阅)

<dependency>            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>3.6.5</version> </dependency> 最常见的几种消息通信模式主要有发布-订阅.点对点这两种http://blog.csdn.net/wooge

RabbitMQ实例教程:发布/订阅者消息队列

消息交换机(Exchange) RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列. 相反的,生产者只能发送消息给交换机(Exchange).交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中.交换机必须清楚的知道消息如何处理它收到的每一条消息.是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义. 交换机的类型有:direct,topic,head

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件: 由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行. ActiveMQ是用来干什么的? 用来处理消息,也就是处理JMS的.消息队列在大型电子商务类网站,如京东.淘宝.去哪儿等网站有着深入的应用, 队列的主要作用是消除高并发访问高峰,加快网站的响应速度. 在不使用消息队列的情况下,用户的请求数据直接写入数据库,高发的情况下,会对数据库造成巨大的压力, 同时也使