RabbitMQ与.net core(二)Producer与Exchange

原文:RabbitMQ与.net core(二)Producer与Exchange

Producer:消息的生产者,也就是创建消息的对象

Exchange:消息的接受者,也就是用来接收消息的对象,Exchange接收到消息后将消息按照规则发送到与他绑定的Queue中。下面我们来定义一个Producer与Exchange。

1.新建.netcore console项目,并引入RabbitMQ.Client的Nuget包

2.创建Exchange

using RabbitMQ.Client;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false);   //创建Exchange

                }
            }
        }
    }
}

可以看到Echange的参数有:

type:可选项为,fanout,direct,topic,headers。区别如下:

    fanout:发送到所有与当前Exchange绑定的Queue中

    direct:发送到与消息的routeKey相同的Rueue中

    topic:fanout的模糊版本

    headers:发送到与消息的header属性相同的Queue中

durable:持久化

autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

运行程序,可以在可视化界面看到change2

接下来我们可以创建与change2绑定的queue

3.创建Queue

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);  #创建queue2
                    channel.QueueBind(queue, exchange, route);  #将queue2绑定到exchange2
                }

可以看到Echange的参数有:

durable:持久化

exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失

autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

去可视化界面看Queue

4.发送消息

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue, exchange, route);
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true; #持久化
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }

5.消费消息

using RabbitMQ.Client;
using System;
using System.Text;

namespace RabbitMQClient
{
    class Program
    {
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };
        static void Main(string[] args)
        {
            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";

            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);
                while (true)
                {
                    var message = channel.BasicGet(queue, true);  #第二个参数说明自动释放消息,如为false需手动释放消息
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }
            }
        }
    }
}

运行查看结果

查看可视化界面

6.手动释放消息

                while (true)
                {
                    var message = channel.BasicGet(queue, false);#设置为手动释放
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    channel.BasicAck(message.DeliveryTag, false); #手动释放
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

我们再发一条消息,然后开始消费,加个断点调试一下

查看一下Queue中消息状态

然后直接取消调试,不让程序走到释放的那一步,再查看一下消息状态

这么说来只要不走到 channel.BasicAck(message.DeliveryTag, false);这一行,消息就不会被释放掉,我们让程序直接走到这一行代码,查看一下消息的状态

如图已经被释放了

7.让失败的消息回到队列中

                while (true)
                {
                    var message = channel.BasicGet(queue, false);
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        Console.WriteLine(message.DeliveryTag);    #当前消息被处理的次序数
                        if (1==1)
                            channel.BasicReject(message.DeliveryTag, true);
                    }

                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

重新发送4条消息

开始消费

我们可以看到消息一直没有没消费,因为消息被处理之后又放到了队尾

8.监听消息

 using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);  #一次接受10条消息,否则rabbit会把所有的消息一次性推到client,会增大client的负荷
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                Console.ReadLine();
            }

原文地址:https://www.cnblogs.com/lonelyxmas/p/10237139.html

时间: 2024-11-05 20:40:50

RabbitMQ与.net core(二)Producer与Exchange的相关文章

RabbitMQ详解(二)------消息通信的概念

RabbitMQ详解(二)------消息通信的概念 消息通信,有很多种,邮箱 qq 微信 短信等,这些通信方式都有发送者,接受者,还有一个中间存储离线消息的容器.但是这些通信方式和RabbitMQ的通信模型是不一样的,比如邮件,邮件服务器基于POP3/SMTP协议,通信双方需要明确指定,并且发送的邮件内容有固定的结构.而RabbitMQ服务器基于AMQP协议,这个协议是不需要明确指定发送方和接受方的,而且发送的消息也没有固定的结构,甚至可直接存储二进制数据,并且和邮件服务器一样,也能存储离线消

RabbitMQ与.net core(一)安装

原文:RabbitMQ与.net core(一)安装 一.安装Erlang环境 前提:erlang版本与rabbitmq版本需按照要求对应!!! 1.在安装erlang之前先安装下依赖文件(这一步不要忘掉了, 不然后面./configure的时候要报错): [[email protected] local]# yum install gcc glibc-devel make ncurses-devel openssl-devel xmlto 2.到erlang官网去下载erlang安装包 官网地

在WinSrv 2016 Core模式下安装Exchange Server 2019预览版

之前的Exchange Server很多版本大家都很熟知,但每次安装Exchange Server都是必须安装在具备GUI图形化界面的Windows Server上的,很多人觉得Windows Server的GUI图形化很占资源并且容易导致人为的误操作而影响系统的稳定性,那么在未来发布的Exchange Server 2019版本中会是第一个支持在Windows Server Core模式下安装的Exchange Server,这样就大大提高了Windows Server对Exchange Se

RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange

1.topic类型的Exchange 我们之前说过Topic类型的Exchange是direct类型的模糊查询模式,可以通过routkey来实现模糊消费message,topic的模糊匹配有两种模式: 1. 使用*来匹配一个单词 2.使用#来匹配0个或多个单词 我们来看代码 消费端 using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using S

rabbitMQ(二):Fanout Exchange

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上. 1.可以理解为路由表的模式 2.这种模式不需要RouteKey 3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定. 4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃. 原文地址:https://www.cnblogs.com/dwxblogs/p

rabbitmq使用方法(二)

Work Queues In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers. The main idea behind Work Queues (

RabbitMQ消息队列(二):”Hello, World“

本文将使用Python(pika 0.9.8)实现从Producer到Consumer传递数据"Hello, World". 首先复习一下上篇所学:RabbitMQ实现了AMQP定义的消息队列.它实现的功能"非常简单":从Producer接收数据然后传递到Consumer.它能保证多并发,数据安全传递,可扩展. 和任何的Hello world一样,它们都不复杂.我们将会设计两个程序,一个发送Hello world,另一个接收这个数据并且打印到屏幕.      整体的

RabbitMQ消息队列(二):"Hello, World"[转]

2. Sending 第一个program send.cs:发送Hello world 到queue.正如我们在上篇文章提到的,你程序的第9行就是建立连接,第12行就是创建channel,第14行创建名字为hello的queue. 1 using System; 2 using RabbitMQ.Client; 3 using System.Text; 4 5 class Send 6 { 7 public static void Main() 8 { 9 var factory = new C

RabbitMQ第三课 基本概念和exchange

Rabbitmq使用必须理解的一些概念(转自:http://www.linuxidc.com/Linux/2013-11/92591.htm)channel:通道,amqp支持一个tcp连接上启用多个mq通信通道,每个通道都可以被作为通信流.producer:生产者,是消息产生的源头.exchange:交换机,可以理解为具有路由表的路由规则.queues:队列,装载消息的缓存容器.consumer:消费者,连接到队列并取走消息的客户端.核心思想:在RabbitMQ中,生产者从不直接将消息发送给队