RabbitMQ学习系列(三): C# 如何使用 RabbitMQ

  上一篇已经讲了Rabbitmq如何在Windows平台安装,还不了解如何安装的朋友,请看我前面几篇文章:RabbitMQ学习系列一:windows下安装RabbitMQ服务 , 今天就来聊聊 C# 实际开发的过程中,怎么调用 用RabbitMQ。

  一、客户端 

    RabbitMQ 有很多客户端API,都非常的好用。我们在一边,一直用的都是 EasyNetQ,所以这里的 demo 只介绍 EasyNetQ 客户端实现。其他的客户端,大家自己去研究吧。

    EasyNetQ 是一个易于使用的RabbitMQ的.Net客户端API。地址:http://easynetq.com/ 。   demo示例下载 。

  二、项目结构

    

      说明:前面我们提到过,RabbitMQ由 Producer(生成者) 和 Consumer(消费者) 两部分组成。Weiz.Consumer 就是Consumer(消费者),Weiz. Producer 为 Producer(生成者),Weiz.MQ 为消息队列的通用处理类库。

  三、项目搭建

    1. Weiz.MQ 项目,消息队列的通用处理类库,用于正在的订阅和发布消息。

      1. 通过nuget安装项目EasyNetQ 相关组件, (略)

      2. 增加BusBuilder.cs管道创建类,主要负责链接Rabbitmq。

using System;
using System.Configuration;
using EasyNetQ;

namespace Weiz.MQ
{
    /// <summary>
    /// 消息服务器连接器
    /// </summary>
    public class BusBuilder
    {
        public static IBus CreateMessageBus()
        {
            // 消息服务器连接字符串
            // var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"];
            string connString = "host=192.168.98.107:5672;virtualHost=OrderQueue;username=zhangweizhong;password=weizhong1988";
            if (connString == null || connString == string.Empty)
            {
                throw new Exception("messageserver connection string is missing or empty");
            }

            return RabbitHutch.CreateBus(connString);
        }
    }
}

      3. 增加IProcessMessage类,定义了一个消息方法,用于消息传递

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Weiz.MQ
{
    public interface IProcessMessage
    {
        void ProcessMsg(Message msg);
    }
}

      4. 增加Message类,定义了消息传递的实体属性字段等信息

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Weiz.MQ
{
    public class Message
    {
        public string MessageID { get; set; }

        public string MessageTitle { get; set; }

        public string MessageBody { get; set; }

        public string MessageRouter { get; set; }
    }
}

      5. 增加MQHelper类,用于正在的订阅和发布消息。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Configuration;

using EasyNetQ;

namespace Weiz.MQ
{
    public class MQHelper
    {
        /// <summary>
        /// 发送消息
        /// </summary>
        public static void Publish(Message msg)
        {
            //// 创建消息bus
            IBus bus = BusBuilder.CreateMessageBus();

            try
            {
                bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
            }
            catch (EasyNetQException ex)
            {
                //处理连接消息服务器异常
            }

            bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="msg"></param>
        public static void Subscribe(Message msg, IProcessMessage ipro)
        {
            //// 创建消息bus
            IBus bus = BusBuilder.CreateMessageBus();

            try
            {
                bus.Subscribe<Message>(msg.MessageRouter, message => ipro.ProcessMsg(message), x => x.WithTopic(msg.MessageRouter));

            }
            catch (EasyNetQException ex)
            {
                //处理连接消息服务器异常
            }
        }
    }
}

    2. RabbitMQ由 Producer(生成者)

      1. 创建一个aspx 页面,增加如下代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

using Weiz.MQ;

namespace Weiz.Producer
{
    public partial class TestMQ : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {

        }

        protected void Button1_Click(object sender, EventArgs e)
        {
            Message msg = new Message();
            msg.MessageID = "1";
            msg.MessageBody = DateTime.Now.ToString();
            msg.MessageTitle = "1";
            msg.MessageRouter = "pcm.notice.zhangsan";
            MQHelper.Publish(msg);

        }
    }
}

    3. Weiz.Consumer 就是Consumer(消费者)

      1 . 新增OrderProcessMessage.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;

namespace Weiz.Consumer
{
    public class OrderProcessMessage:MQ.IProcessMessage
    {
        public void ProcessMsg(MQ.Message msg)
        {
            Console.WriteLine(msg.MessageBody);
        }
    }
}

      2. Program 增加如下代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Weiz.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            OrderProcessMessage order = new OrderProcessMessage();
            MQ.Message msg = new MQ.Message();
            msg.MessageID = "1";
            msg.MessageRouter = "pcm.notice.zhangsan";

            MQ.MQHelper.Subscribe(msg, order);
        }
    }
}

  四、运行

    1. 启动 Weiz.Consumer (消费者),启动消费者,会自动在RabbitMQ 服务器上创建相关的exchange 和 queue 。

      

      Consumer 消费者,使用的是Subscribe (订阅)的模式,所以,Weiz.Consumer客户端启动后,会自动创建connection,生成相关的exchange 和queue。

    2. 启动Weiz. Producer 里的TestMQ.aspx 页面,往队列里面写一条消息。订阅的消费者立马就能拿到这条消息。

       

 

  至此,C#向Rabbitmq消息队列发送消息已经简单完成。

  查看RabbitMQ 系列其他文章,http://www.cnblogs.com/zhangweizhong/category/855479.html

时间: 2024-10-11 17:03:30

RabbitMQ学习系列(三): C# 如何使用 RabbitMQ的相关文章

RabbitMQ学习系列(四): 几种Exchange 模式

上一篇,讲了RabbitMQ的具体用法,可以看看这篇文章:RabbitMQ学习系列(三): C# 如何使用 RabbitMQ.今天说些理论的东西,Exchange 的几种模式. AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储.同理,消费者也是如此.Exchange 就类似于一个交换机,转发各个消息分发到相

RabbitMQ学习(三)订阅/发布

RabbitMQ学习(三)订阅/发布 1.RabbitMQ模型 前面所学都只用到了生产者.队列.消费者.如上图所示,其实生产者并不直接将信息传输到队列中,在生产者和队列中间有一个交换机(Exchange),我们之前没有使用到交换机是应为我们没有配置交换机,使用了默认的交换机. 有几个可供选择的交换机类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout) 这里我们使用扇形交换机做一个简单的广播模型:一个生产者和多个消费者接受相同消息

RabbitMQ学习(三).NET Client之Publish/Subscribe

3 Publish/Subscribe Sending messages to many consumers at once Python | Java | Ruby | PHP| C# 转载请注明出处:jiq?钦's technical Blog Publish/Subscribe (using the .NET Client) 前面的教程我们已经学习了如何创建工作队列,工作队列背后的假设是每一个任务都被准确地递送给一个worker进行处理.这里我们将介绍完全不同的模式,即一个消息可以递送给多

RabbitMQ学习系列(一): 介绍

1. 介绍 RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue )协议的开源实现.用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面都非常的优秀.是当前最主流的消息中间件之一. RabbitMQ的官网:http://www.rabbitmq.com 2. AMQP AMQP,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,同样,消息使用者也不用知道发送者的存

rabbitMQ学习笔记(三) 消息确认与公平调度消费者

从本节开始称Sender为生产者 , Recv为消费者   一.消息确认 为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者 默认是开启的,在消费者端通过下面的方式开启消息确认,  首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocommit一样 QueueingConsumer consumer

Identity Server4学习系列三

1.简介 在Identity Server4学习系列一和Identity Server4学习系列二之令牌(Token)的概念的基础上,了解了Identity Server4的由来,以及令牌的相关知识,本文开始实战,实现Identity Server4基本的功能. 2.前提 本文基于.Net Core2.1和Indetity Server4 2.3.0,令牌处理包采用IdentityServer4.AccessTokenValidation 2.7.0 3.实战一Identity Server4服

C# Redis学习系列三:Redis配置主从

Redis配置主从 主IP :端口      192.168.0.103 6666 从IP:端口       192.168.0.108 3333 配置从库 (1)安装服务: redis-server --service-install --service-name redisService6666 --port 6666 (2)启动进程: redis-server --service-start --service-name redisService6666 (3)连接redis:redis-

rabbitmq学习(三) —— 工作队列

工作队列,又称任务队列,主要思想是避免立即执行资源密集型任务,并且必须等待完成.相反地,我们进行任务调度,我们将一个任务封装成一个消息,并将其发送到队列.工作进行在后台运行不断的从队列中取出任务然后执行.当你运行了多个工作进程时,这些任务队列中的任务将会被工作进程共享执行. 这个概念在 Web 应用程序中特别有用,在短时间 HTTP 请求内需要执行复杂的任务. 准备工作 现在,假装我们很忙,我们使用 Thread.sleep 来模拟耗时的任务. 发送端 public class NewTask

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有邮箱.手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息.利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列.注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息.但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处