RabbitMqHelper 消息队列帮助类

using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQ_Send
{
class ConfigModel
{
}

public enum ExchangeTypeEnum
{
/// <summary>
/// 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
/// 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
/// </summary>
fanout = 1,

/// <summary>
/// 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
/// 。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,
/// 则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
/// </summary>
direct = 2,

/// <summary>
/// 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
/// 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
/// 因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
/// </summary>
topic = 3,

header = 4
}

/// <summary>
/// 数据被执行后的处理方式
/// </summary>
public enum ProcessingResultsEnum
{
/// <summary>
/// 处理成功
/// </summary>
Accept,

/// <summary>
/// 可以重试的错误
/// </summary>
Retry,

/// <summary>
/// 无需重试的错误
/// </summary>
Reject,
}

/// <summary>
/// 消息队列的配置信息
/// </summary>
public class RabbitMqConfigModel
{
#region host
/// <summary>
/// 服务器IP地址
/// </summary>
public string IP { get; set; }

/// <summary>
/// 服务器端口,默认是 5672
/// </summary>
public int Port { get; set; }

/// <summary>
/// 登录用户名
/// </summary>
public string UserName { get; set; }

/// <summary>
/// 登录密码
/// </summary>
public string Password { get; set; }
/// <summary>
/// 虚拟主机名称
/// </summary>
public string VirtualHost { get; set; }
#endregion

#region Queue
/// <summary>
/// 队列名称
/// </summary>
public string QueueName { get; set; }

/// <summary>
/// 是否持久化该队列
/// </summary>
public bool DurableQueue { get; set; }
#endregion

#region exchange
/// <summary>
/// 路由名称
/// </summary>
public string ExchangeName { get; set; }

/// <summary>
/// 路由的类型枚举
/// </summary>
public ExchangeTypeEnum ExchangeType { get; set; }

/// <summary>
/// 路由的关键字
/// </summary>
public string RoutingKey { get; set; }

#endregion

#region message
/// <summary>
/// 是否持久化队列中的消息
/// </summary>
public bool DurableMessage { get; set; }
#endregion
}
/// <summary>
/// 基类
/// </summary>
public class BaseService
{

public static IConnection _connection;

/// <summary>
/// 服务器配置
/// </summary>
public RabbitMqConfigModel RabbitConfig { get; set; }

#region 构造函数
/// <summary>
/// 构造函数
/// </summary>
/// <param name="config"></param>
public BaseService(RabbitMqConfigModel config)
{
try
{
RabbitConfig = config;
CreateConn();
}
catch (Exception)
{
throw;
}
}
#endregion

#region 方法
#region 初始化
/// <summary>
/// 创建连接
/// </summary>
public void CreateConn()
{
ConnectionFactory cf = new ConnectionFactory();
cf.Port = RabbitConfig.Port; //服务器的端口
cf.Endpoint = new AmqpTcpEndpoint(new Uri("amqp://" + RabbitConfig.IP + "/")); //服务器ip
cf.UserName = RabbitConfig.UserName; //登录账户
cf.Password = RabbitConfig.Password; //登录账户
cf.VirtualHost = RabbitConfig.VirtualHost; //虚拟主机
cf.RequestedHeartbeat = 60; //虚拟主机

_connection = cf.CreateConnection();
}
#endregion

#region 发送消息
/// <summary>
/// 发送消息,泛型
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="message"></param>
/// <returns></returns>
public bool Send<T>(T messageInfo, ref string errMsg)
{
if (messageInfo == null)
{
errMsg = "消息对象不能为空";
return false;
}
string value = JsonConvert.SerializeObject(messageInfo);
return Send(value, ref errMsg);
}
/// <summary>
/// 发送消息,string类型
/// </summary>
/// <param name="message"></param>
/// <param name="errMsg"></param>
/// <returns></returns>
public bool Send(string message, ref string errMsg)
{
if (string.IsNullOrEmpty(message))
{
errMsg = "消息不能为空";
return false;
}
try
{
if (!_connection.IsOpen)
{
CreateConn();
}
using (var channel = _connection.CreateModel())
{
//推送消息
byte[] bytes = Encoding.UTF8.GetBytes(message);

IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = Convert.ToByte(RabbitConfig.DurableMessage ? 2 : 1); //支持可持久化数据

if (string.IsNullOrEmpty(RabbitConfig.ExchangeName))
{
//使用自定义的路由
channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableMessage, false, null);
channel.BasicPublish("", RabbitConfig.QueueName, properties, bytes);
}
else
{
//申明消息队列,且为可持久化的,如果队列的名称不存在,系统会自动创建,有的话不会覆盖
channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null);
channel.BasicPublish(RabbitConfig.ExchangeName, RabbitConfig.RoutingKey, properties, bytes);
}
return true;
}

}
catch (Exception ex)
{
errMsg = ex.Message;
return false;
}
}
#endregion
}
public class RabbitBasicService : BaseService
{

/// <summary>
/// 构造函数
/// </summary>
/// <param name="config"></param>
public RabbitBasicService(RabbitMqConfigModel config)
: base(config)
{ }

/// <summary>
/// 接受消息,使用Action进行处理
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="method"></param>
public void Receive<T>(Func<string, bool> method)
{
try
{
using (var channel = _connection.CreateModel())
{
//申明队列
channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null);
//使用路由
if (!string.IsNullOrEmpty(RabbitConfig.ExchangeName))
{
//申明路由
channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableQueue);
//队列和交换机绑定
channel.QueueBind(RabbitConfig.QueueName, RabbitConfig.ExchangeName, RabbitConfig.RoutingKey);
}

//输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
channel.BasicQos(0, 1, false);
//在队列上定义一个消费者
var customer = new QueueingBasicConsumer(channel);
//var customer = new EventingBasicConsumer (channel);

//消费队列,并设置应答模式为程序主动应答
channel.BasicConsume(RabbitConfig.QueueName, false, customer);

while (true)//timer
{
//阻塞函数,获取队列中的消息
ProcessingResultsEnum processingResult = ProcessingResultsEnum.Retry;
ulong deliveryTag = 0;
try
{
//Thread.Sleep(10);

var ea = customer.Queue.Dequeue();
deliveryTag = ea.DeliveryTag;
byte[] bytes = ea.Body;
string body = Encoding.UTF8.GetString(bytes);
// T info = JsonConvert.DeserializeObject<T>(body);
method(body);
processingResult = ProcessingResultsEnum.Accept;
}
catch (Exception ex)
{
processingResult = ProcessingResultsEnum.Reject; //系统无法处理的错误
}
finally
{
switch (processingResult)
{
case ProcessingResultsEnum.Accept:
//回复确认处理成功
channel.BasicAck(deliveryTag,
false);//处理单挑信息
break;
case ProcessingResultsEnum.Retry:
//发生错误了,但是还可以重新提交给队列重新分配
channel.BasicNack(deliveryTag, false, true);
break;
case ProcessingResultsEnum.Reject:
//发生严重错误,无法继续进行,这种情况应该写日志或者是发送消息通知管理员
channel.BasicNack(deliveryTag, false, false);
//写日志
break;
}
}
}

}
}
catch (Exception ex)
{
}
}

}
#endregion
}

原文地址:https://www.cnblogs.com/request/p/11237088.html

时间: 2024-10-13 10:27:29

RabbitMqHelper 消息队列帮助类的相关文章

消息队列帮助类

准备工作: 1:安装windows组件(MSMQ) 编写代码: using System; using System.Messaging; using System.Collections.Generic; using System.Text; namespace LCL.Bus { public interface IBusMessageQueue { void Clear(); List<BusMessage> GetAll(); BusMessage Receive(); BusMess

第四讲:消息队列处理类

#ifndef __CommonMsgHandler___ #define __CommonMsgHandler___ #include "cocos2d.h" #include <thread> #include <queue> #include <iostream> #include <google/protobuf/message.h> #include <sigslot.h> #include "receiv

MSMQ消息队列

一.引言 Windows Communication Foundation(WCF)是Microsoft为构建面向服务的应用程序而提供的统一编程模型,该服务模型提供了支持松散耦合和版本管理的序列化功能,并提供了与消息队列(MSMQ).COM+.Asp.net Web服务..NET Remoting等微软现有的分布式系统技术.利用WCF平台,开发人员可以很方便地构建面向服务的应用程序(SOA).可以认为,WCF是对之前现有的分布式技术(指的是MSMQ..NET Remoting和Web 服务等技术

跟我一起学WCF(1)——MSMQ消息队列

一.引言 Windows Communication Foundation(WCF)是Microsoft为构建面向服务的应用程序而提供的统一编程模型,该服务模型提供了支持松散耦合和版本管理的序列化功能,并提供了与消息队列(MSMQ).COM+.Asp.net Web服务..NET Remoting等微软现有的分布式系统技术.利用WCF平台,开发人员可以很方便地构建面向服务的应用程序(SOA).可以认为,WCF是对之前现有的分布式技术(指的是MSMQ..NET Remoting和Web 服务等技术

.Net Core 商城微服务项目系列(七):使用消息队列(RabbitMQ)实现服务异步通信

RabbitMQ是什么,怎么使用我就不介绍了,大家可以到园子里搜一下教程.本篇的重点在于实现服务与服务之间的异步通信. 首先说一下为什么要使用消息队列来实现服务通信:1.提高接口并发能力.  2.保证服务各方数据最终一致.  3.解耦. 使用消息队列通信的有点就是直接调用的缺点,比如在直接调用过程中发生未知错误,很可能就会出现数据不一致的问题,这个时候就需要人工修补数据,如果有过这个经历的同学一定是可怜的,人工修补数据简直痛苦!!再比如高并发情况下接口直接挂点,这就更直白了,接口挂了,功能就挂了

RabbitMQ消息队列随笔

本文权当各位看官对RabbitMQ的基本概念以及使用场景有了一定的了解,如果你还对它所知甚少或者只是停留在仅仅是听说过,建议你先看看这篇文章,在对RabbitMQ有了基本认识后,我们正式开启我们的RabbitMQ之旅吧,希望本文能够帮助大家在实际用到消息队列时有所帮助,如有表述的不当之处,还望各位看官指正. 一.消息队列的安装 1. RabbitMQ是用Erlang编程语言进行开发,所以首先得在Erlang官网下载Erlang运行的环境,如图,选择所需对应文件进行下载,并进行安装: 2. 设置环

【转】MSMQ 微软消息队列 简单 示例

MSMQ它的实现原理是:消息的发送者把自己想要发送的信息放入一个容器中(我们称之为Message),然后把它保存至一个系统公用空间的消息队列(Message Queue)中:本地或者是异地的消息接收程序再从该队列中取出发给它的消息进行处理. 我个人的理解,你可以把他当做一种,把数据打包后,发送到一个地方,程序也可以去取到这个打包的程序,队列的机制就不讲了,并发问题荡然无存.呵呵. 上代码: 首先 using System.Messaging; public class MsmqManagerHe

XSI进程间通信-----消息队列

1. 基本特点 1) 消息队列是一个由系统内核负责存储和管理,并通过消息队列标识引用的数据链表,消息队列 和有名管道fifo的区别在: 后者一次只能放一个包,而前者则可以放很多包,这样就能处理发包快,哪包慢的问题 2) 可以通过msgget函数创建一个新的消息队列, 或获取一个已有的消息队列. 通过msgsnd函数 (send)向消息队列的后端追加消息, 通过msgrcv(receive)函数从消息队列的前端提取消息. 3) 消息队列中的每个消息单元除包含消息数据外,还包含消息类型和数据长度.消

消息队列实现订单异步提交

what MSMQ(Microsoft Message Queue),微软消息队列,用于应用程序之间相互通信的一种异步传输模式.应用程序可以分布在同台机器上,也可以分布于互联的网络中的任意位置.基本原理:消息发送者把要发送的消息放入容器,也就是Message(消息),然后保存到系统公用空间的消息队列中(Message Queue)中,本地或互联位置上的消息接收程序再从队列中取出发给它的消息进行处理.消息类型可以是文本,图像,自定义对象等.消息队列分为公共队列和私有队列. why 一.用于进程间的