RabbitMQ原理与相关操作(一)

小编是菜鸟一枚,最近想试试MQ相关的技术,所以自己看了下RabbitMQ官网,试着写下自己的理解与操作的过程。

刚开始的第一篇,原理只介绍 生产者、消费者、队列,至于其他的内容,会在后续中陆续补齐。

引入MQ话题

什么时候会用到MQ

可能很多人有疑惑:MQ到底是什么?哪些场景下要使用MQ?
前段时间安装了RabbitMQ,现在就记录下自己的学习心得吧。
首先看段程序:

class Program
    {
        static void Main(string[] args)
        {
            new Thread(Write).Start();
            new Thread(Write).Start();
            new Thread(Write).Start();
            new Thread(Write).Start();
        }

        public static void WriteLog(int i)
        {
            using (FileStream f = new FileStream(@"d:\\test.txt", FileMode.Append))
            {
                using (StreamWriter sw = new StreamWriter(f, Encoding.Default))
                {
                    sw.Write(i);
                }
            }
        }

        public static void Write()
        {
            for (int i = 0; i < 10000; i++)
            {
                WriteLog(i);
            }
        }
    }

仅仅从代码上看,没有觉得任何问题对吧?编译也是通过的,但是执行时,出现一个问题:

当然,这仅仅是一个小的案例,类似这种多线程写文件造成的问题, 就应该使用MQ了。

MQ的使用场景大概包括解耦,提高峰值处理能力,送达和排序保证,缓冲等。

MQ概述

消息队列技术是分布式应用间交换信息的一种技术。

消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走。

通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。

MQ主要作用是接受和转发消息。你可以想想在生活中的一种场景:当你把信件的投进邮筒,邮递员肯定最终会将信件送给收件人。我们可以把MQ比作 邮局和邮递员。

MQ和邮局的主要区别是,它不处理消息,但是,它会接受数据、存储消息数据、转发消息。

RabbitMQ术语

生产者

消息发送者,在MQ中被称为生产者(producer),一个发送消息的应用也被叫做生产者,用P表示

消费者:

生产者“生产”出消息后,最终由谁消费呢?等待接受消息的应用程序,我们称之为消费者(Consuming ),用C表示

队列:

消息只能存储在队列(queue )中。尽管消息在rabbitMQ和应用程序间流通,但是队列却是存在于RabbitMQ内部。

一个队列不受任何限制,它可以存储你想要存储的消息量,它本质上是一个无限的缓冲区。

多个生产者可以向同一个队列发送消息,多个消费者可以尝试从同一个消息队列中接收数据。

一个队列像下面这样(上面是它的队列名称)

注意:

生产者、消费者、中间件不必在一台机器上,实际应用中也是绝大多数不在一起的。我们可以用一张图表示RabbitMQ的构造:

注:此图片摘自于百度百科RabbitMQ

使用RabbitMQ解决多线程写入文件问题

分析

多线程写入,产生消息的也就是一个程序(一个生产者P),消费消息的也是一个消息,它的模型应该是:

编写代码

引入RabbitMQ client DLL

程序包管理控制台命令:

PM> Install-Package RabbitMQ.Client

生产者

首先,创建一个 connection 通过socket连接 去和服务器连接起来(需要传目的服务器的IP、用户名、密码等)。

接着 创建一个 channel ,这是大部分的要做的事情所在。

要发送消息,我们必须声明一个队列,,然后我们可以向队列发布消息。

执行一次BasicPublish方法,推送一个消息。

class Program
    {
        static void Main(string[] args)
        {
            new Thread(Write).Start();
            new Thread(Write).Start();
            new Thread(Write).Start();
        }

        public static void Write()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null);
                for (int i = 0; i < 8000; i++)
                {
                    string message = i.ToString();
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);
                    Console.WriteLine("Program Sent {0}", message);
                }
            }
        }
    }

声明的队列,在服务器中如果不存在了,会自动创建。而消息的内容是字节数组,在使用时,注意编码问题。

消费者

当队列里有消息时,消费者要随时能够从队列里获取消息,所以我需要一直运行它,让它监听消息。

就像我们打篮球进行传球,需要事先确认要传给的那个队友位置一样,生产者要发送消息,一定要事先知道消费消息的程序的对列是哪个。所以,在运行生产者程序前,需要先启动消费者程序。

由此,声明对列,就应该在消费者程序中完成。

 

执行程序

先执行 消费者程序,让它一直保持监听。

错误解决

执行时VS报错:

“RabbitMQ.Client.Exceptions.BrokerUnreachableException”类型的未经处理的异常在 RabbitMQ.Client.dll 中发生 其他信息: None of the specified endpoints were reachable。

进入查看详细的内部异常:

innerEception:{"The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=\"NOT_ALLOWED - access to vhost ‘/‘ refused for user ‘eric‘\", classId=10, methodId=40, cause="}

此时,我们打开在http://localhost:15672/#/users 可以看到eric 下 的Can access virtual hosts 为 NoAccess

解决办法:

rabbitmqctl控制台输入

rabbitmqctl set_permissions -p / userName "." "." ".*"

再次执行时,可以看到:

然后运行 生产者程序。

我们先开着 Receive ,当生产者运行时

消费者的自动触发执行 :

直到所有的 指定的 queue 里面的消息完全消费完为止。(此时消费者程序仍然在监听中)

对于需要安装和设置用户的同学,请参考 windows下 安装 rabbitMQ 及操作常用命令

原文地址:https://www.cnblogs.com/ExMan/p/10263898.html

时间: 2024-10-15 07:20:51

RabbitMQ原理与相关操作(一)的相关文章

RabbitMQ原理与相关操作(二)

接着 上篇随笔 增加几个概念: RabbitMQ是一个在AMQP(高级消息队列协议)标准基础上完整的,可服用的企业消息系统. AMQP模型的功能组件图(上图摘自 Sophia_tj 的 第2章 AMQP模型) AMQP的四个总要概念: 1.虚拟主机(virtual host)或(vhost) 2.交换机(exchange) 3.队列(queue) 4.绑定器(bind) 什么是虚拟主机?一组交换机.队列和绑定器 被称为 虚拟主机(vhost). 为什么要用虚拟主机?RabbitMQ server

RabbitMQ原理与相关操作(三)消息持久化

现在聊一下RabbitMQ消息持久化: 问题及方案描述 1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间.在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了. 这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情. 2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的.当RabbitMQ死掉了或者重启了,上次创建的队列.消息都不会保存. 这种情况可以使用RabbitMQ提供的消

HBase的工作原理和相关操作

学习一个开源软件的基本思路都是(1)安装和配置(2)理解工作原理(3)命令操作(4)代码操作(5)研究源码(6)根据论文或需求进行二次开发.同样,学习HBase也不例外,但我省去了HBase集群(4台)的安装和配置,主要总结HBase的工作原理,Shell命令操作,Java代码操作相关内容. 一. HBase存储结构 1. Client 解析: 对于管理类的操作,Client与HMaster进行RPC,对于数据读写类的操作,Client与HRegionServer进行RPC. 2. ZooKee

Hive的工作原理和相关操作

Hive是一个基于Hadoop的数据仓库,提供比较完整的SQL功能(本质还是将SQL转换为MapReduce),可供数据分析师利用Hadoop对海量数据进行分析.但是,Hive也有自身的缺点,比如比较慢等. 一. Hive的工作原理和结构 二. Hive的基本操作 1. 表操作 2. 视图操作 3. 索引操作 4. 分区操作 5. 桶操作 三. 使用JDBC开发Hive程序

python装饰器原理及相关操作

python装饰器,简单的说就是用于操作底层代码的代码,在不改变底层代码函数的情况下对底层代码进行验证操作等 首先,必须知,道调用func和func的区别,分别为返回函数所在的内存地址和调用该函数,输出执行结果,例如: def func(): print("欢迎光临!!!") print("返回函数所在的内存地址:",func) func() 列举一个简单的web页面调用例子 1 #做登录验证 2 def login(func): 3 print("登录成

指针、链表的原理和各类操作相关心得以及学生信息管理系统

伴随着学期末的到来,C语言程序设计这门课也接近尾声.经过前两次的教学,我们对C语言也有了深刻的了解,学习的内容也不断的加深.这次我们就学习了C语言程序设计里应用最广泛,也是最难学习的知识--链表和指针的应用. 关于指针和链表这两个的应用和上次的管理系统有着直接的关系,没有添加链表和指针的管理系统无法做到精确的查找.数据存储方面也显得不方便.所以指针和链表的作用能够直接指向你所需要的数据地址,使程序更加完善.这次我就利用指针的应用制作了一个管理员工工资等信息的程序. §1 指向结构体变量的指针变量

centos7的selinux的原理及相关配置

centos7的selinux的原理及相关配置 SELinux的全称是Security Enhanced Linux, 就是安全加强的Linux.在SELinux之前,root账号能够任意的访问所有文档和服务:如果某个文件设为777,那么任何用户都可以访问甚至删除:这种方式称为DAC(主动访问机制),很不安全. DAC 自主访问控制: 用户根据自己的文件权限来决定对文件的操作,也就是依据文件的own,group,other/r,w,x权限进行限制.Root有最高权限无法限制.r,w,x权限划分太

【Oracle 集群】ORACLE DATABASE 11G RAC 知识图文详细教程之RAC 工作原理和相关组件(三)

RAC 工作原理和相关组件(三) 概述:写下本文档的初衷和动力,来源于上篇的<oracle基本操作手册>.oracle基本操作手册是作者研一假期对oracle基础知识学习的汇总.然后形成体系的总结,一则进行回顾复习,另则便于查询使用.本图文文档亦源于此.阅读Oracle RAC安装与使用教程前,笔者先对这篇文章整体构思和形成进行梳理.由于阅读者知识储备层次不同,我将从Oracle RAC安装前的准备与规划开始进行整体介绍安装部署Oracle RAC.始于唐博士指导,对数据库集群进行配置安装,前

关于C#资源文件的相关操作

关于资源文件的相关操作. //1.比较常见的有获取资源文件对应的文件流,然后转换到相对应的文件 //比较典型的做法是通过代码程序集加载指定资源 //如下通过Assembly的静态方法GetExecutingAssembly()得到程序集 //还有很多方式可以得到代码程序集 System.Reflection.Assembly asm = System.Reflection.Assembly.GetExecutingAssembly(); Stream manifestResourceStream