基于JMS的消息传送

简单的介绍下基于JMS的消息传送

Java消息队列JMS整体设计结构

基本要素:生产者(producer),消费者(consumere),消息服务(broker)

交互模型:

JMS两种消息传送模式

点对点(Point-to-Point)专门用于使用队列Queue传送消息;

发布/订阅(Publish/Subscribe):专门用于使用主题Topic传送消息

两种传送方式比较

  基于队列Queue的点对点消息只能被一个消费者消费,如多个消费者都注册到同一个消息队列上,当生产者发送一条消息后,而只有其中一个消费者会接收到该消息,而不是所有消费者都能接收到该消息。(1 对 1)

  基于主题的发布与订阅消息能被多个消费者消费,生产者发送的消息,所有订阅了该topic的消费者都能接收到。(1 对 多)

Java消息服务JMS API总体概览

JMS API概览:

JMS API可以归于3个主要部分:

  公共API:可用于向一个队列或主题发送消息或从其中接收消息

  点对点API专门用于使用队列Queue传送消息

  发布/订阅API专门用于使用主题Topic传送消息

JMS公共API

  在JMS公共API内部,和发送与接收消息有关的JMS API接口主要是:

  • ConnectionFactory
  • Connection
  • Session
  • Message
  • Destination
  • MessageProducer
  • MessageConsumer

  它们的关系是:一旦有了ConnectionFactory,就可以创建Connection,一旦有了Connection,就可以创建Session,而一旦有了Session,就可以创建Message、MessageProducer和MessageConsumer。

 

JMS点对点API

  点对点消息传送模型API是指JMS API之内基于队列(Queue)的接口:

  • QueueConnectionFactory
  • QueueConnection
  • QueueSession
  • Message
  • Queue
  • QueueSender
  • QueueReceiver

  从接口的命名可以看出,大多数接口名称仅仅是在公共API接口名称之前添加Queue一词。一般来说,使用点对点消息传送模型的应用程序将使用基于队列的API,而不使用公共API 。

JMS发布/订阅API

  发布/订阅消息传送模型API是指JMS API之内基于主题(Topic)的接口

  • TopicConnectionFactory
  • TopicConnection
  • TopicSession
  • Message
  • Topic
  • TopicPublisher
  • TopicSubscriber

 由于基于主题(Topic)的JMS API类似于基于队列(Queue)的API,因此在大多数情况下,Queue这个词会由Topic取代。

ActiveMQ点对点发送与接收消息示例

创建普通的maven项目activemq-java

在pom.xml文件中加入jms 和 activemq 的相关依赖

<dependencies>   
  <!-- JMS规范的jar依赖 -->
    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
        <version>2.0.1</version>
    </dependency>

    <!-- activeMQ对jms具体实现的jar依赖 -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.15.8</version>
    </dependency>
    <!--slf4j的简单实现 可加可不加,解决输出警告日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.25</version>
        </dependency>
</dependencies>

在com.beijing.activemq.send包下编写一个消费发送者QueueSender发送消息

package com.beijing.activemq.send;
import com.beijing.activemq.model.User;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
public class QueueSender {
    public static final String BROKER_URL = "tcp://192.168.32.130:61616";
    //相当于一个数据库(其实是一个队列)
    public static final String DESTINATION = "myQueue";
    public static void main(String[] args) {
        sendMessage();
    }
    public static void sendMessage(){
       //创建一个连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = null;
        Session session = null;
        MessageProducer messageProducer = null;
        try {
            //获取一个连接
            connection = connectionFactory.createConnection();
            //创建一个Session第一个参数:是否是事务消息 第二个参数:消息确认机制(自动确认还是手动确认))
            //提示:消息确认机制的设置很关键,错误的设置容易导致数据丢失问题,建议手动确认,这里暂时定为自动确认
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            //有了session之后,就可以创建消息,目的地,生产者和消费者
            Message message = session.createTextMessage("Hello ActiveMQ");
            //目的地
            Destination destination = session.createQueue(DESTINATION);
            //生产者
            messageProducer = session.createProducer(destination);
            //发消息 没有返回值,是非阻塞的
            messageProducer.send(message);
        }catch (JMSException e ){
            e.printStackTrace();

        }finally {
            try {
                if (null != messageProducer) {
                    messageProducer.close();
                }
                if (null != session) {
                    session.close();
                }
                if (null == connection) {
                    connection.close();
                }
            }catch (JMSException e){
                e.printStackTrace();
            }
        }

    }
}

启动ActiveMQ,关闭防火墙

[root@localhost bin]# ./activemq start
[root@localhost bin]# systemctl stop firewalld

运行程序,连接ActiveMQ的web控制台查看

在com.beijing.activemq.receive包下编写一个消费接收者QueueReceiver接收消息

package com.beijing.activemq.receive;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

//接收点对点消息
public class QueueReceiver {
  public static final String BROKER_URL = "tcp://192.168.32.130:61616";
  //相当于一个数据库(其实是一个队列)
  public static final String DESTINATION = "myQueue";
  public static void main(String[] args) {
    receiveMessage();
  }
  private static void receiveMessage() {
  //创建一个连接工厂
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
    Connection connection = null;
    Session session = null;
    MessageConsumer messageConsumer = null;
    try {
  //获取一个连接
      connection = connectionFactory.createConnection();
      //接收消息,需要将连接启动一下,才可以接收到消息
      connection.start();
      //创建一个Session 第一个参数:是否是事务消息 第二个参数:消息确认机制(自动确认还是手动确认)
      session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
      //目的地
      Destination destination = session.createQueue(DESTINATION);
      //消费者
      messageConsumer = session.createConsumer(destination);
      //循环接收消息
      while (true){
        //接收消息 有返回值,值阻塞的
        Message message = messageConsumer.receive();
        //判断消息类型
        if(message instanceof TextMessage){
        String text = ((TextMessage) message).getText();
          System.out.println(text);
        }
      }
    }catch (JMSException e){
      e.printStackTrace();
    }finally {
      try {
        if (null != messageConsumer) {
          messageConsumer.close();
        }
        if (null != session) {
          session.close();
        }
        if (null == connection) {
          connection.close();
        }
      }catch (JMSException e){
        e.printStackTrace();
      }
    }
  }
}

 注意:接收方要调用connection的start方法才能接收到

运行接收者的代码,在ActiveMQ的web控制台观察消息数据

之前:

现在:

ActiveMQ发布与订阅示例

  发布订阅和点对点的代码基本相同,只是修改一下目的地,myQueue改为myTopic,以及在创建目的地的时候,将createQueue改为createTopic

直接在原有代码上更改

更改这两处地方
//相当于一个数据库(其实是一个队列)
public static final String DESTINATION = "myTopic";
//目的地
Destination destination = session.createTopic(DESTINATION);

消息订阅者先运行,然后再运行消息发布者

在ActiveMQ的web控制台观察消息数据

Queue与Topic比较

原文地址:https://www.cnblogs.com/really199/p/10485139.html

时间: 2024-12-11 11:02:47

基于JMS的消息传送的相关文章

发布/订阅消息传送模型

1.发布/订阅模型概览 发布/订阅(publish-and-subscribe)模型通常被简写为pub/sub模型.在这个模型中,消息生产者成为发布者(publisher),而消息消费者则称为订阅者(subscribe).在点对点模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题.发布/订阅模型最重要的特性如下: 消息通过一个称为主题的虚拟通道进行交换. 每条消息都会传送给称为订阅者的多个消息消费者.订阅者有许多类型,包括持久性.非持久性和动态性. 发布者通常不会知道.也

JMS消息服务器(二)——点对点消息传送模型

一.点对点模型概览 当你只需要将消息发布送给唯一的一个消息消费者是,就应该使用点对点模型.虽然可能或有多个消费者在队列中侦听统一消息,但是,只有一个且仅有一个消费者线程会接受到该消息. 在p2p模型中,生产者称为发送者,而消费者则称为接受者.点对点模型最重要的特性如下: 消息通过称为队列的一个虚拟通道来进行交换.队列是生产者发送消息的目的地和接受者消费消息的消息源. 每条消息通仅会传送给一个接受者.可能会有多个接受者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接受者消费. 消息存在先

JMS发布/订阅消息传送例子

阅读目录 前言 在Tomcat中配置JNDI 在Web工厂中编写代码 参考资料 前言 基于上篇文章"基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送"很容易就可以编写一个发布/订阅消息传送例子,相关环境准备与该篇文章基本类似,主要的区别如下. 在Tomcat中配置JNDI 配置连接工厂和话题 <Resource name="topic/connectionFactory" auth="Container" ty

JMS学习六(ActiveMQ消息传送模型)

ActiveMQ 支持两种截然不同的消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布 /订阅模型),分别称作:PTP Domain 和Pub/Sub Domain. 一.PTP消息传送模型 1.PTP(使用Queue 即队列目标) 消息从一个生产者传送至一个消费者.在此传送模型中,目标是一个队列.消息首先被传送至队列目标,然后根据队列传送策略,从该队列将消息传送至向此队列进行注册的某一个消费者,一次只传送一条消息.可以向队列目标发送消息的生产者的数量没有限制,但每条消息只能发送至.并

JMS(Java消息服务)入门教程

阅读目录 什么是Java消息服务 为什么需要JMS JMS的优势 JMS消息传送模型 接收消息 JMS编程接口 JMS消息结构 JMS使用示例 译文链接(做了部分修改~~) 什么是Java消息服务 Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建.发送.读取消息等,用于支持JAVA应用程序开发.在J2EE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务连接起来,可以达到解耦的效果,我

点对点消息传送模型

点对点(p2p)消息传送模型 在p2p模型中,生产者成为发送者,而消费者成为接收者.点对点最重要的特性如下: 消息通过成为队列的一个虚拟通道来进行交换. 每条消息仅会传送给一个接收者.可能会有多个接收者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接收者所消费. 消息存在先后顺序.一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者.当消息已被消费时,就会从队列中将它们删除.(除非使用了消息优先级外) 生产者和消费者之间没有耦合.接收者和发送者可以在运行时动态添加,这使得

JMS与消息队列

JMS,Java Message Service,即Java消息服务. MOM,Message Oriented Miiddleware的英文缩写,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成. JMS是Java的一套API标准,最初的目的是为了使应用程序能够访问现有的MOM系统,后来被许多现有的MOM供应商采用,并实现为MOM系统. 常见的MOM系统有Apache的ActiveMQ.BEA的RabbitMQ.阿里巴巴的RocketMQ.IBM的M

ActiveMQ消息传送模型

无论采用哪种JMS 组件,JMS 支持两种截然不同的消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布/订阅模型),分别称作:PTP Domain 和Pub/Sub Domain. PTP(使用Queue即队列目标) 消息从一个生产者传送至一个消费者.在此传送模型中,目标是一个队列.消息首先被传送至队列目标,然后根据队列传送策略,从该队列将消息传送至向此队列进行注册的某一个消费者,一次只传送一条消息.可以向队列目标发送消息的生产者的数量没有限制,但每条消息只能发送至.并由一个消费者成功

自定义基于xmemcached协议消息队列的Spark Streaming 接收器

虽然spark streaming定义了常用的Receiver,但有时候还是需要自定义自己的Receiver的.对于自定义的Receiver,只需要实现spark streaming的Receiver抽象类即可.而Receiver的实现只需要简单地实现两个方法: 1.onStart():接收数据. 2.onStop():停止接收数据. 一般onStart()不应该阻塞,应该启动一个新的线程复杂数据接收.而onStop()方法负责确保这些接收数据的线程是停止的,在 Receiver 被关闭时调用了