RabbitMQ(二):理解消息通信RabbitMQ

原文:RabbitMQ(二):理解消息通信RabbitMQ

一、消费者、生产者和信道

  生产者(producer):生产者创建消息,然后发布(发送)到代理服务器(RabbitMQ),可以说发送消息的程序就是生产者。什么是消息?消息包含两部分:有效载荷和标签。有效载荷就是传输的数据,可以是任何内容,包括json数据和图片等等。而标签(一个叫交换器名称和可选的主题标记)描述了有效载荷,RabbitMQ用它来决定谁将获得这个消息。

  消费者(consumer):消费者就是接收消息并处理消息的程序,他们连接到代理服务器上,并订阅到队列上。当消费者接收消息时,它只是得到消息的有效载荷。整个过程很简单:生产者创建消息,消费者接收消息。你的应用程序可以作为生产者也可以作为消费者,在两者之间切换。但是消息的传输必定会通过某一介质传递,此处的消息就通过信道传递。

  信道(channel):不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。应用程序和Rabbit代理服务器之间会创建一条TCP连接,TCP连接就像电缆,信道相当于电缆中的光束。在一条TCP连接上创建多少条信道是没有限制的,所以不会对操作系统的TCP栈造成额外的负担。

二、队列、交换器和绑定

  Rabbit的消息路由分为三部分:交换器、队列和绑定。生产者把消息发布到交换器上;消息最终到达队列,并被消费者接收;绑定决定了消息如何从路由器路由到特定的队列。

  队列(queue):队列是一个栈先进先出,为生产者发布的消息提供了保存的处所,消息在此等待消费。本质上队列可以存储无限的消息,但是需要视系统内存而定。

  绑定(binding):队列通过路由键绑定到交换器,路由键就是消息通过交换器投递到那个队列的规则。

  交换器(exchange):交换器有四种类型:direct、fanout、topic和headers。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个路由键,Exchange会根据这个路由键按照特定的路由算法,将消息路由给指定的queue。

  (1)direct:如果路由键匹配的话,消息就被投递到对应的队列。类似于单播。

  (2)fanout:将消息广播到绑定的队列上,不管路由键是什么,绑定的队列都会收到消息。

  (3)topic:类似与组播,和正则表达式类似,发送给路由键符合一定规则的队列。如:路由键为user.stock的消息会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列(* 表是匹配一个任意词组,#表示匹配0个或多个词组)。

  (4)headers:不通过路由键匹配而是通过消息的header匹配,其他与direct交换器一致,但是性能上会差很多。

三、vhost

  每一个RabbitMQ服务器都能创建虚拟消息服务器,我们称之为虚拟主机(vhost)。每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器和绑定,最重要的是拥有自己的权限机制。逻辑上vhost之间是相互独立的分离的,保证了安全性和可移植性。RabbitMQ包含了开箱即用的默认的vhost:"/",因此使用起来非常简单。当在RabbitMQ集群中创建vhost时,整个集群上都会创建该vhost,避免了vhost的重复创建。

四、第一次尝试消息通信

  交换器或队列的时候,如果交换器或队列已经存在,则直接返回结束,不会重复创建。无法承担消息丢失,则生产者和消费者中都需要尝试去创建交换器和队列,如果可以承担则可以由消费者来声明队列。我们创建生产者和消费者两个控制台程序分别运行以下代码。

  生产者:

        static void Main(string[] args)
        {
            FirstProducer();
        }

        /// <summary>
        /// 第一个生产者
        /// </summary>
        private static void FirstProducer()
        {
            //1.连接到服务器
            var conn_factory = new ConnectionFactory() {
                HostName = "localhost",UserName="guest",Password="guest",Port=5672//默认端口5672
            };
            using (IConnection conn = conn_factory.CreateConnection())
            {
                //2.创建信道
                using (IModel channel = conn.CreateModel())
                {
                    //3.声明交换器
                    channel.ExchangeDeclare(
                        "HelloExchange",    //交换器名称
                        ExchangeType.Direct,//交换器类型
                        true,              //是否持久话
                        false,              //是否自动删除
                        null                //关于交换器的详细设置,键值对形式
                        );
                    //4.声明队列
                    channel.QueueDeclare(
                        "HelloQueue",//队列名称
                        false,       //是否持久化
                        false,       //是否只对首次声明的队列可见
                        false,       //是否自动删除
                        null         ////关于队列和队列内消息的详细设置,键值对形式
                        );
                    //5.绑定交换器和队列
                    channel.QueueBind(
                        "HelloQueue",    //队列名
                        "HelloExchange", //交换器名
                        "hola"           //路由键
                        );
                    //6.发布消息
                    string msg_str = "这是生产者第一次发布的消息";
                    IBasicProperties msg_pro = channel.CreateBasicProperties();
                    msg_pro.ContentType = "text/plain";//发布的数据类型
                    for(int i = 0; i < 5; i++)
                    {
                        channel.BasicPublish(
                            "HelloExchange",                    //消息发送目标交换器名称
                            "hola",                             //路由键
                            msg_pro,                            //消息的发布属性
                            Encoding.UTF8.GetBytes(msg_str)    //消息
                            );
                    }
                }
            }
        }

  消费者:

        static void Main(string[] args)
        {
            FirstCousmer();
        }
        /// <summary>
        /// 第一个消费者
        /// </summary>
        private static void FirstCousmer()
        {
            //1.链接到服务器
            var conn_factory = new ConnectionFactory()
            {
                HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672
            };
            using (var conn = conn_factory.CreateConnection())
            {
                //2.创建信道
                using(IModel channel = conn.CreateModel())
                {
                    //3.声明交换器
                    channel.ExchangeDeclare(
                        "HelloExchange",    //交换器名称
                        ExchangeType.Direct,//交换器类型
                        true,              //是否持久话
                        false,              //是否自动删除
                        null                //关于交换器的详细设置,键值对形式
                        );
                    //4.声明队列
                    channel.QueueDeclare(
                        "HelloQueue",//队列名称
                        false,       //是否持久化
                        false,       //是否只对首次声明的队列可见
                        false,       //是否自动删除
                        null         ////关于队列和队列内消息的详细设置,键值对形式
                        );
                    //5.绑定交换器和队列
                    channel.QueueBind(
                        "HelloQueue",    //队列名
                        "HelloExchange", //交换器名
                        "hola"           //路由键
                        );
                    //6.获取消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (ch, ea) =>        //消费者消息接收处理事件
                    {
                        var body = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine(body);
                        channel.BasicAck(ea.DeliveryTag, false); //确认接收消息,从队列中删除
                    };
                    //7.启动消费者
                    string consumer_tag = channel.BasicConsume(
                        "HelloQueue", //获取的队列名称
                        false,       //是否自动确认接收消息,从队列中删除
                        consumer     //消费者对象
                        );
                    channel.BasicCancel(consumer_tag);//调用消费
                    //var consumer = new QueueingBasicConsumer(channel);
                    //channel.BasicConsume("HelloQueue", false, consumer);
                    //while (true)
                    //{
                    //    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    //    var body = ea.Body;
                    //    var message = Encoding.UTF8.GetString(body);
                    //    Console.WriteLine("Received {0}", message);
                    //    channel.BasicAck(ea.DeliveryTag, false);
                    //}
                }
            }
            Console.ReadLine();
        }

  运行消费者后可以看到,以下结果:

原文地址:https://www.cnblogs.com/lonelyxmas/p/10693391.html

时间: 2024-10-12 14:09:56

RabbitMQ(二):理解消息通信RabbitMQ的相关文章

RabbitMQ详解(二)------消息通信的概念

RabbitMQ详解(二)------消息通信的概念 消息通信,有很多种,邮箱 qq 微信 短信等,这些通信方式都有发送者,接受者,还有一个中间存储离线消息的容器.但是这些通信方式和RabbitMQ的通信模型是不一样的,比如邮件,邮件服务器基于POP3/SMTP协议,通信双方需要明确指定,并且发送的邮件内容有固定的结构.而RabbitMQ服务器基于AMQP协议,这个协议是不需要明确指定发送方和接受方的,而且发送的消息也没有固定的结构,甚至可直接存储二进制数据,并且和邮件服务器一样,也能存储离线消

RabbitMQ实战:理解消息通信

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 前段时间总结完了「深入浅出MyBatis」系列,对MyBatis有了更全面和深入的了解,在掘金社区也收到了一些博友的喜欢,很高兴.另外,短暂的陪产假就要结束了,小宝也二周了,下周二就要投入工作了,希望自己尽快调整过来,加油努力. 从本篇开始总结「RabbitMQ实战」系列的阅读笔记,RabbitMQ是一个开源的消息代理和队列服务器,可以通过基本协议在完全不同的应用之间共享数据,可以将作业排队以便让分布式服务进行处理. 本篇

RabbitMQ实战:消息通信模式和最佳实践

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 通过前2篇的介绍,了解了消息通信的主要元素和交互过程,以及如何运行和管理RabbitMQ,这篇将站在开发模式的角度理解「面向消息通信」带来的好处,以及在各种场景下的最佳实践. 通过介绍,你会了解到: 面向消息通信的好处 发后即忘模型 用RabbitMQ实现RPC 面向消息通信的好处 主要从异步状态思维.处理能力扩展性.集成复杂度方面,说明面向消息通信的好处. 异步状态思维 当将消息通信集成到应用程序时,开发模式将从同步模型

RabbitMQ介绍2 - 理解消息AMQP

理解消息AMQP通信.官方解释: http://www.rabbitmq.com/tutorials/amqp-concepts.html 概念:生产者producer,消费者consumer,队列queue,交换器exchange,路由键routing key,绑定键binding key. producer发布消息,消息经过交换器传播放入队列,消费者从队列中得到消息. ConnectionFactory, connection, channel信道.connectionFactory用来建立

.Net RabbitMQ之消息通信

1.消息投递服务 RabbitMQ是一种消息投递服务,怎么理解这句话呢?即RabbitMQ即不是消息的生产者,也是消息的消费者.他就像现实生活中快递模式,消费者在电商网站上下单买了一件商品,此时对应的生产者(商家)则生产了一件货物(概念上的生产,可能已经生产好了),接着生产者(商家)将货物发送给快递公司,因为消费者下单了这个货物,相当于订阅了这件货物,所以快递公司将会把这件货物发送给对应的消费者.RabbitMQ就相当于这里面的快递公司.服务在生产者和消费者之间建立桥梁,即通信. 2.Rabbi

(转)(二)RabbitMQ消息队列-RabbitMQ消息队列架构与基本概念

http://blog.csdn.net/super_rd/article/details/70238869 没错我还是没有讲怎么安装和写一个HelloWord,不过快了,这一章我们先了解下RabbitMQ的基本概念. RabbitMQ架构 说是架构其实更像是应用场景下的架构(自己画的有点丑,勿嫌弃) 从图中可以看出RabbitMQ主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收. RabbitMQ消息队列基本概

云计算openstack共享组件-消息队列rabbitmq(2)

一.MQ 全称为 Message Queue, 消息队列( MQ ) 是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.   消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.   排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.

消息队列RabbitMQ

消息队列RabbitMQ 一.RabbitMQ是什么? AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全.RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.J

消息队列 RabbitMQ 入门介绍

来源:http://ityen.com/archives/578 一.什么是RabbitMQ? RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:   例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在