[.NET] RabbitMQ 的行为艺术

RabbitMQ 的行为艺术

目录

  • 简介
  • 环境搭建
  • 示例一:简单的 Hello World
  • 示例二:发布/订阅模式
  • 尝试发现 - 新物种 EasyNetQ

简介

  RabbitMQ:一个消息系统,基于 AMQP 系统协议,由 Erlang 语言开发。

  优点:健壮、使用简单、开源和支持各种流行的语言(如 Python、java、.NET)等。

  MQ(Message Queue):消息队列的简称,是一种应用程序之间的通信机制。

  作用:将部分无需立即回调获取结果,并且耗时的操作,使用异步处理的方式提高服务器的吞吐量及性能。如:日志记录。

图:简单的通信方式,及加入 MQ 后的变化

A 端:生产者将消息写(插)入队列;

MQ(队列) :中间件,消息的载体;

B 端:消费者从队列读(取)出消息。

  MQ 特点:消费者 - 生产者模型的一种表现形式。

环境搭建

  1.官网下载安装包:http://www.rabbitmq.com/ ;

  2.安装时会提示你下载 Erlang 语言环境;

  3.启动安装完的服务:RabbitMQ;

  4.在 cmd 中指向 sbin 目录,并输入以下命令,才能打开 WEB 管理界面:

rabbitmq-plugins enable rabbitmq_management

  5.默认 url:http://localhost:15672/#/

示例一:简单的 Hello World

  P(Producer):生产者,意味着发送;

  Queue:队列,本质上是一个无限的缓冲区,可以储存尽可能多的信息;

  C(Consumer):消费者,等待并接收消息。

  【备注】生产者和消费者不需要驻留在同一台服务器上。

  Producer.cs

 1     public class Producer  
 2     {
 3         public static void Send()
 4         {
 5             var factory = new ConnectionFactory { HostName = "localhost" };
 6
 7             //创建连接对象,基于 Socket
 8             using (var connection = factory.CreateConnection())
 9             {
10                 //创建新的渠道、会话
11                 using (var channel = connection.CreateModel())
12                 {
13                     //声明队列
14                     channel.QueueDeclare(queue: "hello",    //队列名
15                         durable: false,     //持久性
16                         exclusive: false,   //排他性
17                         autoDelete: false,  //自动删除
18                         arguments: null);
19
20                     const string message = "Hello World!";
21                     var body = Encoding.UTF8.GetBytes(message);
22
23                     channel.BasicPublish(exchange: "",  //交换机名
24                         routingKey: "hello",    //路由键
25                         basicProperties: null,
26                         body: body);
27                 }
28             }
29         }
30     }

  【备注】队列名如果已存在,将不会重复创建。假设队列已存在,修改 channel.QueueDeclare() 方法内的参数后启动会出现异常。

  【备注】消息内容是一个字节数组。

  Consumer.cs

 1     class Consumer
 2     {
 3         public static void Receive()
 4         {
 5             var factory = new ConnectionFactory() { HostName = "localhost" };
 6
 7             using (var connection = factory.CreateConnection())
 8             {
 9                 using (var channel = connection.CreateModel())
10                 {
11                     channel.QueueDeclare(queue: "hello",
12                                          durable: false,
13                                          exclusive: false,
14                                          autoDelete: false,
15                                          arguments: null);
16
17                     //创建基于该队列的消费者,绑定事件
18                     var consumer = new EventingBasicConsumer(channel);
19                     consumer.Received += (model, ea) =>
20                     {
21                         var body = ea.Body;     //消息主体
22                         var message = Encoding.UTF8.GetString(body);
23                         Console.WriteLine(" [x] Received {0}", message);
24                     };
25
26                     //启动消费者
27                     channel.BasicConsume(queue: "hello",    //队列名
28                                          noAck: true,   //false:手动应答;true:自动应答
29                                          consumer: consumer);
30
31                     Console.Read();
32                 }
33             }
34         }
35     }

  【疑问】在消费者的类里面为什么会再次声明队列(channel.QueueDeclare())呢?-- 因为接收方可能会在发送方启动前启动,这是出于保险起见。

示例二:发布/订阅模式

  1.Exchange 交换机和 Exchange Type 交换类型  

  RabbitMQ 消息传递模型的核心思想是,生产者不会直接将消息发给队列。

  这里我们将引入新的名词 Exchange(交换机)。交换机传递消息的类型也有很多种:direct, topic, headers(不常用) 和 fanout,我们称之为交换类型。

图:Direct

图:Fanout

图:Topic

--上述 3 张图来源:http://m.blog.csdn.net/article/details?id=52262850

  

  这里,创建一个名为 “logs” 的交换机,它的类型为广播类型(fanout:可以将收到的所有消息,广播给所有已知的队列)。

channel.ExchangeDeclare(exchange: "logs",   //交换机名
                        type: "fanout");    //交换类型

  

  2.临时队列

  作为消费者,我们有时候只需要一些新的(或者空的)队列,此时,更好的方式就是让它自动生成一个随机名字的队列;其次,当队列连接中断时会选择自动删除对应的消费者。

  创建一个非持久,有排他性和自动删除特性的队列(无参时)。

var queueName = channel.QueueDeclare().QueueName;

  3.Binding 绑定

  【疑问】有了 Exchange 和 channel,这时,还需要什么东西呢?-- 我们要创建 Exchange 和 channel 关系的桥梁,这个桥梁称之为 Binding(绑定)。

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

 1     class Producer
 2     {
 3         public static void Send()
 4         {
 5             var factory = new ConnectionFactory()
 6             {
 7                 HostName = "localhost",
 8                 Port = 5672,
 9                 UserName = "guest",
10                 Password = "guest"
11             };
12
13             using (var connection = factory.CreateConnection())
14             {
15                 using (var channel = connection.CreateModel())
16                 {
17                     channel.ExchangeDeclare(exchange: "logs",   //交换机名
18                         type: "fanout");    //交换类型
19
20                     // Guid
21                     var message = Guid.NewGuid().ToString();
22                     var body = Encoding.UTF8.GetBytes(message);
23                     channel.BasicPublish(exchange: "logs",
24                                          routingKey: "",
25                                          basicProperties: null,
26                                          body: body);
27
28                     Console.WriteLine(" [x] Sent {0}", message);
29                 }
30
31                 Console.WriteLine(" Press [enter] to exit.");
32                 Console.ReadLine();
33             }
34         }
35     }

Producer.cs //生产者

 1     class Reciver
 2     {
 3         public static void Recive()
 4         {
 5             var factory = new ConnectionFactory()
 6             {
 7                 HostName = "localhost",
 8                 Port = 5672,
 9                 UserName = "guest",
10                 Password = "guest"
11             };
12
13             using (var connection = factory.CreateConnection())
14             using (var channel = connection.CreateModel())
15             {
16                 channel.ExchangeDeclare(exchange: "wen_logs",   //交换机名
17                     type: "fanout");    //交换类型
18
19                 //创建队列
20                 var queueName = channel.QueueDeclare().QueueName;
21                 channel.QueueBind(queue: queueName,
22                                   exchange: "wen_logs",
23                                   routingKey: "");
24
25                 Console.WriteLine(" [*] Waiting for logs.");
26
27                 var consumer = new EventingBasicConsumer(channel);
28                 consumer.Received += (model, ea) =>
29                 {
30                     var body = ea.Body;
31                     var message = Encoding.UTF8.GetString(body);
32                     Console.WriteLine(" [x] {0}", message);
33                 };
34                 channel.BasicConsume(queue: queueName,
35                                      noAck: true,
36                                      consumer: consumer);
37
38                 Console.WriteLine(" Press [enter] to exit.");
39                 Console.ReadLine();
40             }
41         }
42     }

Reciver.cs //接收者

尝试发现 - 新物种 EasyNetQ

  这都不是事!EasyNetQ,看名字就知道,搞定 MQ,So easy!

  连接 RabbitMQ 代理:

var bus = RabbitHutch.CreateBus("host=localhost");

  发布:

bus.Publish(message);

  订阅:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

  下面我们通过 Demo 来感受一下 Easy 的程度吧,创建项目(效果图如下,附 Demo 下载):

  Wen.EasyNetQDemo.Model:类库

  Wen.EasyNetQDemo.Publisher,Wen.EasyNetQDemo.Subscriber:控制台应用程序,都使用 Nuget 直接安装 EasyNetQ 包,都引用类库 Model。

  Demo.cs

    public class Demo
    {
        public string Message { get; set; }
    }

  Publisher

 1 using System;
 2 using EasyNetQ;
 3 using Wen.EasyNetQDemo.Model;
 4
 5 namespace Wen.EasyNetQDemo.Publisher
 6 {
 7     internal class Program
 8     {
 9         private static void Main(string[] args)
10         {
11             using (var bus = RabbitHutch.CreateBus("host=localhost"))
12             {
13                 string input;
14                 Console.WriteLine("请输入信息。 如果是“esc” 将退出当前窗口。");
15
16                 while ((input = Console.ReadLine()) != "esc")
17                 {
18                     bus.Publish(new Demo
19                     {
20                         Message = input
21                     });
22                 }
23
24             }
25         }
26     }
27 }

  【备注】RabbitHutch.CreateBus() 方法可以创建一个简单的发布/订阅和包含请求/响应 API 的消息总线。

  Subscriber

 1 using System;
 2 using EasyNetQ;
 3 using Wen.EasyNetQDemo.Model;
 4
 5 namespace Wen.EasyNetQDemo.Subscriber
 6 {
 7     internal class Program
 8     {
 9         private static void Main(string[] args)
10         {
11             using (var bus = RabbitHutch.CreateBus("host=localhost"))
12             {
13                 bus.Subscribe<Demo>("test", HandleDemo);
14
15                 Console.WriteLine("监听信息中...输入“return”将退出当前窗口!");
16                 Console.ReadLine();
17             }
18         }
19
20         private static void HandleDemo(Demo demo)
21         {
22             Console.ForegroundColor = ConsoleColor.Green;
23             Console.WriteLine($"Got message: {demo.Message}");
24             Console.ResetColor();
25         }
26     }
27 }

图:效果图

时间: 2024-11-08 20:26:33

[.NET] RabbitMQ 的行为艺术的相关文章

Redis 小白指南(三)- 事务、过期、消息通知、管道和优化内存空间

Redis 小白指南(三)- 事务.过期.消息通知.管道和优化内存空间 简介 <Redis 小白指南(一)- 简介.安装.GUI 和 C# 驱动介绍> 讲的是 Redis 的介绍,以及如何在 Windows 上安装并使用,一些 GUI 工具和自己简单封装的 RedisHelper. <Redis 小白指南(二)- 聊聊五大类型:字符串.散列.列表.集合和有序集合>讲的是 Redis 中最核心的内容,最常用的就是和数据类型打交道. 目录 事务 过期时间 消息通知 管道 优化内存空间

windows上部署rabbitmq遇到的一些问题及解决方法

在目前这家公司,刚进公司的时候接手了一个服务,算是个比较完备的服务,其中几台电脑之间通信用到了rabbitmq,一开始没出什么问题,然后后来勒索病毒wanner cry来的时候,系服把所有服务器装了一个什么杀毒软件,重启之后rabibtmq集群就出现了一些问题,经过一番学习,把这些问题都搞定了,现在做一个总结. 一开始,我按照官网的描述,把四台服务器加入了一个集群,但是不知道为什么,除了主节点外,另外三台都看不了集群状态,由于并不影响什么,就先放在那没管,其实想起来,是因为之前集群的配置文件没删

Spring rabbitMq 中 correlationId或CorrelationIdString 消费者获取为null的问题

问题 在用Spring boot 的 spring-boot-starter-amqp   快速启动 rabbitMq 是遇到了个坑 消费者端获取不到:correlationId或CorrelationIdString 问题产生的原因 correlationId 的在 spring rabbitmq 2.0 以后 byte方式会被放弃,所以 目前 代码中有些地方没有改过来,应该算一个BUG @SuppressWarnings("deprecation") public class De

在Node.js中使用RabbitMQ系列二 任务队列

在上一篇文章在Node.js中使用RabbitMQ系列一 Hello world我有使用一个任务队列,不过当时的场景是将消息发送给一个消费者,本篇文章我将讨论有多个消费者的场景. 其实,任务队列最核心解决的问题是避免立即处理那些耗时的任务,也就是避免请求-响应的这种同步模式.取而代之的是我们通过调度算法,让这些耗时的任务之后再执行,也就是采用异步的模式.我们需要将一条消息封装成一个任务,并且将它添加到任务队列里面.后台会运行多个工作进程(worker process),通过调度算法,将队列里的任

RabbitMQ 很成熟 不是阿里的

简介 官网 http://www.rabbitmq.com RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现 RabbitMQ实现了AMQP标准 安装 参考 http://www.cnblogs.com/LipeiNet/p/5973061.html 安装 erlang 有安装C运行库,给 Erlang 用的  配置环境变量 ERLANG_HOME C:\Program Files\erl8.3 安装 MQ服务器软件 3.6.9  配

2017.4.7------软件测试的艺术+整理以前的摘记

2017.4.17 以下内容来自<软件测试的艺术> 第1页--20页.供自己学习使用.   第一章 软件测试:就是一个过程或一个系列过程,用来确认计算机代码完成了其应该完成的功能,不执行其不该有的操作. 第二章    测试人员需要有正确的态度.每当测试一个程序时,应当想到的是为程序增加一些价值.通过测试来增加程序的价值,是指测试提高了程序的可靠性或质量,提高程序可靠性,是指找出并最终修改了程序的错误. 1.有人把没发现错误的测试用例称为一次"成功的测试",而将发现了某个新错

杂项之rabbitmq

杂项之rabbitmq 本节内容 rabbitmq简介 AMQP协议 rabbitmq使用 应用举例 rabbitmq简介 介绍rabbitmq之前,先介绍一下AMQP协议,因为rabbitmq就是基于AMQP协议实现的一个服务程序.(目前为止应该也是唯一实现了AMQP协议的服务) AMQP(高级消息队列协议)是一个网络协议.它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信. arbbitmq使用erlan

RabbitMQ安装和使用(和Spring集成)

一.安装Rabbit MQ Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装Rabbit MQ的前提是安装Erlang.通过下面两个连接下载安装3.2.3 版本: 下载并安装 Eralng OTP For Windows (vR16B03) 运行安装 Rabbit MQ Server Windows Installer (v3.2.3) 具体操作步骤参考:在 Windows 上安装Rabbit MQ 指南 本人遇到的问题 当安装RabbitMQ后,使用rabbitmqctl

rabbitMQ、activeMQ、zeroMQ、Kafka、Redis 比较

Kafka作为时下最流行的开源消息系统,被广泛地应用在数据缓冲.异步通信.汇集日志.系统解耦等方面.相比较于RocketMQ等其他常见消息系统,Kafka在保障了大部分功能特性的同时,还提供了超一流的读写性能. 针对Kafka性能方面进行简单分析,相关数据请参考:https://segmentfault.com/a/1190000003985468,下面介绍一下Kafka的架构和涉及到的名词: Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Parti