消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?

应用场景:

1. 通知,针对发送事件的描述,内容可以是消息的日志,也可以是真实的报告通知给另一个程序或者管理员.

说明: 首先选择交换机,如果选择fanout交换机,则需要为每种告警传输类型(邮件/微信/手机/短信)创建队列,但同时也带来坏处就是每个消息都会发送到所有队列,导致告警消息发生时,被报警消息淹没,如果选择topic交换机,则可为其创建四种严重级别告警info/warning/problem/citical,但如果使用fanout类型交换机消息会发送到所有这四个级别队列,如果使用direct交换机,则四个严重等级会被定死,无法扩展,而topic交换机则允许我们在如上四个严重等级上加上类型,如当我们触发报警API时候路由键设置为critical.rate_limit,则消息不经会发送到cirtica.*l队列而且同时会被发送到*.rate_limite,至于针对每种类型怎么处理这就是消费者该干的事情了~



> 消费者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
import json
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 创建日志交换机
    channel.exchange_declare(
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 交换机类型
        exchange_type="topic",
        # 如果同名交换机已存在依然返回成功,否则创建
        passive=False,
        # 声明为持久化交换机
        durable=True,
        # 交换机闲置也不会自动删除
        auto_delete=False
    )
    # 创建info日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="info",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 创建warning日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="warning",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 创建problem日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="problem",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 创建cirtical日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="cirtical",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 创建rate_limit日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="cirtical_ratelimit",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 绑定队列
    channel.queue_bind(
        # 队列名称
        queue="info",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="info.*"
    )
    channel.queue_bind(
        # 队列名称
        queue="warning",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="warning.*"
    )
    channel.queue_bind(
        # 队列名称
        queue="problem",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="problem.*"
    )
    channel.queue_bind(
        # 队列名称
        queue="cirtical",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="cirtical.*"
    )
    channel.queue_bind(
        # 队列名称
        queue="cirtical_ratelimit",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="*.ratelimit"
    )
    def callback_wrapper(callback):
        def wrapper(channel, method, header, body):
            print ‘#{0}[{1}]>: {2}‘.format(method.consumer_tag, method.delivery_tag, body),
            if header.content_type != ‘application/json‘:
                print ‘with wrong content_type(application/json)‘
                channel.basic_ack(delivery_tag=method.delivery_tag)
                return
            print ‘with correct content_type(application/json)‘
            callback(channel, method, header, body)
            # 发送消息确认
            channel.basic_ack(delivery_tag=method.delivery_tag)
        return wrapper
    @callback_wrapper
    def info_callback_handler(channel, method, header, body):
        """
        channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道
        method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记
        header : AMQP消息头信息,携带可选的消息元数据,如数据类型
        body   : 实际消息内容
        """
        pass
        return
    @callback_wrapper
    def warning_callback_handler(channel, method, header, body):
        pass
        return
    @callback_wrapper
    def problem_callback_handler(channel, method, header, body):
        pass
        return
    @callback_wrapper
    def cirtical_callback_handler(channel, method, header, body):
        pass
        return
    @callback_wrapper
    def cirtical_ratelimit_callback_handler(channel, method, header, body):
        pass
        return
    # 作为指定队列消费者
    channel.basic_consume(
            info_callback_handler,
            queue="info",
            # 必须确认后再接收后续消息
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_info"
    )
    channel.basic_consume(
            warning_callback_handler,
            queue="warning",
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_warning"
    )
    channel.basic_consume(
            problem_callback_handler,
            queue="problem",
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_problem"
    channel.basic_consume(
            cirtical_callback_handler,
            queue="cirtical",
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_cirtical"
    )
    channel.basic_consume(
            cirtical_ratelimit_callback_handler,
            queue="cirtical_ratelimit",
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_cirtical_ratelimit"
    )
    # 循环调用回调函数接收处理消息
    channel.start_consuming()
    channel.close()


> 生产者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import sys
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    exchange = sys.argv[1]
    routekey = sys.argv[2]
    messages = sys.argv[3]
    # 创建配置对象
    msg_props = pika.BasicProperties()
    # 设置内容类型
    msg_props.content_type = ‘application/json‘
    # 尝试发布消息
    channel.basic_publish(
        # 发布消息内容
        body=messages,
        # 发布到交换机
        exchange=exchange,
        # 发布信息属性
        properties=msg_props,
        # 发布信息时携带的路由键
        routing_key=routekey
    )


说明: 测试非常简单,首先启动消费者python consumer.py,然后尝试执行生产者python producer.py xmzoomeye_alerts cirtical.ratelimit ‘{"error": "cirtical rate limit"}‘,查看消费者端输出

扩展: 对于发后即忘的消息通信模式可轻而易举的扩展,如添加一个额外的队列绑定路由键*.*,将所有级别的告警记录都记录到数据库中,以便后期分析/汇总/压缩/分类/查询等操作~

2. 批量,针对大型数据集合的工作或者转换,这种类型的任务可以构建为单一的任务请求,或者多个任务对数据集合的独立部分进行操作.





> 消费者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘guest‘, ‘guest‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ服务凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 创建交换机
    channel.exchange_declare(
        # 交换机名称
        exchange="salt-exchange",
        # 交换机类型
        type="direct",
        # 如果同名交换机已存在依然返回成功
        passive=False,
        # 声明为持久化交换机
        durable=False,
        # 交换机闲置也不会自动删除
        auto_delete=False
    )
    # 创建队列
    channel.queue_declare(queue="salt")
    # 绑定队列
    channel.queue_bind(
        # 队列名称
        queue="salt",
        # 交换机名称
        exchange="salt-exchange",
        # 路由键名称
        routing_key="salt"
    )
    # 消息回调处理函数
    def msg_consumer(channel, method, header, body):
        # 发送消息确认
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # 退出监听循环
        if body == ‘exit‘:
            channel.basic_cancel(consumer_tag="salt-consumer")
            channel.stop_consuming()
        else:
            print ‘found notice: recive queue message {0}‘.format(body)
        return
    # 作为指定队列消费者
    channel.basic_consume(msg_consumer, queue="salt", consumer_tag="salt-consumer")
    # 循环调用回调函数接收处理消息
    channel.start_consuming()


> 生产者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import sys
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    exchange = sys.argv[1]
    messages = sys.argv[2]
    # 创建配置对象
    msg_props = pika.BasicProperties()
    # 设置内容类型
    msg_props.content_type = ‘application/json‘
    # 消息持久化
    msg_props.delivery_mode = 2
    # 尝试发布消息
    channel.basic_publish(
        # 发布消息内容
        body=messages,
        # 发布到交换机
        exchange=exchange,
        # 发布信息属性
        properties=msg_props,
        # 扇形交换机本身不需要路由键,但参数个数限制,随意推荐大家直接写#匹配所有
        routing_key="#"
    )


说明: 测试非常简单,首先启动消费者python consumer.py,然后尝试执行生产者python producer.py upload_pictures ‘{"image_id": 1, "user_id": 1, "image_path": "/xm-workspace/xm-webs/www.pic.com/data/images/73197d57-46a9-4d19-a48f-a44e0ad5e493.jpg"}‘,查看消费者端输出

扩展: 如上在WEB页面上上传完图片后,希望对图片进行生成缩略图/奖励上传用户积分/分享通知朋友圈等等,这几个任务之间是没有相互依赖关系的,不需要等待对方的结果才能继续执行,所以可以并行执行,扩展起来也非常容易,直接添加一个对应的队列和消费者即可,如要记录上传图片日志记录需求~,当发现一个创建缩略图的消费者跟不上节奏,直接在同台或异台服务器上再跑一个创建缩略图的消费者即可,任务会自动轮询分配,这一切对于用户是无感知的~

3. RPC,针对大量RPC请求使用消息来发回应答,AMQP消息头里有一个reply_to字段,生产者JSON RPC-API生成随机零时队列名存储到预发布RPC调用消息的头部reply_to字段到指定队列,然后在随机队列上监听响应数据,消费者JSON RPC-SRV接收到消息处理完毕后读取回调中header的reply_to字段,然后将响应发回零时队列,由于所有没有绑定交换机的队列都会自动绑定到匿名交换机,所以必用申请额外的交换机直接使用匿名交换机,消息一旦被接收,零时队列会自动被删除.至此完成一次RPC调用



> 消费者

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
import json
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 创建RPC交换机
    channel.exchange_declare(
        # 交换机名称
        exchange="rpc",
        # 交换机类型
        exchange_type="direct",
        # 如果同名交换机已存在依然返回成功,否则创建
        passive=False,
        # 声明为持久化交换机
        durable=True,
        # 交换机闲置也不会自动删除
        auto_delete=False
    )
    # 创建ping任务队列
    channel.queue_declare(
            # 队列名称
            queue="ping",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为非持久化队列
            durable=False,
            # 声明为私有队列
            exclusive=True,
            # 队列闲置会自动删除
            auto_delete=True
    )
    # 绑定队列
    channel.queue_bind(
        # 队列名称
        queue="ping",
        # 交换机名称
        exchange="rpc",
    )
    # 请求回调
    def api_request_ping(channel, method, header, body):
        """
        channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道
        method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记
        header : AMQP消息头信息,携带可选的消息元数据,如数据类型
        body   : 实际消息内容
        """
        print ‘#{0}[{1}]>: {2}‘.format(
                # 由于此处的header是生产者的,所以可通过header.reply_to获取随机队列名
                header.reply_to,
                method.delivery_tag,
                body,
        )
        # 发送消息确认
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # 发送响应对象
        channel.basic_publish(
                body="pong",
                # 由于所有队列默认都会绑定到匿名交换机,非常方便直接发给它,它会根据传递过来的路由键原路返回
                exchange="",
                routing_key=header.reply_to
        )
        return
    # 作为指定队列消费者
    channel.basic_consume(
            api_request_ping,
            queue="ping",
            # 必须确认后再接收后续消息
            no_ack=False,
            consumer_tag="ping_request"
    )
    # 循环调用回调函数接收处理消息
    channel.start_consuming()
    channel.close()


> 生产者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 创建匿名零时响应队列,返回唯一队列名称通过header的reply_to给消费者
    queue = channel.queue_declare(
        # 如果同名队列已存在依然返回成功,否则创建
        passive=False,
        # 声明为非持久化队列
        durable=False,
        # 声明为私有队列
        exclusive=True,
        # 队列闲置会自动删除
        auto_delete=True
    )
    exchange = sys.argv[1]
    messages = sys.argv[2]
    # 创建配置对象
    msg_props = pika.BasicProperties()
    # 设置内容类型
    msg_props.content_type = ‘application/json‘
    # 将唯一响应队列名传递给消费者
    msg_props.reply_to = queue.method.queue
    # 发送消息
    channel.basic_publish(
        # 发布消息内容
        body=messages,
        # 发布到交换机
        exchange=exchange,
        # 发布信息属性
        properties=msg_props,
        # 消息路由键
        routing_key="ping"
    )
    # 响应回调
    def api_response_ping(channel, method, header, body):
        """
        channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道
        method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记
        header : AMQP消息头信息,携带可选的消息元数据,如数据类型
        body   : 实际消息内容
        """
        print ‘#{0}[{1}]>: {2}‘.format(
                # 由于此处返回的是消费者的header,所以不能使用header.reply_to而应该使用生成的随机唯一队列名
                queue.method.queue,
                method.delivery_tag,
                body,
        )
        # 发送消息确认
        channel.basic_ack(delivery_tag=method.delivery_tag)
        return
    # 作为指定队列消费者
    channel.basic_consume(
            api_response_ping,
            # 从匿名零时队列中收取消息
            queue=queue.method.queue,
            # 必须确认后再接收后续消息
            no_ack=False,
            consumer_tag="ping_response"
    )
    # 循环调用回调函数接收处理消息
    channel.start_consuming()
    channel.close()


说明: 测试非常简单,首先启动消费者python api_server.py,然后尝试执行生产者python rpc_client.py rpc ‘{"exec": "ping"}‘,查看消费者端输出

扩展: 可以轻易的通过创建队列和绑定的方式来扩展API以支持新的API方法,这样做的最大好处是任何一台服务器都无需对所有的API调用做应答,其它RPC服务器部署在同台或异台物理机器,而这一切对于用户是无感知的~

时间: 2024-10-17 16:31:51

消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?的相关文章

消息队列 - Spring Boot 对rabbitmq批量处理数据的支持

消息队列 - Spring Boot 对rabbitmq批量处理数据的支持 一丶前言 在生产中,存在一些场景,需要对数据进行批量操作.如,可以先将数据存放到redis,然后将数据进行批量写进数据库.但是使用redis,不得不面对一个数据容易丢失的问题.也可以考虑使用消息队列进行替换,在数据持久化,数据不丢失方面,消息队列确实比redis好一点,毕竟设计不一样.是不是使用消息队列,就一定好呢?不是的,首先使用消息队列,不能确保数据百分百不丢失,(如果要做到百分百不丢失,设计上就会比较复杂),除此之

消息队列 - mac上安装RabbitMq (转)

什么是RabbitMQ? RabbitMQ是由Erlang语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也称为面向消息的中间件).支持WIndows.Linux.MAC OS 操作系统和包括java..net在内的多种编程语言. AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,分面向消息的中间件设计.基于此协议的客户端与消息中间件可传递消息,并不受 客户端/中间件 不同

RabbitMQ消息队列应用

RabbitMQ消息队列应用 消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是通过消息队列传输系统日志.目前业界使用较多的消息队列组件有RabbitMQ.ActiveMQ.MSMQ.kafka.zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件. 图1- 消息队列组件示意图 一.RabbitMQ介绍 RabbitMQ是一款基

RabbitMQ消息队列在PHP下的应用

消息队列的实现中,RabbitMQ以其健壮和可靠见长.公司的项目中选择了它作为消息队列的实现.关于MQ的机制和原理网上有很多文章可以看,这里就不再赘述,只讲几个比较容易混淆的问题 1,binding key和routing key binding key和routing key是都不过是自己设置的一组字符,只是用的地方不同,binding key是在绑定交换机和队列时候通过方法传递的字符串,routing key是在发布消息时候,顺便带上的字符串,有些人说这两个其实是一个东西,也对也不对,说对,

Net分布式系统之四:RabbitMQ消息队列应用

消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是通过消息队列传输系统日志.目前业界使用较多的消息队列组件有RabbitMQ.ActiveMQ.MSMQ.kafka.zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件. 图1- 消息队列组件示意图 一.RabbitMQ介绍 RabbitMQ是一款基于AMQP(消息队列协议),由

RabbitMQ (消息队列)专题学习05 routing(路由)

(使用Java客户端) 一.概述 在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制. 二.路由功能实现 2.1.绑定(bindings) 在前面的学习中已经创建了绑定(bindings),代码如下: channel.queueBind

第二百九十一节,RabbitMQ多设备消息队列

RabbitMQ多设备消息队列-安装与简介 RabbitMQ简介 解释RabbitMQ,就不得不提到AMQP(Advanced Message Queuing Protocol)协议. AMQP协议是一种基于网络的消息传输协议,它能够在应用或组织之间提供可靠的消息传输.RabbitMQ是该AMQP协议的一种实现,利用它,可以将消息安全可靠的从发 送方传输到接收方.简单的说,就是消息发送方利用RabbitMQ将信息安全的传递给接收方. RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息

架构设计之NodeJS操作消息队列RabbitMQ

一. 什么是消息队列? 消息(Message)是指在应用间传送的数据.消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象. 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递.消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的.这样发布者和使用者都不用知道对方的存在. 二. 常用的消息队列有哪些? RabbitMQ.RocketMQ.ActiveMQ.Kafka

Java消息队列总结只需一篇解决ActiveMQ、RabbitMQ、ZeroMQ、Kafka

一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式:2.并行方式 a.串行