RabbitMQ .NET Client 实战实验

  由于公司业务需求,最近想上RabbitMQ,之前我研究了一段时间微软的MSMQ。开源队列有很多,各有优劣。就先拿RabbitMQ练练手吧。本篇着重代码部分,至于怎么安装,怎么配置不在赘述。而且代码是在RabbitMQ.NET Client 类库基础上实现。

  假设阅读本文的人已经安装好RabbitMQ并且做了相应的用户配置。而且项目中已经从nuget安装了rabbitmq.client.dll.我们开始做一个简单的队列发送和接收消息。

  1. 将需要配置的东西放在配置文件里,例如主机地址,端口,用户名,密码等。
  2. 实现消息发送端:Product
  3. 实现消息接收端:Customer
  4. Demo测试

  将以下内容作为可配置部分放在配置文件中

<appSettings>
    <!--RabbitMQ-->
    <add key="RabbitMQ_HostUri" value="amqp://192.168.1.119:5672/"/>
    <add key="RabbitMQ_HostName" value="192.168.1.119"/>
    <add key="RabbitMQ_UserName" value="test_user"/>
    <add key="RabbitMQ_Password" value="123456"/>
    <add key="RabbitMQ_VirtualHost" value="ms_mq"/>
  </appSettings>

  由于只是对RabbitMQ.Client.dll中的又一次封装,所以代码不过多解释,其中要注意的就是某些配置问题,例如是否持久化,消息处理模式是怎么样的等等。

  首先我们创建一个连接工厂:

 public ConnectionFactory CreateFactory()
        {
            if (_factory == null) {

                const ushort heartbeat = 0;
                //主机地址
                Uri uri = new Uri(RabbitMQConfig.HostUri);

                _factory = new ConnectionFactory();
                //_factory.HostName = RabbitMQConfig.HostName;
                //用户名
                _factory.UserName = RabbitMQConfig.UserName;
                //密码
                _factory.Password = RabbitMQConfig.PassWord;
                //虚拟主机名
                _factory.VirtualHost = RabbitMQConfig.VirtualHost;
                //连接终端
                _factory.Endpoint = new AmqpTcpEndpoint(uri);

                _factory.RequestedHeartbeat = heartbeat;
                //自动重连
                _factory.AutomaticRecoveryEnabled = true;
            }
            return _factory;
        }

  一个简单的消息发布:(对代码研究不够透彻,只能一切从简~~)

 public void Publish(string message, string queueName=null)
        {
            if (queueName == null) {
                queueName = _queueName;
            }

            var factory = RabbitMQFactory.Instance.CreateFactory();
            using (var connection = factory.CreateConnection())
            {
                using (var model = connection.CreateModel())
                {
                    //消息持久化,防止丢失
                    model.QueueDeclare(queueName, RabbitMQConfig.IsDurable, false, false, null);
                    var properties = model.CreateBasicProperties();
                    properties.Persistent = RabbitMQConfig.IsDurable;
                    properties.DeliveryMode = 2;

                    //消息转换为二进制
                    var msgBody = Encoding.UTF8.GetBytes(message);
                    //消息发出到队列
                    model.BasicPublish("", queueName, properties, msgBody);
                }
            }
        }

  消息接收:

 public void Consume() {
            var factory = RabbitMQFactory.Instance.CreateFactory();

            var connection = factory.CreateConnection();

            connection.ConnectionShutdown += Connection_ConnectionShutdown;

            ListenChannel = connection.CreateModel();

            bool autoDeleteMessage = false;
            var queue = ListenChannel.QueueDeclare(_queueName, RabbitMQConfig.IsDurable, false, false, null);

            //公平分发,不要同一时间给一个工作者发送多于一个消息
            ListenChannel.BasicQos(0, 1, false);
            //创建事件驱动的消费者类型,不要用下边的死循环来消费消息
            var consumer = new EventingBasicConsumer(ListenChannel);
            consumer.Received += Consumer_Received;
            //消费消息
            ListenChannel.BasicConsume(_queueName, autoDeleteMessage, consumer);
        }

  我在Customer中定义了一个 ReceiveMessageCallback Func回调,这里就是当客户端从队列接收到消息之后,怎么处理由客户端来决定

  public Func<string, bool> ReceiveMessageCallback { get; set; }

  处理消息:

 private void Consumer_Received(object sender, BasicDeliverEventArgs args) {
            try {
                var body = args.Body;
                var message = Encoding.UTF8.GetString(body);
                //将消息业务处理交给外部业务
                bool result = ReceiveMessageCallback(message);
                if (result) {
                    if (ListenChannel != null && !ListenChannel.IsClosed) {
                        ListenChannel.BasicAck(args.DeliveryTag, false);
                    }
                }
                else {

                }

            }
            catch (Exception ex) {
                throw ex;
            }
        }

  基本代码已经完成,我们写一个测试,消息发送端:

    static void Main(string[] args)
        {
            var testQueueName = "test";
            IMessageProduct product = new MessageProduct(testQueueName);
            for (int i = 0; i < 10000; i++)
            {
                Console.WriteLine("正在发送第" + i + "条消息...");
                product.Publish("消息体" + i);
            }

            Console.Read();
        }

  消息接收端:(开多个口接收)

    static void Main(string[] args)
        {

            Parallel.For(0, RabbitMQConfig.ThreadCount, i =>
            {
                IMessageCustomer customer = new MessageCustomer("test");         //开始监听
                customer.StartListening();
                customer.ReceiveMessageCallback = message =>
                {            //客户端处理消息(打印)
                    Console.WriteLine("接收到消息:" + message);
                    return true;
                };
           });
            Console.Read();
        }

  打开发送消息端:

  打开消息接收端:

  到此为止,RabbitMQ队列的简单测试就完成了,没有介绍什么新知识,基本就是套DLL中的方法,不过也有很多不合理的地方,如果真正应用到项目中,还需要多加测试和修改。

  DEMO地址:https://github.com/fanpan26/RabbitMQ.NETClient

时间: 2024-11-07 22:16:52

RabbitMQ .NET Client 实战实验的相关文章

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)

前言 1. SpringBoot整合配置详解 publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback 注意一点,在发送消息的时候对template进行配置mandatory=tr

【Docker篇四】Mesos+Zookeeper+Marathon+Docker实战实验

Apache Mesos概述 不同的分布式运算框架(spark,hadoop,ES,MPI,Cassandra,etc.)中的不同任务往往需要的资源(内存,CPU,网络IO等)不同,它们运行在同一个集群中,会相互干扰,为此,应该提供一种资源隔离机制避免任务之间由资源争用导致效率下降,考虑到资源利用率,运维成本,数据共享等因素,公司一般希望将所有这些框架部署到一个公共的集群中,让它们共享集群的资源,并对资源进行统一使用,这样,便诞生了资源统一管理与调度平台,典型的代表就是mesos. Apache

ActiveMQ RabbitMQ RokcetMQ Kafka实战 消息队列中间件视频教程

ActiveMQ 第01节:ActiveMQ入门和消息中间件第02节:JMS基本概念和模型第03节:JMS的可靠性机制第04节:JMS的API结构和开发步骤_rec_rec第05节:Broker的启动方式吖第06节:ActiveMQ结合Spring开发吖第07节:ActiveMQ支持的传输协议吖第08节:ActiveMQ消息存储持久化_rec_rec第09节:ActiveMQ的静态网络链接吖第10节:多线程consumer访问集群第11节:集群下的消息回流功能第12节:容错的链接和动态网络连接第

RabbitMQ实战应用技巧

1. RabbitMQ实战应用技巧 1.1. 前言 由于项目原因,之后会和RabbitMQ比较多的打交道,所以让我们来好好整理下RabbitMQ的应用实战技巧,尽量避免日后的采坑 1.2. 概述 RabbitMQ有几个重要的概念:虚拟主机,交换机,队列和绑定 虚拟主机:一个虚拟主机持有一组交换机.队列和绑定,我们可以从虚拟主机层面的颗粒度进行权限控制 交换机:Exchange用于转发消息,它并不存储消息,如果没有Queue队列绑定到Exchange,它会直接丢弃掉生产者发来的数据. 交换机还有个

RabbitMQ学习(二).NET Client之Work Queues

2 Work queues Distributing tasks among workers Python | Java | Ruby | PHP| C# 转载请注明出处:jiq?钦's technical Blog Work Queues (using the .NET Client) 前面已经介绍过了如何编写程序去发送消息到命名队列,以及从命名队列接收消息. 在这个部分我们将创建一个工作队列(Work Queue),用于将耗时任务(time-consuming tasks)分发给多个工作者(

SpringBoot整合RabbitMQ之典型应用场景实战三

实战前言RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用.微服务应用中充当着重要的角色.特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦.异步通信.高并发限流.超时业务.数据延迟处理等.前两篇博文我介绍分享了RabbitMQ在业务服务模块异步解耦以及通信的实战业务场景,感兴趣童鞋可以前往观看:1.http://blog.51cto.com/13877966/22970562.http://blog.51cto.com/13877966/2297182

Java秒杀系统实战系列~整合RabbitMQ实现消息异步发送

摘要: 本篇博文是“Java秒杀系统实战系列文章”的第八篇,在这篇文章中我们将整合消息中间件RabbitMQ,包括添加依赖.加入配置信息以及自定义注入相关操作组件,比如RabbitTemplate等等,最终初步实现消息的发送和接收,并在下一篇章将其与邮件服务整合,实现“用户秒杀成功发送邮件通知消息”的功能! 内容: 对于消息中间件RabbitMQ,想必各位小伙伴没有用过.也该有听过,它是一款目前市面上应用相当广泛的消息中间件,可以实现消息异步通信.业务服务模块解耦.接口限流.消息分发等功能,在微

LVS集群实战

LVS Linux Virtual Server: Linux虚拟服务器 基于四层的LB ================================ LVS+keepalived Web Server 80/tcp LVS+keepalived Galera MySQL/MySQL Cluster 3306/tcp 一.LVS概述 LVS是Linux内核的一部分,因此性能较高 Linux虚拟服务器(即分发器或调度器)功能: 不真正提供服务,介接受客户的访问,为整个集群提供一个唯一的入口 虚

RabbitMQ教程C#版 - 发布订阅

先决条件本教程假定 RabbitMQ 已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们. 发布/订阅# (使用 .NET Client) 在 教程[2] 中,我们创建了一个工作队列,假设在工作队列中的每一个任务都只被分发给一个 Worker.那么在这一章节,我们要做与之完全不同的事,那就是我们将要把一条消息分发给多个消费者.这种模式被称为“发布/订阅”. 为了说