MQ(队列消息的入门)

  消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成,通过提供消息传递消息排队模型,它可以在分布式环境下拓展进程间的通信,对于消息中间件,常见的角色大致也就有Producer(生产者).Consumer(消费者)

MQ     消息中间件     消息队列

Message Queue简称MQ

种类:

1.Apache  ActiveMQ  

  ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现.我们在本次课程中介绍ActiveMQ的使用.

2.阿里  RocketMQ

  

3.Pivotal 开发RabbitMQ

  AMQP协议的领导实现,支持多种场景.淘宝的Mysql集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用.

ZeroMQ

  史上最快的消息队列系统.

kafka:

  Apache下的一个子项目.特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统.适合处理海量数据.

使用场景(为什么使用MQ?):

//TODO

JMS简介

什么是JMS?

    JMS(Java Messaging Service)是java平台上有关面向消息中间件的技术规范(可以使用jmsTemplate),它便于消息系统中的java应用程序进行消息交换,并且通过提供标准的产生,发送,接收消息的接口库简化企业应用的开发.

  JMS本身只定义了一系列的接口规范,是一种与厂商无关的API.用来访问消息收发系统.它类似于JDBC(java Database Connectivity);这里,JDBC是可以用来访问许多不同关系数据库的的API.而JMS则提供同样与厂商无关的访问方法,以访问消息收发服务.许多厂商目前都支持JMS,包括IBM的MQseries.BEA的Weblogic.JMSservice和Progress的SonicMQ,这只是几个例子.JMS使您能够通过消息收发服务(有时称为消息中介程序或者路由器)从一个JMS客户机向另一个JMS客户机发送消息,消息是JMS中的一种类型对象,由两部分组成:报头消息主体.

报头由路由信息以及有关该消息的元数据组成.

消息主体则携带着应用程序的数据或有效负载.

  JMS定义了五种不同的消息正文格式,以及调用的消息类型.允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性.

  TextMessage ---一个字符串对象

  MapMessage---一套名称-值对

  ObjectMessage--一个序列化的java对象

  ByteMessage -- 一个字节的数据流

  StreamMessage --Java原始值的数据流

JMS消息传递类型

  对于消息的传递有两种类型:

    一种是点对点,即一个生产者和一个消费者一一对应.

    

    另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收.

JMS入门小Demo

现在是点对点模式:

  点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接行ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发现接收端,如果没有接收端接收,多个接收端,但是一条消息,只会被一个接收端给接收到,那个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息.

创建的是一个没有使用任何骨架的java工程

引入的依赖为:

   <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

代码

/**
 * @Auther:qingmu
 * @Description:脚踏实地,只为出人头地
 * @Date:Created in 16:16 2019/4/23
 */

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 点对点模式
 * 生产者
 */

public class QueueProducer {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        /**
         *  AUTO_ACKNOWLEDGE = 1    自动确认
         •    CLIENT_ACKNOWLEDGE = 2    客户端手动确认
         •    DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
         •    SESSION_TRANSACTED = 0    事务提交并确认

         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("欢迎来到神奇的");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();

    }
}

直接运行这个main方法,后可以mq中查看到:

消费者的代码为:

/**
 * @Auther:qingmu
 * @Description:脚踏实地,只为出人头地
 * @Date:Created in 16:23 2019/4/23
 */

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 点对点模式
 * 消息消费者
 */
public class QueueConsumer {
    public static void main(String[] args) throws Exception{
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(queue);

        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();

    }
}

通过上面定义的监听器,可以获取到生产者生产的信息.在控制台上

在进行测试的时候,开启两个以上的消费者,开启一个生产者,然后可以观察到只能在一个消费者的控制台上进行显示,而在另一个消费者的控制台上不能进行打印.

发布和订阅者模式

消费生产者:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther:qingmu
 * @Description:脚踏实地,只为出人头地
 * @Date:Created in 16:32 2019/4/23
 */
public class TopicProducer {
    public static void main(String[] args) throws Exception{
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("欢迎来到神奇的世界");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();

    }
}

运行完以后的结果为:

消费者为:

/**
 * @Auther:qingmu
 * @Description:脚踏实地,只为出人头地
 * @Date:Created in 16:36 2019/4/23
 */

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 订阅
 * 多对多
 */
public class TopicConsumer {
    public static void main(String[] args) throws Exception{
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        //Queue queue = session.createQueue("test-queue");
        Topic topic = session.createTopic("test-topic");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(topic);

        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();

    }
}

运行测试:

  同时开启2个以上的消费者,再次运行生产者,观察每一个消费者控制台的输出,会发现每个消费者会接收到消息.

原文地址:https://www.cnblogs.com/qingmuchuanqi48/p/10755340.html

时间: 2024-10-10 23:10:38

MQ(队列消息的入门)的相关文章

ASP.NET Core消息队列RabbitMQ基础入门实战演练

一.课程介绍 人生苦短,我用.NET Core!消息队列RabbitMQ大家相比都不陌生,本次分享课程阿笨将给大家分享一下在一般项目中99%都会用到的消息队列MQ的一个实战业务运用场景.本次分享课程不是零基础教学,课程内容的侧重点是讲解的RabbitMQ的最实用.最简单的实战运用场景:Publish/Subscrib(发布/订阅)模式,发送端发送消息,单个接收端接收处理消息. 学完本次"是猴子都看的懂的消息队列RabbitMQ实战课程"后,阿笨带直接让你也能如此优雅简单的上手使用Rab

MQ中将消息发送至远程队列的配置

MQ中将消息发送至远程队列的配置 摘自MQ资源管理器帮助文档V7 在开始学习本教程之前,您需要从系统管理员处了解标识网络上接收机器的名称:IP地址.MQ的端口号.队列管理器.接收(远程机器)或者是发送的队列(本地机器)名称. 消息传递如图示: 本教程介绍了如何设置一台计算机上的队列管理器 QM_ORANGE 与另一台计算机上的队列管理器 QM_APPLE 之间的消息传递.在第一台计算机上创建的消息被传递到第二台计算机上的队列 Q1(此队列被称为远程队列). 要点: 在本教程中,您将使用创建了队列

MQ队列管理器搭建(一)

多应用单MQ使用场景 如上图所示,MQ独立安装,或者与其中一个应用同处一机.Application1与Application2要进行通信,但因为跨系统,所以引入中间件来实现需求. Application1需要连接MQ,并将消息放入队列Queue中,Application2同样连接MQ,监听在Queue队列上,一旦发现有消息进入则取出该消息进行处理. 下面将给出创建队列管理器和队列的示例: 定义队列管理器名称为Qm1,本地队列名称为Queue,服务器连接通道CHAN_SERVER_CON,监听端口

MQ队列管理器搭建(三)

MQ集群及网关队列管理器的搭建 描述: 如上图所示,为MQ的集群搭建部署图.CLUSTERA.CLUSTERB分别是两个集群,其中Qm1-Qm3.GateWayA为CLUSTERA集群中的队列管理器:Qm1-Qm3.GateWayB是CLUSTERB集群中的队列管理器.GateWayA与GateWayB负责网络路由和消息分发,使用集群的方式可以达到负载均衡的目的,除此之外还能提高MQ使用的稳定性.同一个集群中除网关队列管理器外的任意队列管理器因故关闭或停止工作后,其他的队列管理器可以接管它的工作

MQ队列管理器搭建(二)

MQ级联方式使用场景 使用场景: 如上图所示,Application1与Application2要进行通信或者消息互换,使用MQ中间件作为中介.上图中,Application1与Application2通信不进行直接连接,而是通过与MQ通信从而实现二者的通信.图中两个MQ的信息如上描述.其中RemoteQueue为远程队列,该队列指定了目标端对应的队列为Queue,并且该远程队列指定了传输所使用的传输队列尾TransQueue:而此传输队列TransQueue与发送通道CHAN_QMGR1_TO

架构设计:系统间通信(20)——MQ:消息协议(下)

(接上文<架构设计:系统间通信(19)--MQ:消息协议(上)>) 上篇文章中我们重点讨论了"协议"的重要性,并为各位读者介绍了Stomp协议和XMPP协议.这两种协议是消息队列中两种不同使用场景下的典型代表.本文主要接续上文的篇幅,继续讨论消息队列中另一种典型协议:AMQP协议. 3-3.AMQP协议 AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议).目前AMQP协议的版本为 Version 1.0,这个协议标准

MQ队列管理

分享一段代码,很实用. 下面这段java代码是我在国外一个论坛上发现的,源地址已经忘了.代码的作用是可以删除正在使用的mq的队列消息,管理mq的人一定知道它的美妙了吧,哈哈. 我拿来改了下,增加了2个参数支持:ccsid和channel.上代码: 1 import java.util.Hashtable; 2 import com.ibm.mq.*; 3 4 /** 5 * A simply Java class to destructively read (delete) all messag

Java调用MQ队列

IBM MQ 6.0中设置两个队列,(远程队列.通道之类都不设置). 队列管理器是XIR_QM_1502 队列名称是ESBREQ IP地址是10.23.117.134(远程的一台电脑,跟我的电脑不在一个局域网内) 端口1414 CCSID 1208 MQ配置可以参考这个,有配图http://wenku.baidu.com/view/06d108d0360cba1aa811daa3.html 程序如下,发送线程两个,接收线程一个.接收完毕后就结束. [java] view plaincopy /*

Python查看MQ队列深度

分享一段代码,很简单但是也很实用. 1 #!/usr/bin/python 2 #-*- coding:gb18030 -*- 3 ''' 4 Usage: mq.py [Qmgr] 5 *get the queues' curdepth which type is local, 6 and sorted by curdepth desc. 7 Auth : [email protected] 8 ''' 9 10 import re 11 import os 12 import sys 13