为什么要使用MQ
微服务架构后,链式调用是我们在写程序时候的一般流程,为了这完成一个整体功能会把它拆分成多个函数(或子模块)比如模块A调用模块B,模块B调用模块C,模块C调用模块D。但是大型分布式应用中,系统间的RPC交互复杂,一个功能后面要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构,这样的架构有没有问题呢?有
根据上面的风个问题,在设置系统时可以明确要克到的目标
1,要做到系统解耦,当新的模块进来时,可以做到代码改动最小; 能够解耦
2,设置流程缓冲池,可以让后端系统按自身吞吐能力进行消费,不被冲垮; 能够削峰
3,强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步
3,什么是MQ
3.1,定义
面向消息的中间件(message-oriented middleware0) MOM能够很好的解决以上的问题。
是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储,流量削峰,异步通信,数据同步等
大致流程
发送者把消息发给消息服务器,消息服务器把消息存放在若干队列/主题中,在合适的时候,消息服务器会把消息转发给接受者。
在这个过程中,发送和接受是异步的,也就是发送无需等待,发送者和接受者的生命周期也没有必然关系
在发布pub/订阅sub模式下,也可以完成一对多的通信,可以让一个消息有多个接受者[微信订阅号就是这样的]
3.2,特点
3.2.1,异步处理模式
消息发送者可以发送一个消息而无需等待响应。消息发送者把消息发送到一条虚拟的通道(主题或队列)上;
消息接收者则订阅或监听该通道。一条信息可能最络转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出回应。整个过程都是异步的。
案例:
也就是说,一个系统和另一个系统这间进行通信的时候,假如系统A希望发送一个消息给系统B,让它去处理,但是系统A不关注系统B到底怎么处理或者有没有处理好,所以系统A把消息发送给MQ,然后就不管这条消息的“死活” 了,接着系统B从MQ里面消费出来处理即可。至于怎么处理,是否处理完毕,什么时候处理,都是系统B的事,与系统A无关。
这样的一种通信方式,就是所谓的“异步”通信方式,对于系统A来说,只要把消息发给MQ,然后系统B就会异步处去进行处理了,系统A不能“同步”的等待系统B处理完。这样的好处是什么呢?解耦
3.2.2,应用系统的解耦
发送者和接收者不必了解对方,只需要确认消息
发送者和接收者不必同时在线
3.2.3,现实中的业务
4,什么是ActiveMQ
ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4 商业服务器上
5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支持通过JDBC和journal提供高速的消息持久化
7. 从设计上保证了高性能的集群,客户端-服务器,点对点
8. 支持Ajax
9. 支持与Axis(Apache Extensible Interaction System 即阿帕奇可扩展交互系统。Axis本质上就是一个SOAP引擎,提供创建服务器端、客户端和网关SOAP操作的基本框架)的整合
10. 可以很容易得调用内嵌JMS provider,进行测试
11.支持集群
1,下载
下载地址http://activemq.apache.org/activemq-5156-release.html
2,安装
1,配置jdk环境变量【不会的回看Linux】
2,上传mq的压缩包到Linux
3,解压到usr/local/ActiveMQ
mkdir /usr/local/ActiveMQtar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /usr/local/ActiveMQ/ 5,配置用户名和密码[默认为admin/admin]vim conf/users.properties admin = admin 4,启动和停止重启./bin/activemq start2
./bin/activemq stop3
./bin/activemq restart 5,访问5,端口说明
ActiveMQ是使用61616端口提供的JMS服务
使用8161提供管理控制台的服务
1,JMS消息发送模式
在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:只有一个消费者将获得消息。生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。每一个成功处理的消息都由接收者签收。发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息.在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。2,JMS应用程序接口
1,ConnectionFactory 接口(连接工厂) 用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。 2,Connection 接口(连接)连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。 3,Destination 接口(目标)目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题4,MessageConsumer 接口(消息消费者)由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
5,MessageProducer 接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
6,Message 接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:
消息头(必须):包含用于识别和为消息寻找路由的操作设置。
一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
消息接口非常灵活,并提供了许多方式来定制消息的内容。
7,Session 接口(会话)
表示一个单线程得上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续得。就是说消息按照发送的顺序一个一个接收的。会话得好处是它支持事务,如果用户支持了事务支持,会话得上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息产生者来发送消息,创建消息消费者来接收消息。
1,创建项目加入maven依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.activemq</groupId> <artifactId>activemq</artifactId> <version>1.0-SNAPSHOT</version> <!--activemq需要的jar包 不是使用最新版本的。有BUG --> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.5</version> </dependency> <!--下面是log4等通用配置 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> </dependency> </dependencies> </project>
2,生产者
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
package com.activemq.demo; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by Administrator on 2019-10-26. */public class ActiveMq { private static final String QUEUE = "queue"; private static final String URL = "tcp://47.110.76.75:61616"; public static void main(String[] args) throws Exception{ //第一步:创建ActiveMQConnectionFactory对象,需要指定服务端IP以及端口号.//brokerURL服务器得IP以端口号。 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //第二步:使用connectionFactory创建一个connection对象 Connection connection = connectionFactory.createConnection(); //第三步:开启连接,调用connection对象start得方法 connection.start(); //第四步:使用Connection对象创建一个session对象 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //第五步:使用Session对象创建Destination对象(topic、queue),此处创建一个Queue对象。//参数:队列得名称。 Queue queue = session.createQueue(QUEUE); //第六步:使用session对象创建一个Producer对象 MessageProducer producer = session.createProducer(queue); //第七步:创建一个Message = new ActiveMQTextMessage(); TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test"); //第八步: producer.send(textMessage); //第九步 producer.close(); session.close(); connection.close(); System.out.println("生产者向MQ发送消息成功!"); }}生产完成之后可以查看有消息生成
3,消费者
消费者有两种消费方法:
1、同步消费。通过调用消费者receive方法从目的地中显示提取消息,receive方法可以一直阻塞到消息到达。
2、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取得动作。
实现MessageListener接口,在MessageListener()方法中实现消息得处理逻辑。
4,同步消费者【一般不推荐】
第一步:创建一个连接工厂
第二步:创建一个连接
第三步:打开连接
第四步:创建会话
第五步:创建目的地
第六步:创建消费者
第七步:接收消息
第八步:关闭资源
package com.activemq.demo; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by Administrator on 2019-10-26. */public class ActiveMq { private static final String QUEUE = "queue"; private static final String URL = "tcp://47.110.76.75:61616"; public static void main(String[] args) throws Exception{ //第一步:创建ActiveMQConnectionFactory对象,需要指定服务端IP以及端口号.//brokerURL服务器得IP以端口号。 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //第二步:使用connectionFactory创建一个connection对象 Connection connection = connectionFactory.createConnection(); //第三步:开启连接,调用connection对象start得方法 connection.start(); //第四步:使用Connection对象创建一个session对象 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //第五步:使用Session对象创建Destination对象(topic、queue),此处创建一个Queue对象。//参数:队列得名称。 Queue queue = session.createQueue(QUEUE); //第六步:使用session对象创建一个Producer对象 MessageProducer producer = session.createProducer(queue); //第七步:创建一个Message = new ActiveMQTextMessage(); TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test"); //第八步: producer.send(textMessage); //第九步 producer.close(); session.close(); connection.close(); System.out.println("生产者向MQ发送消息成功!"); }}4.1,receive方法说明
receive() 一直阻塞
receive(1000) 10秒类没接收消息就放弃
5,异步消费者【推荐】
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源。
原文地址:https://www.cnblogs.com/jacksonxiao/p/11745675.html