ActiveMQ的(点对点&发布/订阅通信模式)和(持久化方式)

ActiveMQ的持久化

消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送。
消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查制定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。

ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。

1、AMQ

ActiveMQ和Messages 通信方式 

上面提到JMS通信方式分为点对点通信和发布/订阅方式 1)点对点方式(point-to-point)    点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.  具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。  消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行 2)发布/订阅 方式(publish/subscriber Messaging)     发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。  一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

分别举例说明:

1)点对点:和前一篇文章一样

Sender类:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
private static final int SEND_NUMBER = 2000;
    public static void main(String[] args) {
       // ConnectionFactory :连接工厂,JMS用它创建连接
       ConnectionFactory connectionFactory;
       // Connection :JMS客户端到JMS Provider的连接
       Connection connection = null;
        // Session:一个发送或接收消息的线程
       Session session;
       // Destination :消息的目的地;消息发送给谁.
       Destination destination;
       // MessageProducer:消息发送者
       MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
       connectionFactory = new ActiveMQConnectionFactory(
              ActiveMQConnection.DEFAULT_USER,
              ActiveMQConnection.DEFAULT_PASSWORD,
              "tcp://localhost:61616");
       try{
           // 构造从工厂得到连接对象
           connection = connectionFactory.createConnection();
           //启动
           connection.start();
           //获取操作连接
           session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           //获取session,FirstQueue是一个服务器的queue                destination = session.createQueue("FirstQueue");
           // 得到消息生成者【发送者】
           producer = session.createProducer(destination);
           //设置不持久化
           producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
           //构造消息
           sendMessage(session, producer);
           //session.commit();
           connection.close();
       }
       catch(Exception e){
           e.printStackTrace();
       }finally{
           if(null != connection){
              try {
                  connection.close();
              } catch (JMSException e) {
                  // TODO Auto-generatedcatch block
                  e.printStackTrace();
              }
           }
       }
    }
    public static void sendMessage(Session session, MessageProducer producer)throws Exception{
       for(int i=1; i<=SEND_NUMBER; i++){
           TextMessage message = session.createTextMessage("ActiveMQ发送消息"+i);
           System.out.println("发送消息:ActiveMQ发送的消息"+i);
           producer.send(message);
       }
    }
} 

Receiver类:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver {
    public static void main(String[] args) {
       // ConnectionFactory :连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = newActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            //得到连接对象
            connection =connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            // 创建Queue
           destination = session.createQueue("FirstQueue");
            consumer =session.createConsumer(destination);
            while(true){
              //设置接收者接收消息的时间,为了便于测试,这里定为100s
              TextMessagemessage = (TextMessage)consumer.receive(100000);
              if(null != message){
                 System.out.println("收到消息" +message.getText());
              }else break;
            }
        }catch(Exception e){
        e.printStackTrace();
        }finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

运行进行测试:

时间: 2024-10-15 07:55:16

ActiveMQ的(点对点&发布/订阅通信模式)和(持久化方式)的相关文章

ActiveMQ发布-订阅消息模式

一.订阅杂志我们很多人都订过杂志,其过程很简单.只要告诉邮局我们所要订的杂志名.投递的地址,付了钱就OK.出版社定期会将出版的杂志交给邮局,邮局会根据订阅的列表,将杂志送达消费者手中.这样我们就可以看到每一期精彩的杂志了. 仔细思考一下订杂志的过程,我们会发现这样几个特点:1.消费者订杂志不需要直接找出版社:2.出版社只需要把杂志交给邮局:3.邮局将杂志送达消费者.邮局在整个过程中扮演了非常重要的中转作用,在出版社和消费者相互不需要知道对方的情况下,邮局完成了杂志的投递. 二. 发布-订阅消息模

设计模式 - 发布-订阅者模式

1.发布-订阅者 设计模式 定义 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都将得到通知 观察者模式和发布订阅模式区别 观察者模式是由具体目标(发布者/被观察者)调度的,而发布/订阅模式是由独立的调度中心进行调度,所以观察者模式的订阅者与发布者之间是存在依赖的,而发布/订阅模式则不会:可以说发布订阅模式是观察者模式进一步解耦,在实际中被大量运用的一种模式 ** 观察者模式 ** 1.定义/解析 目标和观察者是基类,目标提供维护观察者的一系列方法,观察者提供更

Vue发布-订阅者模式

1.vue响应原理: vue.js采用数据劫持结合发布-订阅者模式,通过Object.defineProperty()来劫持data中各个属性的setter.getter,在数据变动时,发布消息给订阅者,触发响应的监听回调. (setter和getter是对象的存储器属性,是一个函数,用来获取和设置值) 2.发布-订阅者模式的作用: 处理一对多的场景,应用于不同情况下的不同函数调用 优点:低耦合性,易于代码维护: 缺点:若订阅的消息未发生,需消耗一定的时间和内存. <!DOCTYPE html>

JS 设计模式八 -- 发布订阅者模式

概念 发布---订阅模式又叫观察者模式,它定义了对象间的一种一对多(一个发布,多个观察)的关系,让多个观察者对象同时监听某一个主题对象,当一个对象发生改变时,所有依赖于它的对象都将得到通知. 优点 1.支持简单的广播通信,当对象状态发生改变时,会自动通知已经订阅过的对象. 2.发布者与订阅者耦合性降低 缺点 创建订阅者需要消耗一定的时间和内存. 如果过度使用的话,反而使代码不好理解及代码不好维护. 代码实现 var Event = (function(){ var list = {}, // 缓

学习javascript设计模式之发布-订阅(观察者)模式

1.发布-订阅模式又叫观察者模式,它定义对象之间一种一对多的依赖关系. 2.如何实现发布-订阅模式 2-1.首先指定好发布者 2-2.给发布者添加一个缓冲列表,用户存放回调函数以便通知订阅者 2-3.最后发布消息时候,发布者会遍历这个缓存列表,依次触发里面存放的订阅者回调函数 例子: var salesOffice = {};salesOffice.clientList = [];salesOffice.listen = function(key,fn){    if(!this.clientL

CRM中间件里的发布-订阅者模式

从事务码SMW01里能观察到一个BDOC可能被发送往不止一个目的site去,比如下图所示的5个site都会收到该site,而高亮显示的SMOF_ERPSITE代表ERP系统QI3的client 504会接收到这个BDOC. 所以上图列表里的site是从哪里读取出来的? 以BDOCPRODUCT_MAT为例,在视图SMW3FDBDOC里维护回调函数: 第一个回调SMOH_REPLICATION_WRAPPER_MSG负责决定需要从CRM将该BDOC发送到哪些site去. 这个回调是自动生成的: 在

javascript发布订阅pubsub模式

首先使用数组缓存订阅者订阅的消息,当订阅者订阅消息的时候,把订阅的消息push到指定消息的队列中,当发布者发布消息的时候,我们遍历执行push到指定消息队列中的回调事件. var Pubsub=(function(){ var eventObj={}; return { subscribe:function(event,fn){ eventObj[event]=fn }, publish:function(event){ if(eventObj[event]) eventObj[event]()

经典的发布订阅者模式

function Pubsub() { this.handlers = {};}Pubsub.prototype = { on: function (eventType, handler) { var self = this; if (!(eventType in self.handlers)) { self.handlers[eventType] = []; } self.handlers[eventType].push(handler) }, emit: function (eventTyp

设计模式-发布订阅模式(javaScript)

1. 前言 2. 什么是发布订阅模式 3. 发布订阅优缺点 4. 举例 4. 总结 1. 前言 发布订阅者模式是为了发布者和订阅者之间避免产生依赖关系,发布订阅者之间的订阅关系由一个中介列表来维护.发布者只需做好发布功能,至于订阅者是谁,订阅者做了什么事情,发布者是无需关心的 2. 什么是发布订阅模式 发布订阅:是一种消息范式,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者).而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在.同样的,订阅者可以表达