1,生活中的案例[生产中的问题]为什么要使用MQ

为什么要使用MQ

微服务架构后,链式调用是我们在写程序时候的一般流程,为了这完成一个整体功能会把它拆分成多个函数(或子模块)比如模块A调用模块B,模块B调用模块C,模块C调用模块D。但是大型分布式应用中,系统间的RPC交互复杂,一个功能后面要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构,这样的架构有没有问题呢?有

根据上面的风个问题,在设置系统时可以明确要克到的目标

1,要做到系统解耦,当新的模块进来时,可以做到代码改动最小;  能够解耦

2,设置流程缓冲池,可以让后端系统按自身吞吐能力进行消费,不被冲垮; 能够削峰

3,强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

3,什么是MQ

3.1,定义

面向消息的中间件(message-oriented middleware0) MOM能够很好的解决以上的问题。

是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储,流量削峰,异步通信,数据同步等

大致流程

发送者把消息发给消息服务器,消息服务器把消息存放在若干队列/主题中,在合适的时候,消息服务器会把消息转发给接受者。

在这个过程中,发送和接受是异步的,也就是发送无需等待,发送者和接受者的生命周期也没有必然关系

在发布pub/订阅sub模式下,也可以完成一对多的通信,可以让一个消息有多个接受者[微信订阅号就是这样的]

3.2,特点

3.2.1,异步处理模式

消息发送者可以发送一个消息而无需等待响应。消息发送者把消息发送到一条虚拟的通道(主题或队列)上;

消息接收者则订阅或监听该通道。一条信息可能最络转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出回应。整个过程都是异步的。

案例:

也就是说,一个系统和另一个系统这间进行通信的时候,假如系统A希望发送一个消息给系统B,让它去处理,但是系统A不关注系统B到底怎么处理或者有没有处理好,所以系统A把消息发送给MQ,然后就不管这条消息的“死活” 了,接着系统B从MQ里面消费出来处理即可。至于怎么处理,是否处理完毕,什么时候处理,都是系统B的事,与系统A无关。

这样的一种通信方式,就是所谓的“异步”通信方式,对于系统A来说,只要把消息发给MQ,然后系统B就会异步处去进行处理了,系统A不能“同步”的等待系统B处理完。这样的好处是什么呢?解耦

3.2.2,应用系统的解耦

发送者和接收者不必了解对方,只需要确认消息

发送者和接收者不必同时在线

3.2.3,现实中的业务

4,什么是ActiveMQ

ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

主要特点:

1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4 商业服务器上

5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支持通过JDBC和journal提供高速的消息持久化

7. 从设计上保证了高性能的集群,客户端-服务器,点对点

8. 支持Ajax

9. 支持与Axis(Apache Extensible Interaction System 即阿帕奇可扩展交互系统。Axis本质上就是一个SOAP引擎,提供创建服务器端、客户端和网关SOAP操作的基本框架)的整合

10. 可以很容易得调用内嵌JMS provider,进行测试

11.支持集群

1,下载

下载地址http://activemq.apache.org/activemq-5156-release.html

2,安装

1,配置jdk环境变量【不会的回看Linux】

2,上传mq的压缩包到Linux

3,解压到usr/local/ActiveMQ

mkdir /usr/local/ActiveMQ
tar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /usr/local/ActiveMQ/

5,配置用户名和密码[默认为admin/admin]
vim conf/users.properties

admin = admin

4,启动和停止重启
./bin/activemq start

2

./bin/activemq stop

3

./bin/activemq restart

5,访问

5,端口说明

ActiveMQ是使用61616端口提供的JMS服务

使用8161提供管理控制台的服务

 

1,JMS消息发送模式

点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:只有一个消费者将获得消息。生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。每一个成功处理的消息都由接收者签收。发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息.在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

2,JMS应用程序接口

1,ConnectionFactory 接口(连接工厂)

用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

2,Connection 接口(连接)连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。

3,Destination 接口(目标)目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题4,MessageConsumer 接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

5,MessageProducer 接口(消息生产者)

   由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

   6,Message 接口(消息)

   

是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:

消息头(必须):包含用于识别和为消息寻找路由的操作设置。

一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。

一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

消息接口非常灵活,并提供了许多方式来定制消息的内容。

7,Session 接口(会话)

表示一个单线程得上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续得。就是说消息按照发送的顺序一个一个接收的。会话得好处是它支持事务,如果用户支持了事务支持,会话得上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息产生者来发送消息,创建消息消费者来接收消息。

1,创建项目加入maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.activemq</groupId>
    <artifactId>activemq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--activemq需要的jar包  不是使用最新版本的。有BUG -->
    <dependencies>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.5</version>
        </dependency>

        <!--下面是log4等通用配置 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
        </dependency>

    </dependencies>

</project>

  

2,生产者

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

第二步:使用ConnectionFactory对象创建一个Connection对象。

第三步:开启连接,调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。

第六步:使用Session对象创建一个Producer对象。

第七步:创建一个Message对象,创建一个TextMessage对象。

第八步:使用Producer对象发送消息。

第九步:关闭资源。

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/** * Created by Administrator on 2019-10-26. */public class ActiveMq {

    private static final String QUEUE = "queue";    private static final String URL = "tcp://47.110.76.75:61616";    public static void main(String[] args) throws Exception{        //第一步:创建ActiveMQConnectionFactory对象,需要指定服务端IP以及端口号.//brokerURL服务器得IP以端口号。        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);       //第二步:使用connectionFactory创建一个connection对象        Connection connection = connectionFactory.createConnection();        //第三步:开启连接,调用connection对象start得方法        connection.start();        //第四步:使用Connection对象创建一个session对象        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        //第五步:使用Session对象创建Destination对象(topic、queue),此处创建一个Queue对象。//参数:队列得名称。        Queue queue = session.createQueue(QUEUE);        //第六步:使用session对象创建一个Producer对象        MessageProducer producer = session.createProducer(queue);        //第七步:创建一个Message = new ActiveMQTextMessage();        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test");        //第八步:        producer.send(textMessage);

        //第九步        producer.close();        session.close();        connection.close();        System.out.println("生产者向MQ发送消息成功!");

    }}

生产完成之后可以查看有消息生成

3,消费者

消费者有两种消费方法:

1、同步消费。通过调用消费者receive方法从目的地中显示提取消息,receive方法可以一直阻塞到消息到达。

2、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取得动作。

实现MessageListener接口,在MessageListener()方法中实现消息得处理逻辑。

4,同步消费者【一般不推荐】

第一步:创建一个连接工厂

第二步:创建一个连接

第三步:打开连接

第四步:创建会话

第五步:创建目的地

第六步:创建消费者

第七步:接收消息

第八步:关闭资源

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/** * Created by Administrator on 2019-10-26. */public class ActiveMq {

    private static final String QUEUE = "queue";    private static final String URL = "tcp://47.110.76.75:61616";    public static void main(String[] args) throws Exception{        //第一步:创建ActiveMQConnectionFactory对象,需要指定服务端IP以及端口号.//brokerURL服务器得IP以端口号。        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);       //第二步:使用connectionFactory创建一个connection对象        Connection connection = connectionFactory.createConnection();        //第三步:开启连接,调用connection对象start得方法        connection.start();        //第四步:使用Connection对象创建一个session对象        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        //第五步:使用Session对象创建Destination对象(topic、queue),此处创建一个Queue对象。//参数:队列得名称。        Queue queue = session.createQueue(QUEUE);        //第六步:使用session对象创建一个Producer对象        MessageProducer producer = session.createProducer(queue);        //第七步:创建一个Message = new ActiveMQTextMessage();        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test");        //第八步:        producer.send(textMessage);

        //第九步        producer.close();        session.close();        connection.close();        System.out.println("生产者向MQ发送消息成功!");

    }}

4.1,receive方法说明

receive() 一直阻塞

receive(1000) 10秒类没接收消息就放弃

5,异步消费者【推荐】

第一步:创建一个ConnectionFactory对象。

第二步:从ConnectionFactory对象中获得一个Connection对象。

第三步:开启连接,调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。

第六步:使用Session对象创建一个Consumer对象。

第七步:接收消息。

第八步:打印消息。

第九步:关闭资源。

 

原文地址:https://www.cnblogs.com/jacksonxiao/p/11745675.html

时间: 2024-08-08 13:10:51

1,生活中的案例[生产中的问题]为什么要使用MQ的相关文章

通过一个生活中的案例场景,揭开并发包底层AQS的神秘面纱

本文导读 生活中案例场景介绍 联想到 AQS 到底是什么 AQS 的设计初衷 揭秘 AQS 底层实现 最后的总结 当你在学习某一个技能的时候,是否曾有过这样的感觉,就是同一个技能点学完了之后,过了一段时间,如果你没有任何总结,或者是不经常回顾,遗忘的速度是非常之快的. 忘记了之后,然后再重新学,因为已经间隔了一段时间,再次学习又当做了新的知识点来学.这种状态如此反复,浪费了相同的时间,但学习效果却收效甚微. 每当遇到这种情况,我们可以停下来,思考一下.对于某一个技术知识点理解起来不是那么好懂的时

流量开关仪表设备在实际应用中的案例

易卖工控网12月2日讯,GB50974-2014标准11.0.4规定了直接启动消防泵的信号是流量开关和压力开关,其原因是可靠性高,主要表现为灵敏度高.耐久性强等优点. 实际使用中的流量仪表多应用于过程控制,流量仪表在过程控制中的作用是对密封管道中的流体流量进行检测,必要时还将流量测量仪表与调节仪表.执行器等组成调节系统,将流量稳定在合适的范围,从而实现过程的稳定性. 这里先解释一下比较常用的水流量开关的工作原理,帮助大家理解后面提到的内容. 当水流开关内有水流动,水流量≥1.0L/min时,水流

从现实中的案例关注RFID消费终端安全风险

之前在发布关于<从乌云的错误漏洞分析看Mifare Classic安全>一文当中介绍了关于Mifare Classic安全的问题,然而RFID安全问题并非仅仅基于Mifare Classic而已.在过往的RFID安全资料当中,我们通常都是关注Tag的安全,例如:Mifare Classic的算法破解以及低频卡类的安全问题.却不会从整体的角度分析其衍生出来的其他安全问题. 从2008年漏洞公开至今,国内已经开始逐渐分批地更新Mifare Classic为Mifare PLUS/Mifare DE

理解性能的奥秘——应用程序中慢,SSMS中快(5)——案例:如何应对参数嗅探

本文属于<理解性能的奥秘--应用程序中慢,SSMS中快>系列 接上文:理解性能的奥秘--应用程序中慢,SSMS中快(4)--收集解决参数嗅探问题的信息 首先我们需要明白,参数嗅探本身不是问题,而是一个特性,避免SQL Server做出盲目的假设,从而产生次优查询计划.但是有些情况下,参数嗅探却会带来负面影响.通常有下面三种典型的情况: 查询使用的参数嗅探完全不合适.也就是说,查询计划对于这次执行是合适的,但是对于下一次执行就可能不合适. 应用程序中存在特定的调用模式,而且与其他大部分调用模式差

《了不起的 nodejs》中 TwitterWeb 案例 bug 解决

了不起的nodejs算是一本不错的入门书,不过书中个别案例存在bug,按照书中源码无法做出和书中相同效果,原本兴奋的心情掺杂着些许失落. 现在我们看一下第七章HTTP,一个Twitter Web客户端的例子. 先贴上书中源码 1.创建server.js 1 var qs = require('querystring'); 2 require('http').createServer(function(req,res){ 3 var body =""; 4 req.on('data',f

git零基础快速入门实战,重点讲解,在实际生产中整合idea对版本、分支的管理等

1.git简单描述 git是重要的版本管理工具,几乎每个码农都有自己的git账号管理自己的代码,同时很多公司也是用git管理公司的代码, 因此掌握git在实际生产中的常规使用非常重要. 2.git学习目录 2.1.git认识.注册.git客户端.tortoiseGit客户端等基本使用:https://www.cnblogs.com/newAndHui/p/10747646.html 2.2.idea git 整合使用 :https://www.cnblogs.com/newAndHui/p/10

理解性能的奥秘——应用程序中慢,SSMS中快(6)——SQL Server如何编译动态SQL

本文属于<理解性能的奥秘--应用程序中慢,SSMS中快>系列 接上文:理解性能的奥秘--应用程序中慢,SSMS中快(5)--案例:如何应对参数嗅探 我们抛开参数嗅探的话题,回到了本系列的最初关注点中:为什么语句在应用程序中慢,但是在SSMS中快?到目前为止,都是在说存储过程的情况.而存储过程的问题通常是因为SET ARITHABORT的不同设置项的原因.如果你的应用不使用存储过程,而是通过中间层提交客户端的查询,那么也有几个原因可能让你的查询因为不同的缓存条目从而使得在SSMS和应用程序中的运

Expo大作战(十)--expo中的App Icon,expo中的Assets,expo中的ErrorHandling错误处理

简要:本系列文章讲会对expo进行全面的介绍,本人从2017年6月份接触expo以来,对expo的研究断断续续,一路走来将近10个月,废话不多说,接下来你看到内容,讲全部来与官网 我猜去全部机翻+个人修改补充+demo测试的形式,对expo进行一次大补血!欢迎加入expo兴趣学习交流群:597732981 [之前我写过一些列关于expo和rn入门配置的东i西,大家可以点击这里查看:从零学习rn开发] 相关文章: Expo大作战(一)--什么是expo,如何安装expo clinet和xde,xd

19SpringMvc_在业务控制方法中收集List集合中包含JavaBean参数

本文要实现的功能是给一张表单: 可以看到这样表格一共有四行,每一行代表一个员工(User),每一个员工有username和salary.我们要做的是把这四个员工信息装进一个List集合中. 那么怎么做呢? List不就是一个数组吗? 我们这么考虑: 案例结构如下: 第一步编写首页面emp.jsp,代码如下: <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%