RabbitMQ 消息中间件

RabbitMQ 是使用 Erlang 语言开发的消息中间件, 其遵循了高级消息队列协议(Advanced Message Queuing Protocol, AMQP)。

与 Kafka 等消息队列相比,RabbitMQ 最大的优势在于其较高的可靠性:

  • 提供确认(ACK)和重传机制保证消息完成消费, 消费者异常不会导致消息丢失
  • 提供消息持久化机制, broker 崩溃不会导致消息丢失
  • 集群模式下工作, 保证高可用

因为具有较高可靠性和一致性, RabbitMQ 可以胜任订单处理、秒杀等一致性要求较高的业务场景。

RabbitMQ 概念与机制

RabbitMQ 中的概念模型:

  • Broker: 消息中间件实例, 可能是单个节点也可能是运行在多节点集群上的逻辑实体
  • 消息(Message): 消息由消息头和消息体两部分组成。消息头中包括routing-key、priority等标准消息头以及其它自定义消息头,用于定义RabbitMQ对消息行为。消息体是字节流,包含消息内容。
  • 连接(Connection): 客户端与 Broker 之间的 TCP连接
  • 信道(Channel): Channel 是建立在 TCP 连接上的逻辑(虚拟)连接。多个 Channel 复用同一个 TCP 连接, 以避免建立 TCP 连接的巨大开销。 RabbitMQ 官方要求每个线程使用独立的 Channel, 禁止多个线程共用 Channel。
  • 生产者(Publisher): 发送消息的客户端线程
  • 消费者(Consumer): 处理消息的客户端线程
  • 交换机(Exchange): 交换机负责将消息投递到相应的队列
  • 队列(Queue): 接收并保存交换机投递的消息,直至被消费者成功消费。逻辑结构遵循先进先出FIFO。
  • 绑定(Binding): 将队列(Queue)注册到交换机(Exchange)的路由表
  • 虚拟主机(Vhost): 每个Broker下可建立多个vhost, 每个 vhost 可建立独立的 Exchange、Queue、绑定及权限系统。同一个 Broker 下的 vhost 共享 Connection、Channel 和 用户系统,就是说可以使用同一个用户身份使用同一个 Channel 访问不同 vhost。

交换机(Exchange)

生产者发送的消息会首先送到交换机(Exchange), 交换机根据自身类型和消息的 routing-key 等信息将消息投递到绑定的消息队列中。

RabbitMQ中的四种标准交换机:

  • direct: 如果消息的 routing-key 与队列的 binding-key 完全相同,direct类型的交换机则会将消息投递到该队列中。

    • 多个队列可以使用相同的 binding-key 绑定到同一个 direct 交换机,direct 交换机会把消息投递到所有 binding-key 与消息 routing-key 相同的队列
  • topic: 允许队列的 binding-key 中包含通配符*#, topic 交换机会将消息投递到 binding-key 与 routing-key 匹配的队列中。
    • 通配符按照关键字进行匹配,如news.cn.a中的关键字是newscna,即关键字按照.分割
    • #通配符匹配0个或多个关键字, news.#.a可以匹配news.a, news.cn.anews.asia.cn.a
    • *通配符匹配一个关键字, news.*.a匹配news.cn.a不匹配news.anews.asia.cn.a
  • fanout: fanout 交换机不进行任何匹配, 将消息投递到所有绑定的队列
  • header: header 交换机根据消息头进行投递,现在已较少使用

我们可以使用 RabbitMQ 的插件机制使用第三方交换机或自行开发交换机。如实现延时投递的delayed-message-exchange

消息头中的delivery-mode可以设置为 persistent(持久化) 或者 transient(易失)。 Exchange 和 Queue 在处理持久化的消息时都会先将消息写入磁盘中再进行下一步处理, 即使 RabbitMQ 崩溃也不会丢失。

消费者客户端通常使用的channel.basicConsume使用推(push)模式投递消息, 即当有新消息时 Broker 通过 channel 主动向客户端发送消息。客户端也可以使用channel.basicGet从 Broker 拉取消息。

ACK机制

RabbitMQ 提供了确认送达(acknowledge)机制保证消息被正确处理不会丢失。

确认送达的回执有三种:

  • ACK: 消息已被成功处理
  • NACK: 消息处理异常, 需要重新投递
  • REJECT: 消息非法, 丢弃消息

RabbitMQ 的 Queue 可以设置 no_ack=true, 则消息被投递后即删除不等待回执。

channel.basicConsume 可以指定auto_ack模式,若auto_ack=true当客户端收到完整消息后即会自动发出ACK回执,否则必须显式的发出回执。

Java 代码示例

首先安装并启动RabbitMQ实例, Mac用户可以使用 Homebrew 进行安装:

brew install rabbitmq

启动服务:

brew services start rabbitmq

或者使用官方docker镜像:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management

RabbitMQ官网提供了Ubuntu、RPM以及Windows等多种平台安装方式。

RabbitMQ默认TCP端口为5672, Web控制台默认端口15672。

在Maven中添加依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.1</version>
</dependency>

编写生产者:

package rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author finley
 */
public class RabbitProducer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String exchangeName = "test-exchange";
            channel.exchangeDeclare(exchangeName, "direct", true);

            String routingKey = "hello";

            byte[] msg = "hello world".getBytes();
            AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
            propsBuilder.deliveryMode(2); // persistent
            propsBuilder.priority(0); // normal
            propsBuilder.contentType("text/plain");
            channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);
        }
    }
}

编写消费者:

package rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * @author finley
 */
public class RabbitConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String exchangeName = "test-exchange";
            channel.exchangeDeclare(exchangeName, "direct", true);

            String queueName = channel.queueDeclare().getQueue();
            String bindingKey = "hello";
            channel.queueBind(queueName, exchangeName, bindingKey);

            while(true) {
                channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        String bodyStr = new String(body, "UTF-8");
                        System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr);
                        long deliveryTag = envelope.getDeliveryTag();
                        channel.basicAck(deliveryTag, false);
                    }
                });
            }
        }
    }

}

RabbitMQ 的消息为字节, 可以将 Java 对象序列化后作为消息体发送。

原文地址:https://www.cnblogs.com/Finley/p/10126315.html

时间: 2024-11-01 23:14:02

RabbitMQ 消息中间件的相关文章

CentOS6.8搭建rabbitmq消息中间件

参考资料:http://blog.csdn.net/yunfeng482/article/details/72853983 一.rabbitmq简介 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除

python中使用rabbitmq消息中间件

上周一直在研究zeromq,并且也实现了了zeromq在python和ruby之间的通信,但是如果是一个大型的企业级应用,对消息中间件的要求比较高,比如消息的持久化机制以及系统崩溃恢复等等需求,这个时候zeromq就显得有点鸡肋了,特别是消息持久化是他的的硬伤,那么怎么找一个比较适合的中间件呢? 目前市场上主流的中间件除了zeromq,还有rabbitmq,activemq等等,这两周都比较有名,一个是基于erlang,一个是基于jms,rabbitmq是AMQP(高级消息队列协议)的标准实现,

RabbitMQ消息中间件介绍

一.基础介绍 随着分布式应用的发展消息队列中间件成为C/S架构中解耦的一个重要环节,传统的消息传输模型中,C端发出消息,S端必须在线,否则将无法继续进行,而在拥有消息中间件的模型下消息产生者(C端)发出的消息由中间件来接受,即使此时消息消费者(S端)即便不在线也有可能不产生中断.RabbitMQ作为消息中间件的一种其组成部分如下图所示: 他的核心组成部分为: 交换器(Exchange):起作用主要是将收到的消息交换至对应的队列 队列(Message):用于存放供订阅者(Consumer)读取消息

RabbitMQ消息中间件技术精讲

RabbitMQ核心API+高级特性+Spring家族整合+高可靠集群+SET化架构设计+组件设计思路 神秘数字-->求求 号:->:2304636824 第1章 课程介绍 本章首先让大家彻底明白为什么学习RabbitMQ,通过本课程的学习具体收获有哪些?课程内容具体安排与学习建议,然后为大家简单介绍下业界主流消息中间件有哪些,各自适用场景等. 1-1 课程导学 1-2 业界主流消息中间件介绍 第2章 低门槛,入门RabbitMQ核心概念 本章首先为大家讲解互联网大厂为什么选择RabbitMQ

慕课网RabbitMQ消息中间件技术精讲

第1章 课程介绍本章首先让大家彻底明白为什么学习RabbitMQ,通过本课程的学习具体收获有哪些?课程内容具体安排与学习建议,然后为大家简单介绍下业界主流消息中间件有哪些,各自适用场景等. 1-1 课程导学1-2 业界主流消息中间件介绍第2章 低门槛,入门RabbitMQ核心概念本章首先为大家讲解互联网大厂为什么选择RabbitMQ? RabbitMQ的高性能之道是如何做到的?什么是AMPQ高级协议?AMPQ核心概念是什么?RabbitMQ整体架构模型是什么样子的?RabbitMQ消息是如何流转

RabbitMQ 消息中间件的部署

一.RabbitMQ安装 安装步骤: 1.  安装Erlang 2.  安装RabbitMQ server 3.  配置环境 4.  开启服务 安装Erlang: 安装最新版本Erlang 配置环境变量:ERLANG_HOME=D:\RabbitMQ\erl5.9.2,Erlang的安装目录. 安装RabbitMQ服务器: 下载rabbitmq-server-windows-2.8.7.zip,解压到D:\RabbitMQ\rabbitmq_server-2.8.7. sbin目录下存放Rabb

消息中间件面试题31道RabbitMQ+ActiveMQ+Kafka

前言 文章开始前,我们先了解一下什么是消息中间件? 什么是中间件? 非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件. 什么是消息中间件? 是关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统 图示: 消息中间件RabbitMQ+ActiveMQ+Kafka的对比 接下来就是消息中间件面试题RabbitMQ+ActiveMQ+Kafka RabbitMQ消息中间件系列 1:RabbitMQ 中的 broker 是指什么?cl

消息中间件——RabbitMQ(六)理解Exchange交换机核心概念!

前言 来了解RabbitMQ一个重要的概念:Exchange交换机 1. Exchange概念 Exchange:接收消息,并根据路由键转发消息所绑定的队列. 蓝色框:客户端发送消息至交换机,通过路由键路由至指定的队列. 黄色框:交换机和队列通过路由键有一个绑定的关系. 绿色框:消费端通过监听队列来接收消息. 2. 交换机属性 Name:交换机名称 Type:交换机类型--direct.topic.fanout.headers.sharding(此篇不讲) Durability:是否需要持久化,

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)

前言 1. SpringBoot整合配置详解 publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback 注意一点,在发送消息的时候对template进行配置mandatory=tr