JMS-mq-发布/订阅

1,Tomcat配置

  <Resource name="topic/connectionFactory" auth="Container"
        type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory"
        factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5"
        brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" />

    <Resource name="topic/topic0"
        auth="Container"
        type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        physicalName="TestTopic" />        

2,发布/订阅

发布:http://localhost:8080/Mq/Publisher

Publisher send:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:WWH-PC-64244-1488884593844-1:3:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://TestTopic, transactionId = null, expiration = 0, timestamp = 1488884667978, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = 2017?3?7?}

订阅:http://localhost:8080/Mq/Subscriber

Subscriber:ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:WWH-PC-64244-1488884593844-1:3:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:WWH-PC-64244-1488884593844-1:3:1:1, destination = topic://TestTopic, transactionId = null, expiration = 0, timestamp = 1488884667978, arrival = 0, brokerInTime = 1488884667978, brokerOutTime = 1488884667982, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [email protected], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 2017?3?7?}

3,代码

【1】消息发布

package com.ma.publish;

import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/Publisher")
public class Publisher extends HttpServlet{

    /**
     * 消息-订阅/发布
     */
    private static final long serialVersionUID = -4470119218802259551L;

    public Publisher(){
        super();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

        PrintWriter out = resp.getWriter();
        try{

            InitialContext context = new InitialContext();
            Topic topic = (Topic) context.lookup("java:comp/env/topic/topic0");
            TopicConnectionFactory tConnectionFactory = (TopicConnectionFactory) context.lookup("java:comp/env/topic/connectionFactory");

            TopicConnection tConnection = tConnectionFactory.createTopicConnection();
            TopicSession topicSession = tConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher tpPublisher = topicSession.createPublisher(topic);
            tpPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            TextMessage txMessage = topicSession.createTextMessage();
            txMessage.setText("2017年3月7日");
            tpPublisher.publish(txMessage);

            out.write("Publisher send:" +txMessage);
            tConnection.close();

        }catch(Throwable e){
            e.printStackTrace();
        }

    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    }

}

【2】消息订阅

package com.ma.publish;

import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/Subscriber")
public class Subscriber extends HttpServlet {

    /**
     * 消息-发布/订阅
     */
    private static final long serialVersionUID = 6058649540492572496L;

    public Subscriber(){
        super();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

        PrintWriter out = resp.getWriter();
        try{

            InitialContext context = new InitialContext();
            Topic topic = (Topic) context.lookup("java:comp/env/topic/topic0");
            TopicConnectionFactory tConnectionFactory = (TopicConnectionFactory) context.lookup("java:comp/env/topic/connectionFactory");

            TopicConnection tConnection = tConnectionFactory.createTopicConnection();
            TopicSession topicSession = tConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber tpSubscriber = topicSession.createSubscriber(topic);

            tConnection.start();
            TextMessage txMessage = (TextMessage) tpSubscriber.receive();

            out.write("Subscriber:" +txMessage);
            tConnection.close();

        }catch(Throwable e){
            e.printStackTrace();
        }

    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

    }

}
时间: 2024-10-09 22:34:35

JMS-mq-发布/订阅的相关文章

JMS发布/订阅消息传送例子

阅读目录 前言 在Tomcat中配置JNDI 在Web工厂中编写代码 参考资料 前言 基于上篇文章"基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送"很容易就可以编写一个发布/订阅消息传送例子,相关环境准备与该篇文章基本类似,主要的区别如下. 在Tomcat中配置JNDI 配置连接工厂和话题 <Resource name="topic/connectionFactory" auth="Container" ty

3,ActiveMQ-入门(基于JMS发布订阅模型)

一.Pub/Sub-发布/订阅消息传递模型 在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端.在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅topic.topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端. 发布订阅模型就像订阅报纸.我们可以选择一份或者多份报纸,比如:北京日报.人民日报.这些报纸就相当于发布订阅模型中的topic.如果有很多人订阅了相同的报纸,那我们就在同一个topic中注册,对于报纸发行方,它就和

ActiveMQ简单简绍(“点对点通讯”和 “发布订阅模式”)

ActiveMQ简单简绍 MQ简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBMWEBSPHERE MQ. MQ特点: M

.Net下RabbitMQ发布订阅模式实践

一.概念AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全.RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.ActionScri

分布式发布订阅消息系统 Kafka 架构设计[转]

分布式发布订阅消息系统 Kafka 架构设计 转自:http://www.oschina.net/translate/kafka-design 我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部

ActiveMQ发布-订阅消息模式

一.订阅杂志我们很多人都订过杂志,其过程很简单.只要告诉邮局我们所要订的杂志名.投递的地址,付了钱就OK.出版社定期会将出版的杂志交给邮局,邮局会根据订阅的列表,将杂志送达消费者手中.这样我们就可以看到每一期精彩的杂志了. 仔细思考一下订杂志的过程,我们会发现这样几个特点:1.消费者订杂志不需要直接找出版社:2.出版社只需要把杂志交给邮局:3.邮局将杂志送达消费者.邮局在整个过程中扮演了非常重要的中转作用,在出版社和消费者相互不需要知道对方的情况下,邮局完成了杂志的投递. 二. 发布-订阅消息模

发布/订阅消息传送模型

1.发布/订阅模型概览 发布/订阅(publish-and-subscribe)模型通常被简写为pub/sub模型.在这个模型中,消息生产者成为发布者(publisher),而消息消费者则称为订阅者(subscribe).在点对点模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题.发布/订阅模型最重要的特性如下: 消息通过一个称为主题的虚拟通道进行交换. 每条消息都会传送给称为订阅者的多个消息消费者.订阅者有许多类型,包括持久性.非持久性和动态性. 发布者通常不会知道.也

NetMQ发布订阅C#示例

NetMQ (ZeroMQ to .Net),ØMQ号称史上最快中间件.它对socket通信进行了封装,使得我们不需要写socket函数调用就能完成复杂的网络通信.和一般意义上的消息队列产品不同的是,它没有消息队列服务器,而更像是一个网络通信库.从网络通信的角度看,它处于会话层之上,应用层之下.[ZeroMQ 官网]:http://zeromq.org ØMQ有4个基本通信模型:分别是一对一结对模型(Exclusive-Pair).请求回应模型(Request-Reply).发布订阅模型(Pub

消息队列中点对点与发布订阅区别(转)

背景知识 JMS一个在 Java标准化组织(JCP)内开发的标准(代号JSR 914).2001年6月25日,Java消息服务发布JMS 1.0.2b,2002年3月18日Java消息服务发布 1.1. Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. 点对点与发布订阅最初是由JMS定义的.这两种模式主要区别或解决的问题就是发送到队列的消息能否重

ActiveMQ的(点对点&amp;发布/订阅通信模式)和(持久化方式)

ActiveMQ的持久化 消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试.消