ActiveMQ发布-订阅消息模式

一、订阅杂志
我们很多人都订过杂志,其过程很简单。只要告诉邮局我们所要订的杂志名、投递的地址,付了钱就OK。出版社定期会将出版的杂志交给邮局,邮局会根据订阅的列表,将杂志送达消费者手中。这样我们就可以看到每一期精彩的杂志了。

仔细思考一下订杂志的过程,我们会发现这样几个特点:
1、消费者订杂志不需要直接找出版社;
2、出版社只需要把杂志交给邮局;
3、邮局将杂志送达消费者。
邮局在整个过程中扮演了非常重要的中转作用,在出版社和消费者相互不需要知道对方的情况下,邮局完成了杂志的投递。

二、 发布-订阅消息模式
刚刚讲了订阅杂志,下面我们会讲传统调用模式演化到发布-订阅消息模式。

有些网站在注册用户成功后发一封激活邮件,用户收到邮件后点击激活链接后才能使用该网站。一般的做法是在注册用户业务逻辑中调用发送邮件的逻辑。这样用户业务就依赖于邮件业务。如果以后改为短信激活,注册用户业务逻辑就必须修改为调用发送短信的逻辑。如果要注册后给用户加点积分,再加一段逻辑。经过多次修改,我们发现很简单的注册用户业务已经越来越复杂,越来越难以维护。相信很多开发者都会有类似痛苦的经历。

即使用户业务实现中对其他业务是接口依赖,也避免不了业务变化带来的依赖影响。怎么办?解耦!将注册用户业务逻辑中注册成功后的处理剥离出来。

再回头看看“订阅杂志”,如果没有邮局,出版社就必须自己将杂志送达所有消费者。这种情形就和现在的注册用户业务一样。我们发现问题了,在用户业务和其他业务之间缺少了邮局所扮角色。

我们把邮局抽象成一个管理消息的地方,叫“消息管理器”。注册用户成功后发送一个消息给消息管理器,由消息管理器转发该消息给需要处理的业务。现在,用户业务只依赖于消息管理器了,它再也不会为了注册用户成功后的其他处理而烦恼。

注册用户的改造就是借鉴了“订阅杂志”这样原始的模式。我们再进一步抽象,用户业务就是消息的“生产者”,它将消息发布到消息管理器。邮件业务就是消息的“消费者”,它将收到的消息进行处理。邮局可以订阅很多种杂志,杂志都是通过某种编号来区分;消息管理器也可以管理多种消息,每种消息都会有一个“主题”来区分,消费者都是通过主题来订阅的。

发布-订阅消息模式已经呈现在我们面前,在这里,对于发布者来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:

示例:
1、Publish.java:消息发布者

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publish {
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public Publish() {
        try {
            factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendMessage() throws Exception {
        Destination destination = session.createTopic("Topic001");
        TextMessage msg = session.createTextMessage("我是消息内容...");
        producer.send(destination, msg);

        if(connection != null){
            connection.close();
        }
    }

    public static void main(String[] args) throws Exception {
        Publish publish= new Publish();
        publish.sendMessage();
    }
}

2、Subscriber1.java:消息订阅者

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber1 {
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;

    public Subscriber1() {
        try {
            factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void receive() throws Exception {
        Destination topic = session.createTopic("Topic001") ;
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new Listener());
    }

    class Listener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage tm = (TextMessage) message;
                System.out.println("Subscriber1 Received message: " + tm.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Subscriber1 subscriber = new Subscriber1();
        subscriber.receive();
    }
}

3、Subscriber2.java:消息订阅者

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber2 {
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;

    public Subscriber2() {
        try {
            factory =
                    new ActiveMQConnectionFactory("ljq", "ljq",
                            "failover:(tcp://192.168.1.101:61616)?Randomize=false");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void receive() throws Exception {
        Destination topic = session.createTopic("Topic001") ;
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new Listener());
    }

    class Listener implements MessageListener {
        public void onMessage(Message message) {
            System.out.println(message);
            try {
                TextMessage tm = (TextMessage) message;
                System.out.println("Subscriber2 Received message: " + tm.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Subscriber2 subscriber = new Subscriber2();
        subscriber.receive();
    }
}
时间: 2024-10-18 01:27:53

ActiveMQ发布-订阅消息模式的相关文章

Kafka是分布式发布-订阅消息系统

https://www.biaodianfu.com/kafka.html Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据. 在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转.传统的企业消息系统并不是非常适合大规模的数据处理.为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志

JMS发布/订阅消息传送例子

阅读目录 前言 在Tomcat中配置JNDI 在Web工厂中编写代码 参考资料 前言 基于上篇文章"基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送"很容易就可以编写一个发布/订阅消息传送例子,相关环境准备与该篇文章基本类似,主要的区别如下. 在Tomcat中配置JNDI 配置连接工厂和话题 <Resource name="topic/connectionFactory" auth="Container" ty

分布式发布订阅消息系统 Kafka 架构设计[转]

分布式发布订阅消息系统 Kafka 架构设计 转自:http://www.oschina.net/translate/kafka-design 我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部

设计模式 - 发布-订阅者模式

1.发布-订阅者 设计模式 定义 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都将得到通知 观察者模式和发布订阅模式区别 观察者模式是由具体目标(发布者/被观察者)调度的,而发布/订阅模式是由独立的调度中心进行调度,所以观察者模式的订阅者与发布者之间是存在依赖的,而发布/订阅模式则不会:可以说发布订阅模式是观察者模式进一步解耦,在实际中被大量运用的一种模式 ** 观察者模式 ** 1.定义/解析 目标和观察者是基类,目标提供维护观察者的一系列方法,观察者提供更

Vue发布-订阅者模式

1.vue响应原理: vue.js采用数据劫持结合发布-订阅者模式,通过Object.defineProperty()来劫持data中各个属性的setter.getter,在数据变动时,发布消息给订阅者,触发响应的监听回调. (setter和getter是对象的存储器属性,是一个函数,用来获取和设置值) 2.发布-订阅者模式的作用: 处理一对多的场景,应用于不同情况下的不同函数调用 优点:低耦合性,易于代码维护: 缺点:若订阅的消息未发生,需消耗一定的时间和内存. <!DOCTYPE html>

高吞吐量的分布式发布订阅消息系统Kafka--安装及测试

一.Kafka概述 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素. 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决. 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案.Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费. 二.Kafka相关

Kafka logo分布式发布订阅消息系统 Kafka

kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息. 支持通过kafka服务器和消费机集群来分区消息. 支持Hadoop并行数据加载. 卡夫卡的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素.

发布/订阅消息传送模型

1.发布/订阅模型概览 发布/订阅(publish-and-subscribe)模型通常被简写为pub/sub模型.在这个模型中,消息生产者成为发布者(publisher),而消息消费者则称为订阅者(subscribe).在点对点模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题.发布/订阅模型最重要的特性如下: 消息通过一个称为主题的虚拟通道进行交换. 每条消息都会传送给称为订阅者的多个消息消费者.订阅者有许多类型,包括持久性.非持久性和动态性. 发布者通常不会知道.也

开源的.NET发布-订阅消息服务器Laharsub

Laharsub是一种开源的.NET发布-订阅消息服务器,用于实时的web应用程序,像聊天.在线写作.新闻或者股票交易更新等等. Laharsub是一种构建在三层架构之上的发布-订阅消息服务器: 前端--客户端,中间层--web服务,后端--带有发布-订阅功能和存储能力的系统. 客户端一般是浏览器,但是可以是所有已知能够做出HTTP请求的程序. 中间层是一种WCF的HTTP服务,它会从客户端接收消息,并向其发送消息,而后端会包含真正的与消息相关的逻辑. 客户端可以创建主题,并通过RESTful